# Source: wcs/wcs/formdata.py (654 lines, 23 KiB, Python)

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import copy
import datetime
import json
import re
import sys
import time
from quixote import get_request, get_publisher, get_session
from quixote.http_request import Upload
from qommon.storage import StorableObject
import qommon.misc
from qommon.substitution import Substitutions
from roles import Role
from fields import FileField
def get_dict_with_varnames(fields, data, formdata=None, varnames_only=False):
    """Build a flat dictionary of field values keyed by variable names.

    For every field that has a ``get_view_value`` attribute, the value is
    exposed under several keys:

    * ``f<id>`` and ``field_<simplified label>`` (unless *varnames_only*);
    * ``var_<varname>`` (and ``var_<varname>_raw``, ``var_<varname>_url``,
      ``var_<varname>_<key>`` where applicable) when the field has a
      ``varname``.

    :param fields: iterable of field objects.
    :param data: dictionary of values keyed by field id, or None; when it
        is None every value is the empty string.
    :param formdata: optional object providing ``get_url()``, used to build
        the download URL of uploaded files.
    :param varnames_only: when true, only the ``var_*`` keys are produced.
    :return: a new dictionary.
    """
    new_data = {}
    for field in fields:
        if not hasattr(field, 'get_view_value'):
            continue

        raw_value = None
        if data is not None:
            value = data.get(field.id)
            if field.convert_value_to_str:
                raw_value = value
                value = field.convert_value_to_str(value)
            display_value = data.get('%s_display' % field.id)
        else:
            value = ''
            display_value = ''

        if not varnames_only:
            # add it as f$n$
            new_data['f%s' % field.id] = value

            # also add it as 'field_' + normalized(field label)
            identifier_name = qommon.misc.simplify(field.label, space='_')
            new_data['field_' + identifier_name] = value

        # and finally add it as its manually defined variable name
        if not field.varname:
            continue

        var_key = 'var_%s' % field.varname
        raw_key = 'var_%s_raw' % field.varname

        if field.store_display_value:
            new_data[raw_key] = value
            new_data[var_key] = display_value
        else:
            new_data[var_key] = value
            if isinstance(value, Upload):
                new_data[raw_key] = value
                new_data[var_key] = value.base_filename
                if formdata is not None:
                    new_data['var_%s_url' % field.varname] = '%sdownload?f=%s' % (
                        formdata.get_url(), field.id)
            elif raw_value:
                new_data[raw_key] = raw_value

        # there is nothing structured to look up when no data was given;
        # without this guard data.get() would fail on None.
        if field.store_structured_value and data is not None:
            structured_value = data.get('%s_structured' % field.id)
            if structured_value:
                for k, v in structured_value.items():
                    if k in ('id', 'text'):
                        continue
                    new_data['var_%s_%s' % (field.varname, k)] = v

    return new_data
def flatten_dict(d):
    """Flatten nested dictionaries of *d* in place.

    Every value that is exactly a ``dict`` is removed and its (recursively
    flattened) items are added to *d* under ``'<key>_<subkey>'`` keys.

    :param d: the dictionary to modify; nothing is returned.
    """
    # iterate over a snapshot, as keys are added to and removed from d
    # during the loop.
    for k, v in list(d.items()):
        # exact type check kept on purpose: dict subclasses are left as is.
        if type(v) is dict:
            flatten_dict(v)
            for k2, v2 in v.items():
                d['%s_%s' % (k, k2)] = v2
            del d[k]
def get_json_dict(fields, data, include_files=True):
    """Build the dictionary of field values used for JSON exports.

    Only fields with a ``varname`` are exported.  For fields storing a
    display value, the value goes under ``<varname>_raw`` and the display
    value under ``<varname>``; otherwise the value goes under
    ``<varname>``.

    :param fields: iterable of field objects.
    :param data: dictionary of values keyed by field id, or None; when it
        is None all exported values are None.
    :param include_files: when false, ``FileField`` instances are skipped.
    :return: a new dictionary.
    """
    new_data = {}
    for field in fields:
        if not field.varname:  # exports only named fields
            continue
        if not include_files and isinstance(field, FileField):
            continue

        if data is not None:
            value = data.get(field.id)
            if value and hasattr(field, 'get_json_value'):
                value = field.get_json_value(value)
            display_value = data.get('%s_display' % field.id)
        else:
            value = display_value = None

        if field.store_display_value:
            new_data[field.varname + '_raw'] = value
            new_data[field.varname] = display_value
        else:
            new_data[field.varname] = value
    return new_data
class Evolution(object):
    """One entry in the history of a form data object."""

    who = None
    status = None
    time = None
    comment = None
    parts = None

    def get_author_name(self):
        """Return the display name of the author of this entry."""
        if self.who != '_submitter':
            author = get_publisher().user_class.get(self.who)
            return author.display_name
        return _('Original Submitter')

    def add_part(self, part):
        """Append *part* to the parts list, creating the list if needed."""
        if self.parts:
            self.parts.append(part)
        else:
            self.parts = [part]

    def display_parts(self):
        """Return the result of ``view()`` for each part that has one."""
        if not self.parts:
            return []
        return [part.view() for part in self.parts if hasattr(part, 'view')]
class FormData(StorableObject):
    """Stored data of a submitted (or draft) form.

    The form definition is resolved from ``_names`` (see ``get_formdef``),
    and the workflow status is kept in ``status`` as ``'wf-<status id>'``
    (or ``'draft'``).
    """

    _names = 'XX'
    _hashed_indexes = ['user_id', 'user_hash', 'status', 'workflow_roles',
                       'concerned_roles', 'actions_roles']

    id_display = None

    user_id = None
    user_hash = None
    receipt_time = None
    status = None
    anonymised = None
    page_no = 0  # page to use when restoring from draft
    evolution = None
    data = None
    editable_by = None
    tracking_code = None
    backoffice_submission = False
    submission_context = None

    workflow_data = None
    workflow_roles = None

    _formdef = None

    def get_formdef(self):
        """Return the form definition, looked up by url name and cached.

        The url name is the part of ``_names`` after the first dash; None
        is returned (and cached) when the lookup raises KeyError.
        """
        if self._formdef:
            return self._formdef
        from formdef import FormDef
        prefix, url_name = self._names.split('-', 1)
        try:
            self._formdef = FormDef.get_by_urlname(url_name)
        except KeyError:
            self._formdef = None
        return self._formdef
    formdef = property(get_formdef)

    def __init__(self, id=None):
        self.id = id

    def migrate(self):
        """Prefix legacy status values with ``'wf-'`` and store if changed."""
        changed = False
        if self.status and not self.status.startswith('wf-') and self.status != 'draft':
            self.status = 'wf-%s' % self.status
            changed = True
        if self.evolution:
            for evo in self.evolution:
                if evo.status and not evo.status.startswith('wf-'):
                    evo.status = 'wf-%s' % evo.status
                    changed = True
        if changed:
            self.store()

    def store(self, *args, **kwargs):
        # make sure the class set under the formdef name in the sys.modules
        # namespaces is the exact one that was used when creating this
        # particular object, as it is required by pickle (or it will raise
        # "Can't pickle %r: it's not the same object as %s.%s" if the class
        # object has been changed in the course of the request).
        class_name = self._formdef.url_name.title()
        setattr(sys.modules['formdef'], class_name, self.__class__)
        setattr(sys.modules['wcs.formdef'], class_name, self.__class__)
        super(FormData, self).store(*args, **kwargs)

    def get_user(self):
        """Return the user object for ``user_id``, or None."""
        if self.user_id and self.user_id != 'ultra-user':
            return get_publisher().user_class.get(self.user_id, ignore_errors=True)
        return None

    def set_user(self, user):
        """Record *user* either by its ``hash`` or, failing that, its ``id``."""
        self.user_hash = None
        try:
            self.user_hash = user.hash
            self.user_id = None
        except AttributeError:
            # no hash attribute (this includes user being None)
            if user:
                self.user_id = user.id
            else:
                self.user_id = None
    user = property(get_user, set_user)

    def just_created(self):
        """Set receipt time, initial status and the first history entry."""
        self.receipt_time = time.localtime()
        self.status = 'wf-%s' % self.formdef.workflow.possible_status[0].id
        # we add the initial status to the history, this makes it more readable
        # afterwards (also this gets the (previous_status) code to work in all
        # cases)
        evo = Evolution()
        evo.who = '_submitter'
        evo.time = self.receipt_time
        evo.status = self.status
        self.evolution = [evo]

    def perform_workflow(self):
        """Run the items of the current status and return their result."""
        get_publisher().substitutions.feed(self)
        wf_status = self.get_status()
        return wf_status.perform_items(self)

    def display_workflow_message(self):
        """Return the message of the first status item providing one, or ''."""
        wf_status = self.get_status()
        if not wf_status:
            return ''
        for status in wf_status.items:
            if hasattr(status, 'get_message'):
                return status.get_message(self)
        return ''

    def get_status(self, status=None):
        """Return the workflow status object for *status*.

        :param status: a ``'wf-<id>'`` string; defaults to ``self.status``.
        :return: the matching entry of the workflow ``possible_status``, or
            None when there is no status, no form definition, no dash in
            the status string or no matching status.
        """
        if not status:
            status = self.status
        if status is None:
            return None
        if not self.formdef:
            return None
        try:
            # split once only, so that a status id containing a dash is
            # not truncated.
            status_id = status.split('-', 1)[1]
        except IndexError:
            return None
        for wf_status in self.formdef.workflow.possible_status:
            if wf_status.id == status_id:
                return wf_status
        return None

    def get_status_label(self, status=None):
        """Return the name of the status, or the 'Unknown' label."""
        wf_status = self.get_status(status)
        if not wf_status:
            return _('Unknown')
        return wf_status.name

    def get_visible_status(self, user=None):
        """Return the most recent status from the history visible to *user*.

        *user* defaults to the user of the current request.  Without any
        history the current status is returned.
        """
        if not self.evolution:
            return self.get_status()
        if not user:
            user = get_request().user
        for evo in reversed(self.evolution):
            if not evo.status:
                continue
            wf_status = self.get_status(evo.status)
            if not wf_status:
                continue
            if not wf_status.is_visible(self, user):
                continue
            return wf_status
        return None

    def get_workflow_form(self, user):
        wf_status = self.get_status()
        if not wf_status:
            return None
        return wf_status.get_action_form(self, user)

    def get_workflow_subdirectories(self):
        wf_status = self.get_status()
        if not wf_status:  # draft
            return []
        return wf_status.get_subdirectories(self)

    def handle_workflow_form(self, user, form):
        wf_status = self.get_status()
        if not wf_status:
            return None
        return wf_status.handle_form(form, self, user)

    def jump_status(self, status_id):
        """Switch to status *status_id*, add a history entry and store."""
        evo = Evolution()
        evo.time = time.localtime()
        evo.status = 'wf-%s' % status_id
        if not self.evolution:
            self.evolution = []
        self.evolution.append(evo)
        self.status = evo.status
        self.store()

    def get_url(self, backoffice=False):
        return '%s%s/' % (self.formdef.get_url(backoffice=backoffice), self.id)

    def get_display_id(self):
        return self.id_display or self.id

    def get_handling_role_id(self):
        # TODO: look at current status and return the role(s) actually
        # concerned by the handling of the formdata
        from wcs.workflows import get_role_translation
        return get_role_translation(self, '_receiver')

    def get_handling_role(self):
        try:
            return Role.get(self.get_handling_role_id())
        except KeyError:
            return None

    def get_field_value(self, field):
        """Return the value of the first field whose label is *field*.

        None is returned when no field has that label.
        """
        for candidate in self.formdef.fields:
            if candidate.label == field:
                return self.data.get(candidate.id)
        return None

    def update_workflow_data(self, dict):
        if not self.workflow_data:
            self.workflow_data = {}
        self.workflow_data.update(dict)

    def get_as_dict(self):
        return get_dict_with_varnames(self.formdef.fields, self.data, self)

    def get_substitution_variables(self, minimal=False):
        """Return the flattened dictionary of ``form_*`` variables.

        :param minimal: when true, stop after the basic form, status,
            tracking code and formdef variables.
        :return: a deep copy of the variables, with nested dictionaries
            flattened by ``flatten_dict``.
        """
        d = {}

        if self.id:
            d.update({
                'form_receipt_date': qommon.strftime.strftime(qommon.misc.date_format(), self.receipt_time),
                'form_receipt_time': qommon.strftime.strftime('%H:%M', self.receipt_time),
                'form_number': str(self.get_display_id()),
                'form_number_raw': '%s' % self.id,
                'form_url': self.get_url(),
                'form_url_backoffice': self.get_url(backoffice=True),
                'form_uri': '%s/%s/' % (self.formdef.url_name, self.id),
            })

        d['form_status'] = self.get_status_label()

        if self.id and self.formdef.workflow and self.status:
            endpoint_status_ids = ['wf-%s' % x.id for x in self.formdef.workflow.get_endpoint_status()]
            is_endpoint_status = (self.status in endpoint_status_ids)
            d['form_status_is_endpoint'] = is_endpoint_status

        if self.tracking_code:
            d['form_tracking_code'] = self.tracking_code

        # formdef and category variables
        d.update(self.formdef.get_substitution_variables(minimal=minimal))

        if minimal:
            d = copy.deepcopy(d)
            flatten_dict(d)
            return d

        if self.id:
            d.update({
                'form_status_url': '%sstatus' % self.get_url(),
                'form_details': self.formdef.get_detailed_email_form(self, self.get_url()),
            })

        user = self.get_user()
        if user:
            d.update(user.get_substitution_variables(prefix='form_'))

        for k, v in self.get_as_dict().items():
            d['form_' + k] = v

        # include substitution variables for workflow roles; this will
        # typically give variables such as form_role_receiver_name and
        # form_role_receiver_emails.
        workflow_roles = {}
        if self.formdef.workflow_roles:
            workflow_roles.update(self.formdef.workflow_roles)
        if self.workflow_roles:
            workflow_roles.update(self.workflow_roles)
        for role_type, role_id in workflow_roles.items():
            prefix = 'form_role_%s_' % role_type.strip('_')
            try:
                d.update(Role.get(role_id).get_substitution_variables(prefix))
            except KeyError:
                pass

        if self.evolution and self.evolution[-1].comment:
            d['form_comment'] = self.evolution[-1].comment
        else:
            d['form_comment'] = ''

        d['form_previous_status'] = ''
        if self.evolution:
            for evolution in reversed(self.evolution):
                if evolution.status and evolution.status != self.status:
                    d['form_previous_status'] = self.get_status_label(evolution.status)
                    break

        if d['form_status'] != d['form_previous_status']:
            d['form_status_changed'] = True

        d['form_evolution'] = self.formdef.get_detailed_evolution(self)

        if self.formdef.workflow and self.status:
            wf_status = self.get_status()
            if wf_status:
                for item in wf_status.items:
                    d.update(item.get_substitution_variables(self))

        if self.workflow_data:
            d.update(self.workflow_data)

            # pass over uploaded files and attach an extra attribute with the
            # url to the file.
            for k, v in self.workflow_data.items():
                if isinstance(v, Upload):
                    try:
                        formvar, fieldvar = re.match('(.*)_var_(.*)_raw$', k).groups()
                    except AttributeError:
                        # key does not follow the <form>_var_<field>_raw
                        # pattern (re.match returned None)
                        continue
                    d[k.rsplit('_', 1)[0] + '_url'] = '%sfiles/form-%s-%s/%s' % (
                        d['form_url'], formvar, fieldvar,
                        self.workflow_data['%s_var_%s' % (formvar, fieldvar)])

        d = copy.deepcopy(d)
        flatten_dict(d)

        return d

    @classmethod
    def get_substitution_variables_list(cls):
        """Return ``(category, name, comment)`` tuples of advertised variables."""
        variables = []
        # we can't advertise fields, as this is a metaclass that will be used
        # in FormDef.data_class() to create a real class
        for field in []:  # cls.formdef.fields:
            # we only advertise fields with a varname, as they can be
            # considered stable
            if field.varname:
                variables.append((
                    _('Form'), 'form_var_' + field.varname,
                    _('Form Field: %s') % field.label))
        user_variables = get_publisher().user_class.get_substitution_variables_list(prefix='form_')
        for cat, name, comment in user_variables:
            variables.append((_('Form'), name, _('Form Submitter Field')))
        return variables

    @classmethod
    def rebuild_security(cls):
        cls.rebuild_indexes(indexes=['concerned_roles', 'actions_roles'])

    def is_submitter(self, user):
        """Tell whether *user* (or the current session) submitted the form."""
        if self.user_id and user and str(self.user_id) == str(user.id):
            return True
        try:
            if self.user_hash and self.user_hash == user.hash:
                return True
        except AttributeError:
            return False
        if get_session() and get_session().is_anonymous_submitter(self):
            return True
        return False

    def is_draft(self):
        return self.status in ('draft', 'wf-draft')

    def get_concerned_roles(self):
        """Return the roles given access to this form data."""
        if self.is_draft():
            # drafts are only visible to submitter
            return ['_submitter']

        status_action_roles = set()

        # make sure the handling roles always gets access to the formdata, till
        # the very end (where it may be that there is no workflow status item
        # at all).
        from wcs.workflows import get_role_translation
        for function_key in self.formdef.workflow.roles.keys():
            handling_role = get_role_translation(self, function_key)
            if handling_role:
                status_action_roles.add(handling_role)

        wf_status = self.get_status()
        if not wf_status:
            status_action_roles.add('_submitter')
        else:
            status_action_roles |= set(self.get_actions_roles())
        return status_action_roles
    concerned_roles = property(get_concerned_roles)

    def get_actions_roles(self):
        """Return the roles listed in ``by`` on the current status items.

        Roles other than ``'_submitter'`` go through
        ``get_role_translation`` and are dropped when it returns nothing.
        """
        if self.is_draft():
            return []

        wf_status = self.get_status()
        if not wf_status:
            return []

        from wcs.workflows import get_role_translation
        status_action_roles = set()
        for item in wf_status.items or []:
            if not hasattr(item, 'by') or not item.by:
                continue
            for role in item.by:
                if role == '_submitter':
                    status_action_roles.add(role)
                else:
                    real_role = get_role_translation(self, role)
                    if real_role:
                        status_action_roles.add(real_role)
        return status_action_roles
    actions_roles = property(get_actions_roles)

    def get_last_update_time(self):
        if self.evolution and self.evolution[-1].time:
            return self.evolution[-1].time
        else:
            return self.receipt_time
    last_update_time = property(get_last_update_time)

    def anonymise(self):
        """Blank anonymisable fields and user-related attributes, then store."""
        for field in self.formdef.fields:
            if field.anonymise:
                self.data[field.id] = None
                if field.store_display_value:
                    self.data['%s_display' % field.id] = None

        self.anonymised = datetime.datetime.now()
        self.user_id = None
        self.user_hash = None
        self.editable_by = None
        self.workflow_data = None
        self.workflow_roles = None

        if self.evolution:
            for evo in self.evolution:
                evo.who = None
                evo.parts = None
                evo.comment = None
        self.store()

    def get_json_export_dict(self, include_files=True):
        """Return the dictionary serialised by ``export_to_json``."""
        data = {}
        data['id'] = '%s/%s' % (self.formdef.url_name, self.id)
        data['display_id'] = self.get_display_id()
        data['display_name'] = _('%(name)s #%(id)s') % {
            'name': self.formdef.name,
            'id': data['display_id']}
        data['receipt_time'] = self.receipt_time
        data['last_update_time'] = self.last_update_time
        data['url'] = self.get_url()

        try:
            user = get_publisher().user_class.get(self.user_id)
        except KeyError:
            user = None
        # this is custom code so it is possible to mark forms as anonyms, this
        # is done through the VoteAnonymity field, this is very specific but
        # isn't generalised yet into an useful extension mechanism, as it's not
        # clear at the moment what could be useful.
        for f in self.formdef.fields:
            if f.key == 'vote-anonymity':
                user = None
                break
        if user:
            data['user'] = {'id': user.id, 'name': user.display_name}

        data['fields'] = get_json_dict(self.formdef.fields, self.data,
                                       include_files=include_files)

        data['workflow'] = {}
        wf_status = self.get_visible_status()
        if wf_status:
            data['workflow']['status'] = {'id': wf_status.id, 'name': wf_status.name}
        if self.workflow_data:
            data['workflow']['data'] = self.workflow_data

        return data

    def export_to_json(self, include_files=True):
        data = self.get_json_export_dict(include_files=include_files)
        return json.dumps(data,
                          cls=qommon.misc.JSONEncoder,
                          encoding=get_publisher().site_charset)

    def feed_session(self):
        # this gives a chance to fields to initialize things that would rely on
        # current data ahead of times
        for field in self.formdef.fields:
            field.feed_session(self.data.get(field.id),
                               self.data.get('%s_display' % field.id))

    def __getattr__(self, attr):
        try:
            return self.__dict__[attr]
        except KeyError:
            # give direct access to values from the data dictionary
            # NOTE(review): a missing 'data' entry or data key raises
            # KeyError here rather than AttributeError -- confirm callers
            # rely on that before changing it.
            if attr[0] == 'f':
                return self.__dict__['data'][attr[1:]]
            raise AttributeError(attr)

    # don't pickle _formdef cache
    def __getstate__(self):
        # work on a copy so that pickling does not alter the live object
        odict = self.__dict__.copy()
        odict.pop('_formdef', None)
        return odict

    def __setstate__(self, dict):
        if '_formdef' in dict:
            # there was a time, before October 2007 and 48e46bf0, and pickled
            # objects had a _formdef, in case these objects still exists, we
            # remove the _formdef beforehand, so it doesn't interfere with the
            # cached _formdef already set in data_class()
            del dict['_formdef']
        self.__dict__ = dict
# Register the static form variables; each comment stays wrapped in a
# literal N_() call so that it is still marked for translation.
for _varname, _comment in (
        ('form_receipt_date', N_('Form Receipt Date')),
        ('form_receipt_time', N_('Form Receipt Time')),
        ('form_number', N_('Form Number')),
        ('form_details', N_('Form Details')),
        ('form_url', N_('Form URL')),
        ('form_url_backoffice', N_('Form URL (backoffice)')),
        ('form_status_url', N_('Form Status URL')),
        ('form_tracking_code', N_('Form Tracking Code')),
        ('form_user', N_('Form Submitter')),
        ('form_user_display_name', N_('Form Submitter Name')),
        ('form_user_email', N_('Form Submitter Email')),
):
    Substitutions.register(_varname, category=N_('Form'), comment=_comment)
del _varname, _comment

Substitutions.register_dynamic_source(FormData)