567 lines
19 KiB
Python
567 lines
19 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2010 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import copy
|
|
import datetime
|
|
import json
|
|
import time
|
|
|
|
from quixote import get_request, get_publisher
|
|
|
|
from qommon.storage import StorableObject
|
|
import qommon.misc
|
|
from qommon.substitution import Substitutions
|
|
|
|
from roles import Role
|
|
|
|
|
|
def get_dict_with_varnames(fields, data):
|
|
new_data = {}
|
|
for field in fields:
|
|
if not hasattr(field, 'get_view_value'):
|
|
continue
|
|
if data is not None:
|
|
value = data.get(field.id)
|
|
if field.convert_value_to_str:
|
|
value = field.convert_value_to_str(value)
|
|
display_value = data.get('%s_display' % field.id)
|
|
else:
|
|
value = ''
|
|
display_value = ''
|
|
# add it as f$n$
|
|
new_data['f%s' % field.id] = value
|
|
|
|
# also add it as 'field_' + normalized(field label)
|
|
identifier_name = qommon.misc.simplify(field.label, space = '_')
|
|
new_data['field_' + identifier_name] = value
|
|
|
|
# and finally add it as its manually defined variable name
|
|
if field.varname:
|
|
if field.store_display_value:
|
|
new_data['var_%s_raw' % field.varname] = value
|
|
new_data['var_%s' % field.varname] = display_value
|
|
else:
|
|
new_data['var_%s' % field.varname] = value
|
|
return new_data
|
|
|
|
def flatten_dict(d):
|
|
for k, v in d.items():
|
|
if type(v) is dict:
|
|
flatten_dict(v)
|
|
for k2, v2 in v.items():
|
|
d['%s_%s' % (k, k2)] = v2
|
|
del d[k]
|
|
|
|
|
|
def get_json_dict(fields, data):
|
|
new_data = {}
|
|
for field in fields:
|
|
if not field.varname: # exports only named fields
|
|
continue
|
|
if data is not None:
|
|
value = data.get(field.id)
|
|
if value and hasattr(field, 'get_json_value'):
|
|
value = field.get_json_value(value)
|
|
else:
|
|
value = display_value = None
|
|
if field.store_display_value:
|
|
new_data[field.varname + '_raw'] = value
|
|
new_data[field.varname] = data.get('%s_display' % field.id)
|
|
else:
|
|
new_data[field.varname] = value
|
|
return new_data
|
|
|
|
class JSONEncoder(json.JSONEncoder):
|
|
def default(self, obj):
|
|
if isinstance(obj, time.struct_time):
|
|
return qommon.strftime.strftime('%Y-%m-%dT%H:%M:%S', obj)
|
|
# Let the base class default method raise the TypeError
|
|
return json.JSONEncoder.default(self, obj)
|
|
|
|
|
|
class Evolution:
|
|
who = None
|
|
status = None
|
|
time = None
|
|
comment = None
|
|
parts = None
|
|
|
|
def get_author_name(self):
|
|
if self.who == '_submitter':
|
|
return _('Original Submitter')
|
|
else:
|
|
return get_publisher().user_class.get(self.who).display_name
|
|
|
|
def add_part(self, part):
|
|
if not self.parts:
|
|
self.parts = []
|
|
self.parts.append(part)
|
|
|
|
def display_parts(self):
|
|
if not self.parts:
|
|
return []
|
|
|
|
l = []
|
|
for p in self.parts:
|
|
if not hasattr(p, 'view'):
|
|
continue
|
|
l.append(p.view())
|
|
return l
|
|
|
|
|
|
class FormData(StorableObject):
|
|
_names = 'XX'
|
|
_hashed_indexes = ['user_id', 'user_hash', 'status', 'workflow_roles',
|
|
'concerned_roles']
|
|
|
|
id_display = None
|
|
|
|
user_id = None
|
|
user_hash = None
|
|
receipt_time = None
|
|
status = None
|
|
anonymised = None
|
|
page_no = None # page to use when restoring from draft
|
|
evolution = None
|
|
data = None
|
|
editable_by = None
|
|
|
|
workflow_data = None
|
|
workflow_roles = None
|
|
|
|
_formdef = None
|
|
def get_formdef(self):
|
|
if self._formdef:
|
|
return self._formdef
|
|
from formdef import FormDef
|
|
type, id = self._names.split('-', 1)
|
|
try:
|
|
self._formdef = FormDef.get_by_urlname(id)
|
|
except KeyError:
|
|
self._formdef = None
|
|
return self._formdef
|
|
formdef = property(get_formdef)
|
|
|
|
def __init__(self, id=None):
|
|
self.id = id
|
|
|
|
def migrate(self):
|
|
changed = False
|
|
if self.status and not self.status.startswith('wf-'):
|
|
self.status = 'wf-%s' % self.status
|
|
changed = True
|
|
if self.evolution:
|
|
for evo in self.evolution:
|
|
if evo.status and not evo.status.startswith('wf-'):
|
|
evo.status = 'wf-%s' % evo.status
|
|
changed = True
|
|
if changed:
|
|
self.store()
|
|
|
|
def get_user(self):
|
|
if self.user_id and self.user_id != 'ultra-user':
|
|
return get_publisher().user_class.get(self.user_id, ignore_errors=True)
|
|
return None
|
|
|
|
def set_user(self, user):
|
|
self.user_hash = None
|
|
try:
|
|
self.user_hash = user.hash
|
|
self.user_id = None
|
|
except AttributeError:
|
|
if user:
|
|
self.user_id = user.id
|
|
else:
|
|
self.user_id = None
|
|
user = property(get_user, set_user)
|
|
|
|
def just_created(self):
|
|
self.receipt_time = time.localtime()
|
|
self.status = 'wf-%s' % self.formdef.workflow.possible_status[0].id
|
|
# we add the initial status to the history, this makes it more readable
|
|
# afterwards (also this gets the (previous_status) code to work in all
|
|
# cases)
|
|
evo = Evolution()
|
|
evo.who = '_submitter'
|
|
evo.time = self.receipt_time
|
|
evo.status = self.status
|
|
self.evolution = [evo]
|
|
|
|
def perform_workflow(self):
|
|
url = None
|
|
get_publisher().substitutions.feed(self)
|
|
wf_status = self.get_status()
|
|
url = wf_status.perform_items(self)
|
|
return url
|
|
|
|
def display_workflow_message(self):
|
|
wf_status = self.get_status()
|
|
if not wf_status:
|
|
return ''
|
|
for status in wf_status.items:
|
|
if hasattr(status, 'get_message'):
|
|
return status.get_message(self)
|
|
return ''
|
|
|
|
def get_status(self, status = None):
|
|
if not status:
|
|
status = self.status
|
|
if status is None:
|
|
return None
|
|
if not self.formdef:
|
|
return None
|
|
try:
|
|
status_id = status.split('-')[1]
|
|
wf_status = [x for x in self.formdef.workflow.possible_status if x.id == status_id][0]
|
|
except IndexError:
|
|
return None
|
|
return wf_status
|
|
|
|
def get_status_label(self, status = None):
|
|
wf_status = self.get_status(status)
|
|
if not wf_status:
|
|
return _('Unknown')
|
|
return wf_status.name
|
|
|
|
def get_visible_status(self, user=None):
|
|
if not self.evolution:
|
|
return self.get_status()
|
|
if not user:
|
|
user = get_request().user
|
|
for evo in reversed(self.evolution):
|
|
if not evo.status:
|
|
continue
|
|
wf_status = self.get_status(evo.status)
|
|
if not wf_status:
|
|
continue
|
|
if not wf_status.is_visible(self, user):
|
|
continue
|
|
return wf_status
|
|
return None
|
|
|
|
def get_workflow_form(self, user):
|
|
wf_status = self.get_status()
|
|
if not wf_status:
|
|
return None
|
|
return wf_status.get_action_form(self, user)
|
|
|
|
def get_workflow_subdirectories(self):
|
|
wf_status = self.get_status()
|
|
if not wf_status: # draft
|
|
return []
|
|
return wf_status.get_subdirectories(self)
|
|
|
|
def handle_workflow_form(self, user, form):
|
|
wf_status = self.get_status()
|
|
if not wf_status:
|
|
return None
|
|
return wf_status.handle_form(form, self, user)
|
|
|
|
def jump_status(self, status_id):
|
|
evo = Evolution()
|
|
evo.time = time.localtime()
|
|
evo.status = 'wf-%s' % status_id
|
|
if not self.evolution:
|
|
self.evolution = []
|
|
self.evolution.append(evo)
|
|
self.status = evo.status
|
|
self.store()
|
|
|
|
def get_url(self, backoffice = False):
|
|
return '%s%s/' % (self.formdef.get_url(backoffice=backoffice), self.id)
|
|
|
|
def get_display_id(self):
|
|
return self.id_display or self.id
|
|
|
|
def get_handling_role_id(self):
|
|
# TODO: look at current status and return the role(s) actually
|
|
# concerned by the handling of the formdata
|
|
from wcs.workflows import get_role_translation
|
|
return get_role_translation(self, '_receiver')
|
|
|
|
def get_handling_role(self):
|
|
from roles import Role
|
|
try:
|
|
return Role.get(self.get_handling_role_id())
|
|
except KeyError:
|
|
return None
|
|
|
|
def get_field_value(self, field):
|
|
try:
|
|
x = [x for x in self.formdef.fields if x.label == field][0]
|
|
except IndexError:
|
|
return None
|
|
return self.data.get(x.id)
|
|
|
|
def update_workflow_data(self, dict):
|
|
if not self.workflow_data:
|
|
self.workflow_data = {}
|
|
self.workflow_data.update(dict)
|
|
|
|
def get_as_dict(self):
|
|
return get_dict_with_varnames(self.formdef.fields, self.data)
|
|
|
|
def get_substitution_variables(self, minimal=False):
|
|
d = {}
|
|
|
|
if self.id:
|
|
d.update({
|
|
'form_receipt_date': qommon.strftime.strftime(qommon.misc.date_format(), self.receipt_time),
|
|
'form_receipt_time': qommon.strftime.strftime('%H:%M', self.receipt_time),
|
|
'form_number': str(self.get_display_id()),
|
|
'form_number_raw': '%s' % self.id,
|
|
'form_url': self.get_url(),
|
|
'form_url_backoffice': self.get_url(backoffice=True),
|
|
'form_uri': '%s/%s/' % (self.formdef.url_name, self.id),
|
|
})
|
|
|
|
d['form_status'] = self.get_status_label()
|
|
|
|
if self.id and self.formdef.workflow and self.status:
|
|
endpoint_status_ids = ['wf-%s' % x.id for x in self.formdef.workflow.get_endpoint_status()]
|
|
is_endpoint_status = (self.status in endpoint_status_ids)
|
|
d['form_status_is_endpoint'] = is_endpoint_status
|
|
|
|
# formdef and category variables
|
|
d.update(self.formdef.get_substitution_variables(minimal=minimal))
|
|
|
|
if minimal:
|
|
d = copy.deepcopy(d)
|
|
flatten_dict(d)
|
|
return d
|
|
|
|
if self.id:
|
|
d.update({
|
|
'form_status_url': '%sstatus' % self.get_url(),
|
|
'form_details': self.formdef.get_detailed_email_form(self, self.get_url()),
|
|
})
|
|
|
|
user = self.get_user()
|
|
if user:
|
|
d.update(user.get_substitution_variables(prefix='form_'))
|
|
|
|
data = get_dict_with_varnames(self.formdef.fields, self.data)
|
|
for k, v in data.items():
|
|
d['form_'+k] = v
|
|
|
|
for k, v in self.get_as_dict().items():
|
|
d['form_'+k] = v
|
|
|
|
# include substitution variables for workflow roles; this will
|
|
# typically give variables such as form_role_receiver_name and
|
|
# form_role_receiver_emails.
|
|
workflow_roles = self.formdef.workflow_roles.copy()
|
|
if self.workflow_roles:
|
|
workflow_roles.update(self.workflow_roles)
|
|
|
|
for role_type, role_id in workflow_roles.items():
|
|
prefix = 'form_role_%s_' % role_type.strip('_')
|
|
try:
|
|
d.update(Role.get(role_id).get_substitution_variables(prefix))
|
|
except KeyError:
|
|
pass
|
|
|
|
if self.evolution and self.evolution[-1].comment:
|
|
d['form_comment'] = self.evolution[-1].comment
|
|
else:
|
|
d['form_comment'] = ''
|
|
|
|
d['form_previous_status'] = ''
|
|
if self.evolution:
|
|
for evolution in reversed(self.evolution):
|
|
if evolution.status and evolution.status != self.status:
|
|
d['form_previous_status'] = self.get_status_label(evolution.status)
|
|
break
|
|
if d['form_status'] != d['form_previous_status']:
|
|
d['form_status_changed'] = True
|
|
|
|
d['form_evolution'] = self.formdef.get_detailed_evolution(self)
|
|
|
|
if self.formdef.workflow and self.status:
|
|
wf_status = self.get_status()
|
|
if wf_status:
|
|
for item in wf_status.items:
|
|
d.update(item.get_substitution_variables(self))
|
|
|
|
if self.workflow_data:
|
|
d.update(self.workflow_data)
|
|
|
|
d = copy.deepcopy(d)
|
|
flatten_dict(d)
|
|
|
|
return d
|
|
|
|
def get_substitution_variables_list(cls):
|
|
variables = []
|
|
# we can't advertise fields, as this is a metaclass that will be used
|
|
# in FormDef.data_class() to create a real class
|
|
for field in []: # cls.formdef.fields:
|
|
# we only advertise fields with a varname, as they can be
|
|
# considered stable
|
|
if field.varname:
|
|
variables.append((
|
|
_('Form'), 'form_var_'+field.varname,
|
|
_('Form Field: %s') % field.label))
|
|
user_variables = get_publisher().user_class.get_substitution_variables_list(prefix='form_')
|
|
for cat, name, comment in user_variables:
|
|
variables.append((_('Form'), name, _('Form Submitter Field')))
|
|
return variables
|
|
get_substitution_variables_list = classmethod(get_substitution_variables_list)
|
|
|
|
def rebuild_security(cls):
|
|
# this gets overridden in the SQL variant
|
|
pass
|
|
rebuild_security = classmethod(rebuild_security)
|
|
|
|
def is_submitter(self, user):
|
|
if self.user_id and str(self.user_id) == str(user.id):
|
|
return True
|
|
try:
|
|
if self.user_hash and self.user_hash == user.hash:
|
|
return True
|
|
except AttributeError:
|
|
return False
|
|
return False
|
|
|
|
def is_draft(self):
|
|
return self.status in ('draft', 'wf-draft')
|
|
|
|
def get_concerned_roles(self):
|
|
if self.is_draft():
|
|
# drafts are only visible to submitter
|
|
return ['_submitter']
|
|
|
|
status_action_roles = set()
|
|
|
|
# make sure the person defined as handling role always gets access
|
|
# to the formdata, till the very end (where it may be that there is
|
|
# no workflow status item at all).
|
|
handling_role_id = self.get_handling_role_id()
|
|
if handling_role_id:
|
|
status_action_roles.add(handling_role_id)
|
|
|
|
wf_status = self.get_status()
|
|
if not wf_status:
|
|
status_action_roles.add('_submitter')
|
|
return status_action_roles
|
|
|
|
# people that can act on a workflow status item are considered
|
|
# 'concerned' by the formdata
|
|
from wcs.workflows import get_role_translation
|
|
for item in wf_status.items or []:
|
|
if not hasattr(item, 'by') or not item.by:
|
|
continue
|
|
for role in item.by:
|
|
if role == '_submitter':
|
|
status_action_roles.add(role)
|
|
else:
|
|
status_action_roles.add(get_role_translation(self, role))
|
|
|
|
return status_action_roles
|
|
|
|
concerned_roles = property(get_concerned_roles)
|
|
|
|
def anonymise(self):
|
|
for field in self.formdef.fields:
|
|
if field.anonymise:
|
|
self.data[field.id] = None
|
|
if field.store_display_value:
|
|
self.data['%s_display' % field.id] = None
|
|
|
|
self.anonymised = datetime.datetime.now()
|
|
self.user_id = None
|
|
self.user_hash = None
|
|
self.editable_by = None
|
|
self.workflow_data = None
|
|
self.workflow_roles = None
|
|
|
|
for evo in self.evolution:
|
|
evo.who = None
|
|
evo.parts = None
|
|
evo.comment = None
|
|
evo.parts = None
|
|
self.store()
|
|
|
|
def export_to_json(self):
|
|
data = {}
|
|
data['id'] = '%s/%s' % (self.formdef.url_name, self.id)
|
|
data['display_id'] = self.get_display_id()
|
|
data['receipt_time'] = self.receipt_time
|
|
|
|
try:
|
|
user = get_publisher().user_class.get(self.user_id)
|
|
except KeyError:
|
|
user = None
|
|
# this is custom code so it is possible to mark forms as anonyms, this
|
|
# is done through the VoteAnonymity field, this is very specific but
|
|
# isn't generalised yet into an useful extension mechanism, as it's not
|
|
# clear at the moment what could be useful.
|
|
for f in self.formdef.fields:
|
|
if f.key == 'vote-anonymity':
|
|
user = None
|
|
break
|
|
if user:
|
|
data['user'] = {'id': user.id, 'name': user.display_name}
|
|
|
|
data['fields'] = get_json_dict(self.formdef.fields, self.data)
|
|
|
|
data['workflow'] = {}
|
|
wf_status = self.get_visible_status()
|
|
if wf_status:
|
|
data['workflow']['status'] = {'id': wf_status.id, 'name': wf_status.name}
|
|
if self.workflow_data:
|
|
data['workflow']['data'] = self.workflow_data
|
|
|
|
return json.dumps(data,
|
|
cls=JSONEncoder,
|
|
encoding=get_publisher().site_charset)
|
|
|
|
def feed_session(self):
|
|
# this gives a chance to fields to initialize things that would rely on
|
|
# current data ahead of times
|
|
for field in self.formdef.fields:
|
|
field.feed_session(self.data.get(field.id),
|
|
self.data.get('%s_display' % field.id))
|
|
|
|
# don't pickle _formdef cache
|
|
def __getstate__(self):
|
|
odict = self.__dict__
|
|
if odict.has_key('_formdef'):
|
|
del odict['_formdef']
|
|
return odict
|
|
|
|
def __setstate__(self, dict):
|
|
if '_formdef' in dict:
|
|
# there was a time, before October 2007 and 48e46bf0, and pickled
|
|
# objects had a _formdef, in case these objects still exists, we
|
|
# remove the _formdef beforehand, so it doesn't interfere with the
|
|
# cached _formdef already set in data_class()
|
|
del dict['_formdef']
|
|
self.__dict__ = dict
|
|
|
|
|
|
Substitutions.register('form_receipt_date', category=N_('Form'), comment=N_('Form Receipt Date'))
|
|
Substitutions.register('form_receipt_time', category=N_('Form'), comment=N_('Form Receipt Time'))
|
|
Substitutions.register('form_number', category=N_('Form'), comment=N_('Form Number'))
|
|
Substitutions.register('form_details', category=N_('Form'), comment=N_('Form Details'))
|
|
Substitutions.register('form_url', category=N_('Form'), comment=N_('Form URL'))
|
|
Substitutions.register('form_url_backoffice', category=N_('Form'), comment=N_('Form URL (backoffice)'))
|
|
Substitutions.register('form_status_url', category=N_('Form'), comment=N_('Form Status URL'))
|
|
Substitutions.register('form_user', category=N_('Form'), comment=N_('Form Submitter'))
|
|
Substitutions.register('form_user_display_name', category=N_('Form'), comment=N_('Form Submitter Name'))
|
|
Substitutions.register('form_user_email', category=N_('Form'), comment=N_('Form Submitter Email'))
|
|
Substitutions.register_dynamic_source(FormData)
|