wcs/wcs/formdata.py

357 lines
12 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import time
import urllib
from quixote import get_request, get_publisher
from qommon.storage import StorableObject
import qommon.misc
from qommon.substitution import Substitutions
from anonylink import AnonymityLink
def get_dict_with_varnames(fields, data):
    """Map every field to the names under which its value is exposed.

    For each field the value is published as ``f<id>``, as
    ``field_<simplified label>`` and, when the field has a ``varname``, as
    ``var_<varname>``.  The published value is the ``<id>_display`` entry of
    *data* when it is set and truthy, the raw value otherwise; in the former
    case the raw value is also published as ``var_<varname>_raw``.

    :param fields: iterable of field objects; ``id``, ``label``, ``varname``
        and ``convert_value_to_str`` are read on each of them.
    :param data: dictionary of values keyed by field id, or None when there
        is no data, in which case every value is the empty string.
    :return: a new dictionary of names to values.
    """
    new_data = {}
    for field in fields:
        if data is not None:
            raw_value = data.get(field.id)
            if field.convert_value_to_str:
                raw_value = field.convert_value_to_str(raw_value)
            display_value = data.get('%s_display' % field.id)
        else:
            # no data at all: there cannot be a display value either; looking
            # it up on None used to raise AttributeError
            raw_value = ''
            display_value = None
        value = display_value or raw_value

        # add it as f$n$
        new_data['f%s' % field.id] = value

        # also add it as 'field_' + normalized(field label)
        identifier_name = qommon.misc.simplify(field.label, space='_')
        new_data['field_' + identifier_name] = value

        # and finally add it as its manually defined variable name
        if field.varname:
            new_data['var_%s' % field.varname] = value
            if display_value:
                new_data['var_%s_raw' % field.varname] = raw_value
    return new_data
class Evolution:
    """A single entry in the history of a form data object."""

    # id of the acting user, or the special '_submitter' marker
    who = None
    # status recorded by this entry, if any
    status = None
    # moment the entry was created
    time = None
    # optional free text comment
    comment = None
    # extra objects attached to the entry; None until add_part() is called
    parts = None

    def get_author_name(self):
        """Return a displayable name for whoever created this entry."""
        if self.who != '_submitter':
            author = get_publisher().user_class.get(self.who)
            return author.display_name
        return _('Original Submitter')

    def add_part(self, part):
        """Attach *part* to this entry, creating the list on first use."""
        self.parts = self.parts or []
        self.parts.append(part)

    def display_parts(self):
        """Return the ``view()`` output of every part that has such a method.

        Parts without a ``view`` attribute are skipped; an entry without
        parts gives an empty list.
        """
        if not self.parts:
            return []
        return [part.view() for part in self.parts if hasattr(part, 'view')]
class FormData(StorableObject):
    """Stored data of a submitted form, with its workflow status and history.

    ``_names`` is only a placeholder here: get_formdef() expects it to look
    like ``<type>-<formdef url name>``.  Statuses are stored as
    ``wf-<workflow status id>`` strings.
    """
    _names = 'XX'
    _hashed_indexes = ['user_id', 'user_hash', 'status']

    user_id = None
    user_hash = None
    receipt_time = None
    status = None
    evolution = None
    data = None
    editable_by = None

    _signature = None
    _formdef = None

    def get_formdef(self):
        """Return the related FormDef (cached), or None if it is not found."""
        if self._formdef:
            return self._formdef
        # imported here, not at module level (kept as in the original code)
        from formdef import FormDef
        _kind, url_name = self._names.split('-', 1)
        try:
            self._formdef = FormDef.get_by_urlname(url_name)
        except KeyError:
            self._formdef = None
        return self._formdef
    formdef = property(get_formdef)

    def __init__(self, id=None):
        self.id = id

    def migrate(self):
        """Add the ``wf-`` prefix to statuses that lack it.

        Both the current status and the statuses recorded in the history are
        converted; the object is stored again only if something changed.
        """
        changed = False
        if self.status and not self.status.startswith('wf-'):
            self.status = 'wf-%s' % self.status
            changed = True
        if self.evolution:
            for evo in self.evolution:
                if evo.status and not evo.status.startswith('wf-'):
                    evo.status = 'wf-%s' % evo.status
                    changed = True
        if changed:
            self.store()

    def get_user(self):
        """Return the user object for ``user_id``, or None.

        None is returned when there is no user id or when it is the special
        'ultra-user' value; lookup errors are ignored by the user class.
        """
        if self.user_id and self.user_id != 'ultra-user':
            return get_publisher().user_class.get(self.user_id, ignore_errors=True)
        return None

    def set_user(self, user):
        """Record *user* as the submitter.

        A user object with a ``hash`` attribute is recorded by hash only;
        otherwise its ``id`` is recorded, and a false *user* resets both.
        """
        self.user_hash = None
        try:
            self.user_hash = user.hash
            self.user_id = None
        except AttributeError:
            if user:
                self.user_id = user.id
            else:
                self.user_id = None
    user = property(get_user, set_user)

    def get_signature(self):
        return self._signature

    def set_signature(self, signature):
        self._signature = signature
    signature = property(get_signature, set_signature)

    def just_created(self):
        """Initialise receipt time, status and history of a new form data."""
        self.receipt_time = time.localtime()
        self.status = 'wf-%s' % self.formdef.workflow.possible_status[0].id
        # we add the initial status to the history, this makes it more readable
        # afterwards (also this gets the (previous_status) code to work in all
        # cases)
        evo = Evolution()
        evo.who = '_submitter'
        evo.time = self.receipt_time
        evo.status = self.status
        self.evolution = [evo]

    def perform_workflow(self):
        """Run the items of the current workflow status.

        :return: whatever ``perform_items`` returns (stored in a variable
            named ``url`` by the original code).
        """
        get_publisher().substitutions.feed(self)
        wf_status = self.get_workflow_status()
        return wf_status.perform_items(self)

    def display_workflow_message(self):
        """Return the message of the first status item providing one.

        An empty string is returned when no item has a ``get_message``
        method or when the current status is unknown to the workflow.
        """
        wf_status = self.get_workflow_status()
        if not wf_status:
            return ''
        for item in wf_status.items:
            if hasattr(item, 'get_message'):
                return item.get_message(self)
        return ''

    def get_status(self, status=None):
        """Return the workflow status object matching a ``wf-<id>`` string.

        :param status: status string to look up; defaults to the current
            status of the form data.
        :return: the matching entry of the workflow ``possible_status`` list,
            or None when there is no status, no formdef, or no match.
        """
        if not status:
            status = self.status
        if not status or not self.formdef:
            return None
        try:
            # split once only: everything after the 'wf-' prefix is the id,
            # even if the id itself contains dashes
            status_id = status.split('-', 1)[1]
        except IndexError:
            return None
        for wf_status in self.formdef.workflow.possible_status:
            if wf_status.id == status_id:
                return wf_status
        return None

    def get_status_label(self, status=None):
        """Return the name of the given (or current) status, or 'Unknown'."""
        wf_status = self.get_status(status)
        if not wf_status:
            return _('Unknown')
        return wf_status.name

    def get_visible_status(self, user=None):
        """Return the most recent status in the history visible to *user*.

        :param user: user to check visibility for; defaults to the user of
            the current request.
        :return: a workflow status object, or None if none is visible.  When
            there is no history the current status is returned unchecked.
        """
        if not self.evolution:
            return self.get_status()
        if not user:
            user = get_request().user
        for evo in reversed(self.evolution):
            if not evo.status:
                continue
            wf_status = self.get_status(evo.status)
            if not wf_status:
                continue
            if not wf_status.is_visible(self, user):
                continue
            return wf_status
        return None

    def get_workflow_form(self, user):
        wf_status = self.get_workflow_status()
        return wf_status.get_action_form(self, user)

    def get_workflow_subdirectories(self):
        wf_status = self.get_workflow_status()
        return wf_status.get_subdirectories(self)

    def handle_workflow_form(self, user, form):
        wf_status = self.get_workflow_status()
        return wf_status.handle_form(form, self, user)

    def get_url(self, backoffice=False):
        """Return the front office (or back office) URL of this form data."""
        if backoffice:
            base_url = get_publisher().get_backoffice_url()
        else:
            base_url = get_publisher().get_frontoffice_url()
        return '%s/%s/%s/' % (base_url, self.formdef.url_name, self.id)

    def get_workflow_status(self):
        """Return the workflow status object of the current status, or None."""
        return self.get_status()

    def get_handling_role(self):
        """Return the role mapped to '_receiver', or None if it is missing."""
        # TODO: look at current status and return the role(s) actually
        # concerned by the handling of the formdata
        from workflows import get_role_translation
        from roles import Role
        try:
            return Role.get(get_role_translation(self, '_receiver'))
        except KeyError:
            return None

    def get_field_value(self, field):
        """Return the stored value of the first field labelled *field*.

        :param field: label of the field to look for.
        :return: the value, or None when no field has this label or when
            the form data holds no data.
        """
        if self.data is None:
            return None
        for formdef_field in self.formdef.fields:
            if formdef_field.label == field:
                return self.data.get(formdef_field.id)
        return None

    def get_as_dict(self):
        return get_dict_with_varnames(self.formdef.fields, self.data)

    def get_substitution_variables(self):
        """Return the dictionary of ``form_*`` substitution variables.

        It gathers static details (dates, number, URLs), submitter variables,
        field values, last comment, current and previous status labels, the
        detailed evolution, the formdef variables and the variables provided
        by the items of the current workflow status.
        """
        d = {
            'form_receipt_date': qommon.strftime.strftime(qommon.misc.date_format(), self.receipt_time),
            'form_receipt_time': qommon.strftime.strftime('%H:%M', self.receipt_time),
            'form_number': str(self.id),
            'form_url': self.get_url(),
            'form_url_backoffice': self.get_url(backoffice=True),
            'form_uri': '%s/%s/' % (self.formdef.url_name, self.id),
            'form_status_url': '%sstatus' % self.get_url(),
            'form_details': self.formdef.get_detailed_email_form(self, self.get_url()),
        }

        user = self.get_user()
        if user:
            d.update(user.get_substitution_variables(prefix='form_'))

        # field values; this dictionary used to be computed and copied twice
        for k, v in self.get_as_dict().items():
            d['form_' + k] = v

        if self.evolution and self.evolution[-1].comment:
            d['form_comment'] = self.evolution[-1].comment
        else:
            d['form_comment'] = ''

        d['form_status'] = self.get_status_label()

        d['form_previous_status'] = ''
        if self.evolution:
            # latest history entry whose status differs from the current one
            for evolution in reversed(self.evolution):
                if evolution.status and evolution.status != self.status:
                    d['form_previous_status'] = self.get_status_label(evolution.status)
                    break
        # NOTE(review): placed outside the `if self.evolution` block, as the
        # original indentation was lost -- confirm
        if d['form_status'] != d['form_previous_status']:
            d['form_status_changed'] = True

        d['form_evolution'] = self.formdef.get_detailed_evolution(self)

        d.update(self.formdef.get_substitution_variables())

        if self.formdef.workflow and self.status:
            wf_status = self.get_workflow_status()
            # the status may not exist in the workflow; skip its items
            # instead of failing on None
            if wf_status:
                for item in wf_status.items:
                    d.update(item.get_substitution_variables(self))

        return d

    def get_substitution_variables_list(cls):
        """Return (category, name, comment) tuples of advertised variables."""
        variables = []
        # we can't advertise fields, as this is a metaclass that will be used
        # in FormDef.data_class() to create a real class
        for field in []:  # cls.formdef.fields:
            # we only advertise fields with a varname, as they can be
            # considered stable
            if field.varname:
                variables.append((
                    _('Form'), 'form_var_' + field.varname,
                    _('Form Field: %s') % field.label))
        user_variables = get_publisher().user_class.get_substitution_variables_list(prefix='form_')
        for cat, name, comment in user_variables:
            variables.append((_('Form'), name, _('Form Submitter Field')))
        return variables
    get_substitution_variables_list = classmethod(get_substitution_variables_list)

    def is_submitter(self, user):
        """Tell whether *user* is the one who submitted this form data.

        The comparison is done on the user id, then on the user hash; an
        absent user or one without a ``hash`` attribute is not the submitter.
        """
        if user is None:
            return False
        if self.user_id and str(self.user_id) == str(user.id):
            return True
        try:
            if self.user_hash and self.user_hash == user.hash:
                return True
        except AttributeError:
            return False
        return False

    # don't pickle _formdef cache
    def __getstate__(self):
        # work on a copy, so that pickling does not alter the live object
        odict = self.__dict__.copy()
        odict.pop('_formdef', None)
        return odict

    def __setstate__(self, dict):
        self.__dict__ = dict
        self._formdef = None
# Static substitution variables provided for a form data object, all
# registered under the 'Form' category, in this order.
for _variable, _description in (
        ('form_receipt_date', N_('Form Receipt Date')),
        ('form_receipt_time', N_('Form Receipt Time')),
        ('form_number', N_('Form Number')),
        ('form_details', N_('Form Details')),
        ('form_url', N_('Form URL')),
        ('form_url_backoffice', N_('Form URL (backoffice)')),
        ('form_status_url', N_('Form Status URL')),
        ('form_user', N_('Form Submitter')),
        ('form_user_display_name', N_('Form Submitter Name')),
        ('form_user_email', N_('Form Submitter Email')),
        ):
    Substitutions.register(_variable, category=N_('Form'), comment=_description)
# do not leave the loop variables behind in the module namespace
del _variable, _description

# FormData is also registered as a dynamic source of variables
Substitutions.register_dynamic_source(FormData)