wcs/wcs/formdata.py

1225 lines
47 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import collections
import copy
import datetime
import json
import re
import sys
import time
from django.utils import six
from quixote import get_request, get_publisher, get_session
from quixote.http_request import Upload
from .qommon import _, N_
from .qommon.storage import StorableObject, Intersects, Contains
from .qommon import misc
from .qommon.evalutils import make_datetime
from .qommon.publisher import get_cfg
from .qommon.substitution import Substitutions, invalidate_substitution_cache
from .qommon.template import Template
from .roles import Role
from .fields import FileField
def get_dict_with_varnames(fields, data, formdata=None, varnames_only=False):
    """Build a flat dictionary of values keyed by field-derived names.

    fields -- iterable of field objects; fields without a ``get_view_value``
        attribute are ignored.
    data -- mapping of field id to stored value (``<id>_display`` entries
        hold display values); when None, values default to empty strings.
    formdata -- optional object whose ``get_url()`` is used to build the
        download URL of file fields.
    varnames_only -- when true, the ``f<id>`` and ``field_<label>`` keys are
        not produced, only the ``var_<varname>`` family.

    Returns the newly built dictionary.
    """
    new_data = {}
    for field in fields:
        if not hasattr(field, 'get_view_value'):
            continue
        raw_value = None
        if data is not None:
            value = data.get(field.id)
            if field.convert_value_to_str:
                # keep the unconverted value around, it is exported as _raw
                raw_value = value
                value = field.convert_value_to_str(value)
            display_value = data.get('%s_display' % field.id)
        else:
            value = ''
            display_value = ''
        if not varnames_only:
            # add it as f$n$
            new_data['f%s' % field.id.replace('-', '_')] = value
            # also add it as 'field_' + normalized(field label)
            identifier_name = misc.simplify(field.label, space = '_')
            new_data['field_' + identifier_name] = value
        # and finally add it as its manually defined variable name
        if field.varname:
            if field.store_display_value:
                new_data['var_%s_raw' % field.varname] = value
                new_data['var_%s' % field.varname] = display_value
            else:
                new_data['var_%s' % field.varname] = value
                if field.key == 'file':
                    new_data['var_%s_raw' % field.varname] = value
                    new_data['var_%s_url' % field.varname] = None
                    if value and hasattr(value, 'base_filename'):
                        # expose the file name instead of the file object
                        new_data['var_%s' % field.varname] = value.base_filename
                        if formdata is not None:
                            new_data['var_%s_url' % field.varname] = '%sdownload?f=%s' % (
                                formdata.get_url(), field.id)
                elif raw_value is not None:
                    new_data['var_%s_raw' % field.varname] = raw_value
            if data is not None:
                structured_value = field.get_structured_value(data)
                if type(structured_value) is dict:
                    # expose every attribute but id/text as var_<varname>_<key>
                    for k, v in structured_value.items():
                        if k in ('id', 'text'):
                            continue
                        new_data['var_%s_%s' % (field.varname, k)] = v
                if type(structured_value) is list:
                    # same thing for lists, with the item position in the key
                    for i, struct_value in enumerate(structured_value):
                        for k, v in struct_value.items():
                            if k in ('id', 'text'):
                                continue
                            new_data['var_%s_%s_%s' % (field.varname, i, k)] = v
                if field.store_structured_value:
                    new_data['var_%s_structured_raw' % field.varname] = structured_value
    return new_data
def flatten_dict(d):
    """Flatten nested plain dictionaries of *d*, in place.

    Every value whose type is exactly ``dict`` is flattened recursively and
    then replaced in *d* by ``<key>_<subkey>`` entries.  Nothing is returned.
    """
    # work on a snapshot, as d is modified along the way
    nested = [(key, value) for key, value in d.items() if type(value) is dict]
    for key, value in nested:
        flatten_dict(value)
        for subkey, subvalue in value.items():
            d['%s_%s' % (key, subkey)] = subvalue
        del d[key]
def get_json_dict(fields, data, include_files=True, anonymise=False):
    """Return a dictionary of field values suitable for a JSON export.

    fields -- iterable of field objects; only fields with a varname are
        exported.
    data -- mapping of field id to stored value, or None; with None every
        exported value is None and no structured value is exported.
    include_files -- when false, FileField fields are left out.
    anonymise -- when true, fields flagged with ``anonymise`` are left out.

    For fields storing a display value, the stored value is exported as
    ``<varname>_raw`` and the display value as ``<varname>``.  A non-empty
    structured value is exported as ``<varname>_structured``.
    """
    new_data = {}
    for field in fields:
        if anonymise and field.anonymise:
            continue
        if not field.varname:  # exports only named fields
            continue
        if not include_files and isinstance(field, FileField):
            continue
        if data is not None:
            value = data.get(field.id)
            if value and hasattr(field, 'get_json_value'):
                value = field.get_json_value(value)
            display_value = data.get('%s_display' % field.id)
            structured_value = data.get('%s_structured' % field.id)
        else:
            # no data at all: export empty values instead of failing on
            # lookups of display/structured values.
            value = display_value = structured_value = None
        if field.store_display_value:
            new_data[field.varname + '_raw'] = value
            new_data[field.varname] = display_value
        else:
            new_data[field.varname] = value
        if field.store_structured_value and structured_value:
            new_data[field.varname + '_structured'] = structured_value
    return new_data
class Evolution(object):
    """A single entry in the history of a formdata.

    It records who acted (``who``), the status reached (``status``), when
    (``time``, ``last_jump_datetime``), an optional ``comment`` and a list of
    additional ``parts``.
    """

    who = None
    status = None
    time = None
    last_jump_datetime = None
    comment = None
    parts = None
    _display_parts = None  # cache of the display_parts() result

    def __init__(self, formdata=None):
        self._formdata = formdata  # formdata cache

    @property
    def formdata(self):
        """Formdata this evolution belongs to."""
        return self._formdata

    def get_author_name(self):
        """Return the display name of the author, None if the user is unknown."""
        if self.who == '_submitter':
            user_id = self.formdata.user_id
        else:
            user_id = self.who
        try:
            return get_publisher().user_class.get(user_id).display_name
        except KeyError:
            return None

    def get_author_qualification(self):
        """Return a label when the author is the submitter and the user of the
        current request is not, None otherwise."""
        if self.who != '_submitter':
            return None
        if self.formdata.is_submitter(get_request().user):
            return None
        return _('Original Submitter')

    def add_part(self, part):
        """Append a part to this evolution, creating the list if needed."""
        self.parts = self.parts or []
        self.parts.append(part)

    def display_parts(self):
        """Return the non-empty ``view()`` results of the parts (cached)."""
        if self._display_parts is not None:
            return self._display_parts
        if not self.parts:
            return []
        rendered = (part.view() for part in self.parts if hasattr(part, 'view'))
        self._display_parts = [text for text in rendered if text]
        return self._display_parts

    def get_json_export_dict(self, user, anonymise=False):
        """Return a dictionary describing this evolution, for JSON exports.

        user -- user object exported as author when the author is the
            submitter.
        anonymise -- when true, the submitter and the comment are left out.
        """
        data = {
            'time': datetime.datetime(*self.time[:6]) if self.time else None,
            'last_jump_datetime': self.last_jump_datetime,
        }
        if self.status:
            # strip the "wf-" prefix
            data['status'] = self.status[3:]
        if self.who == '_submitter':
            if not anonymise and user:
                data['who'] = user.get_json_export_dict()
        else:
            try:
                author = get_publisher().user_class.get(self.who)
            except KeyError:
                pass
            else:
                data['who'] = author.get_json_export_dict()
        if self.comment and not anonymise:
            data['comment'] = self.comment
        parts = [part.get_json_export_dict(anonymise=anonymise)
                 for part in self.parts or []
                 if hasattr(part, 'get_json_export_dict')]
        if parts:
            data['parts'] = parts
        return data

    def __getstate__(self):
        # don't pickle the _formdata and _display_parts caches
        odict = self.__dict__.copy()
        odict.pop('_formdata', None)
        odict.pop('_display_parts', None)
        return odict

    @property
    def datetime(self):
        """``time`` as a datetime.datetime object."""
        return datetime.datetime(*self.time[:6])

    def get_status(self):
        """Return the workflow status of this evolution, using the status of
        the closest earlier evolution when this one has none."""
        status = self.status
        if not status:
            # look for the previous evolution with a status
            history = self.formdata.evolution
            for evolution in reversed(history[:history.index(self)]):
                status = evolution.status
                if status:
                    break
        return self.formdata.get_status(status=status)

    def get_status_label(self):
        """Return the name of the status, or a generic label if unknown."""
        status = self.get_status()
        if status:
            return status.name
        return _('Unknown')

    def is_hidden(self, user=None):
        """Return True if the status is unknown or not visible to the user
        (defaulting to the user of the current request)."""
        status = self.get_status()
        if not status:
            return True
        return not status.is_visible(self.formdata, user or get_request().user)
class FormData(StorableObject):
_names = 'XX'
_hashed_indexes = ['user_id', 'status', 'workflow_roles',
'concerned_roles', 'actions_roles']
id_display = None
user_id = None
user_label = None # taken from data, for anonymous users
receipt_time = None
status = None
anonymised = None
page_no = 0 # page to use when restoring from draft
evolution = None
data = None
editable_by = None
tracking_code = None
backoffice_submission = False
submission_context = None
submission_channel = None
criticality_level = 0
digest = None
workflow_data = None
workflow_roles = None
geolocations = None
_formdef = None
def get_formdef(self):
    """Return the form definition attached to this data (cached).

    When no formdef is cached it is looked up from the second part of
    ``_names`` (split on the first dash); None is returned if the lookup
    raises KeyError.
    """
    if self._formdef:
        return self._formdef
    from .formdef import FormDef
    kind, url_name = self._names.split('-', 1)
    try:
        self._formdef = FormDef.get_by_urlname(url_name)
    except KeyError:
        self._formdef = None
    return self._formdef

formdef = property(get_formdef)
def __init__(self, id=None):
    """Initialise the object with the given identifier (None by default)."""
    self.id = id
def migrate(self):
    """Upgrade statuses to the "wf-" prefixed form and store if changed.

    The status of the formdata (unless it is 'draft') and of each evolution
    gets the prefix when missing; evolutions are also linked back to the
    formdata.
    """
    changed = False
    status = self.status
    if status and status != 'draft' and not status.startswith('wf-'):
        self.status = 'wf-%s' % status
        changed = True
    for evo in self.evolution or []:
        evo._formdata = self  # link from evolution to formdata
        if evo.status and not evo.status.startswith('wf-'):
            evo.status = 'wf-%s' % evo.status
            changed = True
    if changed:
        self.store()
@invalidate_substitution_cache
def store(self, *args, **kwargs):
    """Store the object, keeping automatically computed fields up to date.

    Automatic fields are computed before storing when the object already has
    an id; otherwise they are computed once the first store call is done and
    the object is stored a second time if they changed.
    """
    # make sure the class set under the formdef name in the sys.modules
    # namespaces is the exact one that was used when creating this
    # particular object, as it is required by pickle (or it will raise
    # "Can't pickle %r: it's not the same object as %s.%s" if the class
    # object has been changed in the course of the request).
    formdef = self._formdef
    class_name = formdef.url_name.title()
    for module_name in (formdef.pickle_module_name,
                        'wcs.%s' % formdef.pickle_module_name):
        setattr(sys.modules[module_name], class_name, self.__class__)

    if self.id is not None:
        self.set_auto_fields()
        super(FormData, self).store(*args, **kwargs)
        return

    super(FormData, self).store(*args, **kwargs)
    # got an id now, automatic fields can be computed
    if self.set_auto_fields():
        # store changes
        super(FormData, self).store(*args, **kwargs)
def get_user(self):
    """Return the user object matching ``user_id``.

    None is returned when there is no user id or when it is the special
    'ultra-user' value.
    """
    if not self.user_id or self.user_id == 'ultra-user':
        return None
    return get_publisher().user_class.get(self.user_id, ignore_errors=True)

def set_user(self, user):
    """Record the id of the given user, or None when there is no user."""
    self.user_id = user.id if user else None

user = property(get_user, set_user)
def get_user_label(self):
    """Return the display name of the user if there is one, the stored
    ``user_label`` otherwise."""
    user = self.user
    return user.get_display_name() if user else self.user_label
def has_empty_data(self):
    """Return True if there is no data or if all its values are None."""
    return all(value is None for value in (self.data or {}).values())
@classmethod
def get_actionable_count(cls, user_roles):
    """Return the number of formdatas, in a non-endpoint status, on which
    one of the given roles has actions."""
    if not get_publisher().is_using_postgresql():
        return len(cls.get_actionable_ids(user_roles))
    statuses = ['wf-%s' % x.id for x in cls._formdef.workflow.get_not_endpoint_status()]
    criterias = [
        Intersects('actions_roles_array', user_roles),
        Contains('status', statuses),
    ]
    return cls.count(criterias)
@classmethod
def get_actionable_ids(cls, user_roles):
    """Return the ids of formdatas, in a non-endpoint status, on which one
    of the given roles has actions."""
    statuses = ['wf-%s' % x.id for x in cls._formdef.workflow.get_not_endpoint_status()]
    if get_publisher().is_using_postgresql():
        return cls.keys([
            Intersects('actions_roles_array', user_roles),
            Contains('status', statuses),
        ])

    # no SQL: combine the hashed indexes
    actions_ids = set()
    for role in user_roles:
        actions_ids.update(cls.get_ids_with_indexed_value('actions_roles', str(role)))
    open_ids = []
    for status_id in statuses:
        open_ids += cls.get_ids_with_indexed_value('status', status_id)
    return list(actions_ids.intersection(open_ids))
@classmethod
def get_submission_channels(cls):
    """Return an ordered mapping of submission channel ids to labels."""
    channels = collections.OrderedDict()
    channels['mail'] = _('Mail')
    channels['email'] = _('Email')
    channels['phone'] = _('Phone')
    channels['counter'] = _('Counter')
    channels['fax'] = _('Fax')
    channels['web'] = _('Web')
    return channels
def get_submission_channel_label(self):
    """Return the label of the submission channel, the web one by default."""
    label = self.get_submission_channels().get(self.submission_channel)
    if label:
        return label
    return _('Web')
def just_created(self):
    """Set the receipt time, the initial status and the initial history.

    The status is the first possible status of the workflow.
    """
    self.receipt_time = time.localtime()
    initial_status = self.formdef.workflow.possible_status[0]
    self.status = 'wf-%s' % initial_status.id
    # we add the initial status to the history, this makes it more readable
    # afterwards (also this gets the (previous_status) code to work in all
    # cases)
    first_evolution = Evolution(self)
    first_evolution.who = '_submitter'
    first_evolution.time = self.receipt_time
    first_evolution.status = self.status
    self.evolution = [first_evolution]
def set_auto_fields(self, *args, **kwargs):
    """Recompute automatically derived attributes.

    This covers ``digest``, ``id_display`` (only when not already set) and,
    for formdatas without a user id, ``user_label``.  Positional and keyword
    arguments are accepted but not used.

    Returns True if an attribute was changed, False otherwise.
    """
    # attribute name -> template used to compute its value
    fields = {'digest': self.formdef.digest_template}
    if not self.id_display:
        # only set id_display once as it may have been set automatically
        # by interpreting a webservice response.
        fields['id_display'] = self.formdef.get_display_id_format().strip()
    changed = False
    users_cfg = get_cfg('users', {})
    if not self.user_id and users_cfg and users_cfg.get('field_name'):
        # no user: build a label from the values of fields that are
        # prefilled from user attributes.
        field_name_values = users_cfg.get('field_name')
        form_user_data = {}
        for field in self.formdef.fields:
            if not hasattr(field, 'prefill'):
                continue
            if field.prefill and field.prefill.get('type') == 'user':
                form_user_data[field.prefill['value']] = self.data.get(field.id)
        # only string values are kept
        user_label = ' '.join([form_user_data.get(x) for x in field_name_values
                               if isinstance(form_user_data.get(x), six.string_types)])
        if user_label != self.user_label:
            self.user_label = user_label
            changed = True
    if any(fields.values()):
        # the context is only computed if there is at least one template
        context = self.get_substitution_variables()
        context['formdef_id'] = self.formdef.id
        for attribute, template in fields.items():
            if template is None:
                new_value = None
            else:
                new_value = Template(template, autoescape=False).render(context)
            if new_value != getattr(self, attribute, None):
                setattr(self, attribute, new_value)
                changed = True
    return changed
# criticality levels are stored as [0, 101, 102, 103...], this makes it
# easier to group "uncritical" formdatas (=0) together when sorting.
def get_current_criticality_level(self):
    """Return the stored criticality level, capped to the highest level
    of the workflow."""
    levels = len(self.formdef.workflow.criticality_levels or [0])
    current_level = self.criticality_level or 0
    if current_level < 100 + levels:
        return current_level
    # too high, probably because the workflow was changed and there is
    # fewer levels than before
    return 100 + levels - 1
def increase_criticality_level(self):
    """Raise the criticality level by one and store, unless the highest
    level is already reached."""
    levels = len(self.formdef.workflow.criticality_levels or [0])
    # 0 (uncritical) is counted as 100
    current_level = self.get_current_criticality_level() or 100
    highest_level = 100 + levels - 1
    if current_level < highest_level:
        self.criticality_level = current_level + 1
        self.store()
def decrease_criticality_level(self):
    """Lower the criticality level by one and store; nothing is done when
    the level is already 0."""
    levels = len(self.formdef.workflow.criticality_levels or [0])
    current_level = self.get_current_criticality_level()
    if current_level == 0:
        return
    lowered_level = current_level - 1
    # values up to 100 all mean "uncritical", stored as 0
    self.criticality_level = lowered_level if lowered_level > 100 else 0
    self.store()
def set_criticality_level(self, level):
    """Set the criticality level from a 0-based level index and store.

    The index is capped to the last level of the workflow; indexes above 0
    are stored as 100 + index, other values as 0.
    """
    levels = len(self.formdef.workflow.criticality_levels or [0])
    level = min(levels - 1, level)
    self.criticality_level = 100 + level if level > 0 else 0
    self.store()
def get_criticality_level_object(self):
    """Return the workflow criticality level matching the current level.

    Raises IndexError if the workflow has no criticality levels.
    """
    levels = self.formdef.workflow.criticality_levels
    if not levels:
        raise IndexError()
    current_level = self.get_current_criticality_level()
    # stored levels above 0 are offset by 100
    index = current_level - 100 if current_level > 0 else current_level
    return levels[index]
def perform_workflow(self):
    """Run the items of the current workflow status on this formdata and
    return the result of perform_items()."""
    get_publisher().substitutions.feed(self)
    wf_status = self.get_status()
    from wcs.workflows import perform_items
    return perform_items(wf_status.items, self)
def perform_global_action(self, action_id, user):
    """Run the items of the global action with the given id, if it is among
    the global actions the workflow reports for this user."""
    from wcs.workflows import perform_items
    actions = self.formdef.workflow.get_global_actions_for_user(formdata=self, user=user)
    for action in actions:
        if action.id == action_id:
            perform_items(action.items, self)
            break
def get_workflow_messages(self, position='top'):
    """Return the non-empty messages given, for this position, by the items
    of the current status whose condition is met."""
    wf_status = self.get_status()
    if not wf_status:
        return []
    messages = []
    for item in wf_status.items:
        if not (item.check_condition(self) and hasattr(item, 'get_message')):
            continue
        message = item.get_message(self, position=position)
        if message:
            messages.append(message)
    return messages
def get_status(self, status=None):
    """Return the workflow status object matching a status identifier.

    status -- identifier, with or without the "wf-" prefix; defaults to the
        status of the formdata.

    Returns None if there is no status, no formdef, or no workflow status
    with this identifier.
    """
    status = status or self.status
    if status is None:
        return None
    if not self.formdef:
        return None
    if status.startswith('wf-'):
        status = status[3:]
    for wf_status in self.formdef.workflow.possible_status:
        if wf_status.id == status:
            return wf_status
    return None
def get_status_label(self, status=None):
    """Return the label of the given (or current) status: a draft label, the
    name of the workflow status, or a generic label if it is unknown."""
    if self.is_draft(status):
        return _('Draft')
    wf_status = self.get_status(status)
    return wf_status.name if wf_status else _('Unknown')
def get_visible_status(self, user=None):
    """Return the most recent status of the history visible to the user.

    user -- defaults to the user of the current request, if there is one.

    Without history the current status is returned; None is returned when no
    status of the history is visible.
    """
    if not self.evolution:
        return self.get_status()
    if not user and get_request():
        user = get_request().user
    for evo in reversed(self.evolution):
        if not evo.status:
            continue
        wf_status = self.get_status(evo.status)
        if wf_status and wf_status.is_visible(self, user):
            return wf_status
    return None
def get_visible_evolution_parts(self, user=None):
    """Iterate over the evolutions to display to the given user.

    Hidden evolutions are skipped, as well as evolutions that bring neither
    a new status, a new author, a comment nor displayable parts.
    """
    last_seen_status = last_seen_author = None
    for evolution_part in self.evolution or []:
        if evolution_part.is_hidden(user=user):
            continue
        same_status = (evolution_part.status is None
                       or last_seen_status == evolution_part.status)
        if same_status and (evolution_part.who is None
                            or last_seen_author == evolution_part.who):
            # don't include empty evolution parts if status and author
            # didn't change.
            if not (evolution_part.comment or evolution_part.display_parts()):
                continue
        last_seen_status = evolution_part.status or last_seen_status
        last_seen_author = evolution_part.who or last_seen_author
        yield evolution_part
def get_workflow_form(self, user, displayed_fields=None):
    """Return the action form of the current status for the user, None if
    the status is unknown."""
    wf_status = self.get_status()
    if wf_status:
        return wf_status.get_action_form(self, user, displayed_fields=displayed_fields)
    return None
def handle_workflow_form(self, user, form):
    """Let the current status handle the submitted form and return its
    result, None if the status is unknown."""
    wf_status = self.get_status()
    if wf_status:
        return wf_status.handle_form(form, self, user)
    return None
def evaluate_live_workflow_form(self, user, form):
    """Let the current status evaluate the live form; always returns None."""
    wf_status = self.get_status()
    if wf_status:
        wf_status.evaluate_live_form(form, self, user)
    return None
def pop_previous_marked_status(self):
    """Pop the last entry of the markers stack kept in the workflow data and
    return the matching workflow status.

    None is returned if there is no stack or if it is empty.
    """
    if not self.workflow_data or '_markers_stack' not in self.workflow_data:
        return None
    try:
        status_id = self.workflow_data['_markers_stack'].pop()['status_id']
    except IndexError:
        return None
    return self.formdef.workflow.get_status(status_id)
def jump_status(self, status_id):
    """Move the formdata to the given status and store it.

    status_id -- identifier of the target status (without "wf-" prefix), or
        '_previous' to go back to the last marked status.

    A new evolution is added to the history, unless the status does not
    change and the last evolution is empty, in which case only its last
    jump time is updated.
    """
    if status_id == '_previous':
        previous_status = self.pop_previous_marked_status()
        assert previous_status, 'failed to compute previous status'
        status_id = previous_status.id
    status = 'wf-%s' % status_id

    if not self.evolution:
        self.evolution = []
    else:
        last_evolution = self.evolution[-1]
        if (self.status == status
                and last_evolution.status == status
                and not last_evolution.comment
                and not last_evolution.display_parts()):
            # if status do not change and last evolution is empty,
            # just update last jump time on last evolution, do not add one
            last_evolution.last_jump_datetime = datetime.datetime.now()
            self.store()
            return

    new_evolution = Evolution(self)
    new_evolution.time = time.localtime()
    new_evolution.status = status
    self.evolution.append(new_evolution)
    self.status = status
    self.store()
def get_url(self, backoffice=False):
    """Return the URL of this formdata: the formdef URL followed by the id."""
    base_url = self.formdef.get_url(backoffice=backoffice)
    return '%s%s/' % (base_url, self.id)
def get_api_url(self):
    """Return the API URL of this formdata: the formdef API URL followed by
    the id."""
    base_url = self.formdef.get_api_url()
    return '%s%s/' % (base_url, self.id)
def get_display_id(self):
    """Return the identifier to display, as a string (``id_display`` if set,
    ``id`` otherwise)."""
    identifier = self.id_display or self.id
    return str(identifier)
def get_handling_role_id(self):
    """Return the role obtained by translating the '_receiver' function for
    this formdata."""
    # TODO: look at current status and return the role(s) actually
    # concerned by the handling of the formdata
    from wcs.workflows import get_role_translation
    role_id = get_role_translation(self, '_receiver')
    return role_id
def get_handling_role(self):
    """Return the role object handling this formdata, None on KeyError."""
    try:
        role = Role.get(self.get_handling_role_id())
    except KeyError:
        role = None
    return role
def get_field_view_value(self, field, max_length=None):
    """Return the value of the given field for this formdata.

    field -- field object; its ``type`` selects either an internal property
        of the formdata or a value stored in ``data``.
    max_length -- when set, the value of data fields is passed through the
        field's ``get_view_short_value`` with this length.

    An empty string is returned for data fields with no value.
    """
    # return the value of the given field, with special handling for "fake"
    # field types that are shortcuts to internal properties.
    if field.type == 'id':
        return self.get_display_id()
    if field.type == 'display_name':
        return self.get_display_name()
    if field.type == 'time':
        return misc.localstrftime(self.receipt_time)
    if field.type == 'last_update_time':
        return misc.localstrftime(self.last_update_time)
    if field.type == 'user-label':
        return self.get_user_label() or '-'
    if field.type == 'status':
        return self.get_status_label()
    if field.type == 'submission_channel':
        return self.get_submission_channel_label()
    if field.type == 'submission_agent':
        try:
            agent_user = self.submission_context['agent_id']
            return get_publisher().user_class.get(agent_user).display_name
        except (KeyError, TypeError):
            # no submission context, no agent in it, or lookup failing
            # with KeyError
            return '-'
    if field.type == 'anonymised':
        return _('Yes') if self.anonymised else _('No')
    field_value = self.data.get(field.id)
    if field_value is None:
        return ''
    if max_length is not None:
        # if max_length is set the target is a backoffice listing/table,
        # return an html value, appropriately shortened.
        field_value = self.data.get('%s_display' % field.id, field_value)
        return field.get_view_short_value(field_value, max_length)
    else:
        # otherwise return the actual "raw" field value
        return field_value
def update_workflow_data(self, dict):
    """Merge the given dictionary into the workflow data, creating it if it
    is missing or empty."""
    self.workflow_data = self.workflow_data or {}
    self.workflow_data.update(dict)
def get_as_dict(self):
    """Return the data of all the fields of the formdef as a dictionary
    built by get_dict_with_varnames()."""
    all_fields = self.formdef.get_all_fields()
    return get_dict_with_varnames(all_fields, self.data, self)
def is_at_endpoint_status(self):
    """Return True if the current status is one of the endpoint statuses of
    the workflow."""
    endpoint_statuses = self.formdef.workflow.get_endpoint_status()
    return any(self.status == 'wf-%s' % x.id for x in endpoint_statuses)
def get_static_substitution_variables(self, minimal=False):
    """Return a flat dictionary of substitution variables for this formdata.

    minimal -- when true, only the first set of variables is computed
        (identification, status, tracking code, submission and formdef
        variables); user, field, role, history, workflow and geolocation
        variables are left out.

    The result is a deep copy passed through flatten_dict().
    """
    d = {}
    if self.id:
        d.update({
            'form_receipt_date': misc.strftime(misc.date_format(), self.receipt_time),
            'form_receipt_time': misc.strftime('%H:%M', self.receipt_time),
            'form_number': str(self.get_display_id()),
            'form_number_raw': '%s' % self.id,
            'form_url': self.get_url(),
            'form_url_backoffice': self.get_url(backoffice=True),
            'form_uri': '%s/%s/' % (self.formdef.url_name, self.id),
            'form_criticality_level': self.criticality_level,
            'form_digest': self.digest,
            'form_display_name': self.get_display_name(),
        })
        if self.receipt_time:
            # always get receipt time as a datetime object, this handles
            # both normal formdata (where receipt_time is a time.struct_time)
            # and sql.AnyFormData where it's already a datetime object.
            d['form_receipt_datetime'] = make_datetime(self.receipt_time)
        if self.formdef.workflow.criticality_levels:
            try:
                level = self.get_criticality_level_object()
            except IndexError:
                pass
            else:
                d['form_criticality_label'] = level.name
    d['form_status'] = self.get_status_label()
    if self.id and self.formdef.workflow and self.status:
        d['form_status_is_endpoint'] = self.is_at_endpoint_status()
    if self.tracking_code:
        d['form_tracking_code'] = self.tracking_code
    elif not self.status and self.data:
        # no status: look for a tracking code recorded in the data, or
        # for the one of the formdata referenced as draft.
        if 'future_tracking_code' in self.data:
            d['form_tracking_code'] = self.data['future_tracking_code']
        elif 'draft_formdata_id' in self.data:
            try:
                d['form_tracking_code'] = self.formdef.data_class().get(self.data['draft_formdata_id']).tracking_code
            except KeyError:
                pass
    d['form_submission_backoffice'] = self.backoffice_submission
    d['form_submission_channel'] = self.submission_channel
    d['form_submission_channel_label'] = self.get_submission_channel_label()
    if self.submission_context:
        d['form_submission_context'] = self.submission_context
    # formdef and category variables
    d.update(self.formdef.get_static_substitution_variables(minimal=minimal))
    if minimal:
        # work on a copy so flattening doesn't alter referenced objects
        d = copy.deepcopy(d)
        flatten_dict(d)
        return d
    if self.id:
        d.update({
            'form_status_url': '%sstatus' % self.get_url(),
            'form_details': self.formdef.get_detailed_email_form(self, self.get_url()),
        })
    user = self.get_user()
    if user:
        d.update(user.get_substitution_variables(prefix='form_'))
    for k, v in self.get_as_dict().items():
        d['form_'+k] = v
    # include substitution variables for workflow roles; this will
    # typically give variables such as form_role_receiver_name and
    # form_role_receiver_emails.
    workflow_roles = {}
    if self.formdef.workflow_roles:
        workflow_roles.update(self.formdef.workflow_roles)
    if self.workflow_roles:
        # roles set on the formdata take precedence over the formdef ones
        workflow_roles.update(self.workflow_roles)
    for role_type, role_id in workflow_roles.items():
        prefix = 'form_role_%s_' % role_type.replace('-', '_').strip('_')
        try:
            d.update(Role.get(role_id).get_substitution_variables(prefix))
        except KeyError:
            pass
    if self.evolution and self.evolution[-1].comment:
        d['form_comment'] = self.evolution[-1].comment
    else:
        d['form_comment'] = ''
    d['form_previous_status'] = ''
    d['form_status_changed'] = False
    if self.evolution:
        first_evolution_in_current_status = None
        # walk the history backwards, up to the first evolution with a
        # status different from the current one.
        for evolution in reversed(self.evolution):
            if evolution.status and evolution.status != self.status:
                d['form_previous_status'] = self.get_status_label(evolution.status)
                break
            if evolution.status:
                first_evolution_in_current_status = evolution
        if (d['form_status'] != d['form_previous_status'] and
                self.evolution[-1].status and
                first_evolution_in_current_status is self.evolution[-1] and
                not self.evolution[-1].last_jump_datetime):
            # mark status has changed if the previous status was different
            # and we are not on a change done on the same status.
            d['form_status_changed'] = True
    d['form_evolution'] = self.formdef.get_detailed_evolution(self)
    if self.formdef.workflow and self.status:
        wf_status = self.get_status()
        if wf_status:
            for item in wf_status.items:
                d.update(item.get_substitution_variables(self))
    # Add variables from evolution parts classes
    evolution_parts_classes = set(part.__class__ for evolution in self.evolution or [] for part in
                                  evolution.parts or [])
    for klass in evolution_parts_classes:
        if hasattr(klass, 'get_substitution_variables'):
            d.update(klass.get_substitution_variables(self))
    if self.geolocations:
        for k, v in self.geolocations.items():
            d['form_geoloc_%s_lat' % k] = v.get('lat')
            d['form_geoloc_%s_lon' % k] = v.get('lon')
            d['form_geoloc_%s' % k] = v
    # add the remaining variables of get_substitution_variables(), without
    # its 'form' and 'attachments' entries.
    lazy = self.get_substitution_variables()
    del lazy['form']
    del lazy['attachments']
    d.update(lazy)
    d = copy.deepcopy(d)
    flatten_dict(d)
    return d
def get_substitution_variables(self, minimal=False):
    """Return a CompatibilityNamesDict of substitution variables.

    It holds a 'form' entry (LazyFormData), an 'attachments' entry, the
    variables of the category of the formdef if there is one and, unless
    *minimal* is true, the flattened workflow data.
    """
    from .qommon.substitution import CompatibilityNamesDict
    from wcs.variables import LazyFormData
    from wcs.workflows import AttachmentsSubstitutionProxy
    variables = CompatibilityNamesDict({
        'form': LazyFormData(self),
        'attachments': AttachmentsSubstitutionProxy(self),
    })
    if self.formdef.category:
        variables.update(self.formdef.category.get_substitution_variables(minimal=minimal))
    if minimal:
        return variables
    if self.workflow_data:
        d = {}
        # pass over workflow data to:
        #  - attach an extra url attribute to uploaded files
        #  - ignore "private" attributes
        for k, v in self.workflow_data.items():
            if k[0] == '_':
                continue
            d[k] = v
        # recompute _url variable of attached files
        form_url = self.get_url()
        for k, v in self.workflow_data.items():
            if isinstance(v, Upload):
                try:
                    formvar, fieldvar = re.match('(.*)_var_(.*)_raw$', k).groups()
                except AttributeError:
                    # key not matching the pattern (re.match gave None)
                    continue
                # <formvar>_var_<fieldvar>_raw -> <formvar>_var_<fieldvar>_url
                d[k.rsplit('_', 1)[0] + '_url'] = '%sfiles/form-%s-%s/%s' % (
                    form_url, formvar, fieldvar,
                    self.workflow_data['%s_var_%s' % (formvar, fieldvar)])
        # work on a copy so flattening doesn't alter the workflow data
        d = copy.deepcopy(d)
        flatten_dict(d)
        variables.update(d)
    return variables
@classmethod
def get_substitution_variables_list(cls):
    """Return the list of advertised variables, as (category, name,
    description) tuples; currently the user variables only."""
    variables = []
    # we can't advertise fields, as this is a metaclass that will be used
    # in FormDef.data_class() to create a real class
    for field in []:  # cls.formdef.fields:
        # we only advertise fields with a varname, as they can be
        # considered stable
        if not field.varname:
            continue
        variables.append((
            _('Form'), 'form_var_'+field.varname,
            _('Form Field: %s') % field.label))
    user_variables = get_publisher().user_class.get_substitution_variables_list(prefix='form_')
    variables.extend(
        (_('Form'), name, _('Form Submitter Field'))
        for cat, name, comment in user_variables)
    return variables
@classmethod
def rebuild_security(cls):
    """Rebuild the 'concerned_roles' and 'actions_roles' indexes, with the
    formdef temporarily fed to the substitution system."""
    substitutions = get_publisher().substitutions
    with substitutions.temporary_feed(cls._formdef):
        cls.rebuild_indexes(indexes=['concerned_roles', 'actions_roles'])
def is_submitter(self, user):
    """Return True if the user is the submitter of this formdata, or if the
    current session reports being its anonymous submitter."""
    if self.user_id and user and str(self.user_id) == str(user.id):
        return True
    if get_session() and get_session().is_anonymous_submitter(self):
        return True
    return False
def is_draft(self, status=None):
    """Return True if the given status (the current status when None) is a
    draft one."""
    effective_status = self.status if status is None else status
    return effective_status in ('draft', 'wf-draft')
def get_concerned_roles(self):
    """Return the roles concerned by this formdata.

    Drafts only concern the submitter.  Otherwise the result is a set with
    the roles behind the workflow functions and either the roles with
    actions in the current status or, if the status is unknown, the
    submitter.
    """
    if self.is_draft():
        # drafts are only visible to submitter
        return ['_submitter']

    from wcs.workflows import get_role_translation

    # make sure the handling roles always gets access to the formdata, till
    # the very end (where it may be that there is no workflow status item
    # at all).
    status_action_roles = set()
    for function_key in self.formdef.workflow.roles.keys():
        handling_role = get_role_translation(self, function_key)
        if handling_role:
            status_action_roles.add(handling_role)

    if self.get_status():
        status_action_roles |= set(self.get_actions_roles())
    else:
        status_action_roles.add('_submitter')
    return status_action_roles

concerned_roles = property(get_concerned_roles)
def get_actions_roles(self):
    """Return the roles that have actions in the current status.

    An empty list is returned for drafts and unknown statuses; otherwise a
    set is built from the ``by`` attribute of status items whose condition
    is met, with functions translated to actual roles.
    """
    if self.is_draft():
        return []
    wf_status = self.get_status()
    if not wf_status:
        return []

    from wcs.workflows import get_role_translation

    status_action_roles = set()
    for item in wf_status.items or []:
        if not getattr(item, 'by', None):
            continue
        with get_publisher().substitutions.temporary_feed(self):
            condition_met = item.check_condition(self)
        if not condition_met:
            continue
        for role in item.by:
            if role == '_submitter':
                real_role = role
            else:
                real_role = get_role_translation(self, role)
            if real_role:
                status_action_roles.add(real_role)
    return status_action_roles

actions_roles = property(get_actions_roles)
def get_last_update_time(self):
    """Return the time of the last update.

    This is, in order: the value explicitly set, the last jump time of the
    last evolution (as a time tuple), the time of the last evolution, or
    the receipt time.
    """
    if hasattr(self, '_last_update_time'):
        return self._last_update_time
    if self.evolution:
        last_evolution = self.evolution[-1]
        if last_evolution.last_jump_datetime:
            return last_evolution.last_jump_datetime.timetuple()
        if last_evolution.time:
            return last_evolution.time
    return self.receipt_time

def set_last_update_time(self, value):
    """Force the value returned as last update time."""
    self._last_update_time = value

last_update_time = property(get_last_update_time, set_last_update_time)
def anonymise(self):
    """Remove personal information from this formdata and store it.

    For every field flagged with ``anonymise`` the stored value is reset,
    along with its display and structured values when the field stores
    them.  The link to the user, the workflow data and roles, the
    submission context, and the author, parts and comment of every
    evolution are reset too; ``anonymised`` receives the current time.
    """
    for field in self.formdef.get_all_fields():
        if not field.anonymise:
            continue
        self.data[field.id] = None
        if field.store_display_value:
            self.data['%s_display' % field.id] = None
        # structured values duplicate the field content, they must not
        # survive anonymisation either.
        if getattr(field, 'store_structured_value', False):
            self.data['%s_structured' % field.id] = None
    self.anonymised = datetime.datetime.now()
    self.user_id = None
    self.user_label = None
    self.editable_by = None
    self.workflow_data = None
    self.workflow_roles = None
    self.submission_context = None
    for evo in self.evolution or []:
        evo.who = None
        evo.parts = None
        evo.comment = None
    self.store()
def get_display_name(self):
    """Return a name made of the formdef name and the display id."""
    values = {
        'name': self.formdef.name,
        'id': self.get_display_id(),
    }
    return _('%(name)s #%(id)s') % values
def get_display_label(self):
    """Return the display name, followed by the digest (in parentheses) if
    there is one."""
    display_name = self.get_display_name()
    if not self.digest:
        return display_name
    return '%s (%s)' % (display_name, self.digest)
def get_auto_geoloc(self):
    """Return a location for this formdata: the first non-empty stored
    geolocation, else the JSON value of the first filled map field, else
    None."""
    # use proper geolocation if it exists
    for geolocation in (self.geolocations or {}).values():
        if geolocation:
            return geolocation
    # fallback to 1st map field
    for field in self.formdef.get_all_fields():
        if field.key != 'map':
            continue
        if self.data.get(field.id):
            return field.get_json_value(self.data[field.id])
    return None
def get_json_export_dict(self, include_files=True, anonymise=False, user=None):
    """Return a dictionary describing this formdata, for JSON exports.

    include_files -- passed to get_json_dict() for form and backoffice
        fields.
    anonymise -- when true, the user, the workflow data and other personal
        details are left out.
    user -- user passed to get_visible_status() and to the export of
        evolutions.
    """
    data = {}
    data['id'] = str(self.id)
    data['digest'] = self.digest
    data['display_id'] = self.get_display_id()
    data['display_name'] = self.get_display_name()
    data['text'] = self.get_display_label()
    data['receipt_time'] = datetime.datetime(*self.receipt_time[:6])
    data['last_update_time'] = datetime.datetime(*self.last_update_time[:6])
    data['criticality_level'] = self.criticality_level
    data['url'] = self.get_url()
    if not anonymise:
        # NOTE(review): this rebinds the ``user`` parameter to the user
        # attached to the formdata, so the calls below receive that user
        # and not the one given by the caller -- confirm this is intended.
        try:
            user = get_publisher().user_class.get(self.user_id)
        except KeyError:
            user = None
        if user:
            data['user'] = user.get_json_export_dict()
    data['fields'] = get_json_dict(self.formdef.fields, self.data,
                                   include_files=include_files, anonymise=anonymise)
    data['workflow'] = {}
    wf_status = self.get_visible_status(user)
    if wf_status:
        data['workflow']['status'] = {'id': wf_status.id, 'name': wf_status.name}
    # Workflow data have unknown purpose, do not store them in anonymised export
    if self.workflow_data and not anonymise:
        data['workflow']['data'] = self.workflow_data
    if self.formdef.workflow.get_backoffice_fields():
        data['workflow']['fields'] = get_json_dict(
            self.formdef.workflow.get_backoffice_fields(),
            self.data, include_files=include_files, anonymise=anonymise)
    # add a roles dictionary, with workflow functions and two special
    # entries for concerned/actions roles.
    data['roles'] = {}
    workflow_roles = {}
    if self.formdef.workflow_roles:
        workflow_roles.update(self.formdef.workflow_roles)
    if self.workflow_roles:
        # roles set on the formdata take precedence over the formdef ones
        workflow_roles.update(self.workflow_roles)
    for workflow_role in workflow_roles:
        data['roles'][workflow_role] = [workflow_roles.get(workflow_role)]
    data['roles']['concerned'] = self.get_concerned_roles()
    data['roles']['actions'] = self.get_actions_roles()
    for role_key in data['roles']:
        # exclude special _submitter value
        role_list = [x for x in data['roles'][role_key] if x != '_submitter']
        # get role objects
        role_list = [Role.get(x, ignore_errors=True) for x in role_list]
        # export as json dicts
        role_list = [x.get_json_export_dict() for x in role_list if x is not None]
        data['roles'][role_key] = role_list
    data['submission'] = {
        'backoffice': self.backoffice_submission,
        'channel': self.submission_channel or 'web',
    }
    if self.evolution:
        evolution = data['evolution'] = []
        for evo in self.evolution:
            evolution.append(evo.get_json_export_dict(None if anonymise else user,
                                                      anonymise=anonymise))
    if self.geolocations:
        data['geolocations'] = {}
        for k, v in self.geolocations.items():
            # copy, so the export doesn't share objects with the formdata
            data['geolocations'][k] = v.copy()
    return data
def export_to_json(self, include_files=True, anonymise=False):
    """Return the JSON export dictionary of this formdata as a JSON string.

    Both parameters are passed through unchanged to get_json_export_dict();
    the resulting dictionary is serialised with misc.JSONEncoder.
    """
    export_dict = self.get_json_export_dict(
        include_files=include_files,
        anonymise=anonymise)
    return json.dumps(export_dict, cls=misc.JSONEncoder)
def mark_as_being_visited(self):
    """Flag this formdata as visited in the current session.

    The object is identified by a key made of the formdef url name and the
    formdata id.
    """
    visited_key = 'formdata-%s-%s' % (self.formdef.url_name, self.id)
    session = get_session()
    session.mark_visited_object(visited_key)
def feed_session(self):
    """Hand the stored value of every formdef field to that field.

    This gives a chance to fields to initialize things that would rely on
    current data ahead of time.  Each field receives its raw value and its
    display value (None when the data dictionary has no such entry).
    """
    for form_field in self.formdef.fields:
        stored_value = self.data.get(form_field.id)
        display_value = self.data.get('%s_display' % form_field.id)
        form_field.feed_session(stored_value, display_value)
def get_summary_field_details(self, fields=None, include_unset_required_fields=False):
    """Return the list of entries to display in a summary of this formdata.

    fields defaults to the fields of the formdef.  Entries are dictionaries:

    * {'field': f} for title, subtitle and comment fields,
    * {'field': f, 'value': ..., 'value_details': {...}} for fields with a
      get_view_value() method,
    * {'field': page, 'fields': [...]} for page fields, immediately
      followed in the result by the entries of that page.

    Only fields with include_in_summary_page set are considered.  Fields
    without value are left out, unless they are required and
    include_unset_required_fields is true.  When the fields contain pages,
    pages without any valued field are dropped, as are the entries that
    come before the first page.
    """
    if fields is None:
        fields = self.formdef.fields

    def lookup_value(field):
        # return the selected value and an optional dictionary that will be
        # passed to get_view_value() to provide additional details.
        details = {}
        if field.id not in self.data:
            return (None, details)
        display_key = '%s_display' % field.id if field.store_display_value else None
        if display_key is not None and display_key in self.data:
            selected = self.data[display_key]
            details['value_id'] = self.data[field.id]
        else:
            selected = self.data[field.id]
        if selected is None or selected == '':
            # empty strings are handled like missing values
            return (None, details)
        return (selected, details)

    page_field = False
    page_entries = []
    pages = []
    for field in fields:
        if field.type == 'page':
            # start collecting entries for a new page
            page_field = field
            page_entries = []
            pages.append({'field': field, 'fields': page_entries})
        elif (field.type == 'title' and page_field and not page_entries
                and page_field.label == field.label):
            # don't include first title of a page if that title has the
            # same text as the page.
            pass
        elif field.type in ('title', 'subtitle', 'comment') and field.include_in_summary_page:
            page_entries.append({'field': field})
        elif hasattr(field, 'get_view_value') and field.include_in_summary_page:
            value, value_details = lookup_value(field)
            if value is not None or (field.required and include_unset_required_fields):
                page_entries.append(
                    {'field': field, 'value': value, 'value_details': value_details})

    if not pages:
        return page_entries

    summary = []
    for page in pages:
        # ignore empty pages
        if any('value' in entry for entry in page['fields']):
            summary.append(page)
            summary.extend(page['fields'])
    return summary
def iter_evolution_parts(self):
    """Yield the parts of all evolution items, in order.

    A missing or empty evolution, as well as evolution items without
    parts, simply contribute nothing.
    """
    evolution_items = self.evolution
    if not evolution_items:
        return
    for evolution_item in evolution_items:
        for evolution_part in evolution_item.parts or ():
            yield evolution_part
def __getattr__(self, attr):
    """Resolve attributes that were not found by the normal lookup.

    Values stored in the instance dictionary are returned as is.  Names of
    the form ``f<field id>`` give direct access to the values of the data
    dictionary; a field that exists in the formdef but has no stored value
    resolves to None.

    Raises AttributeError for anything else.  An instance whose ``data`` is
    missing or None also gets an AttributeError (or None for a known field)
    instead of a KeyError or TypeError, so that hasattr() and getattr()
    with a default value behave as expected.
    """
    instance_dict = self.__dict__
    try:
        return instance_dict[attr]
    except KeyError:
        pass

    # give direct access to values from the data dictionary; an instance
    # without any 'data' entry is not initialized and is not inspected
    # further.
    if attr.startswith('f') and 'data' in instance_dict:
        field_id = attr[1:]
        data = instance_dict['data'] or {}
        if field_id in data:
            return data[field_id]
        # if field id is not in data dictionary it may still be a valid
        # field, never initialized, check requested field id against
        # existing fields ids.
        if any(x.id == field_id for x in self.formdef.get_all_fields()):
            return None
    raise AttributeError(attr)
# the _formdef cache must not end up in pickles
def __getstate__(self):
    """Return a copy of the instance dictionary without its '_formdef' entry."""
    state = self.__dict__.copy()
    state.pop('_formdef', None)
    return state
def __setstate__(self, dict):
    """Install the given state as instance dictionary, minus any '_formdef'.

    The state dictionary is modified in place and becomes the instance
    dictionary itself.
    """
    # there was a time, before October 2007 and 48e46bf0, and pickled
    # objects had a _formdef, in case these objects still exist, we
    # remove the _formdef beforehand, so it doesn't interfere with the
    # cached _formdef already set in data_class()
    dict.pop('_formdef', None)
    self.__dict__ = dict
# substitution variables of the 'Form' category, in registration order; the
# comments stay wrapped in N_() so they are still picked up for translation.
for _variable_name, _variable_comment in (
        ('form_receipt_date', N_('Form Receipt Date')),
        ('form_receipt_time', N_('Form Receipt Time')),
        ('form_number', N_('Form Number')),
        ('form_details', N_('Form Details')),
        ('form_url', N_('Form URL')),
        ('form_url_backoffice', N_('Form URL (backoffice)')),
        ('form_status_url', N_('Form Status URL')),
        ('form_tracking_code', N_('Form Tracking Code')),
        ('form_user_display_name', N_('Form Submitter Name')),
        ('form_user_email', N_('Form Submitter Email')),
        ):
    Substitutions.register(
        _variable_name, category=N_('Form'), comment=_variable_comment)
# do not leave the loop variables behind in the module namespace
del _variable_name, _variable_comment

Substitutions.register_dynamic_source(FormData)