1435 lines
54 KiB
Python
1435 lines
54 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2010 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import collections
|
|
import copy
|
|
import datetime
|
|
import json
|
|
import re
|
|
import sys
|
|
import time
|
|
|
|
from quixote import get_publisher, get_request, get_session
|
|
from quixote.http_request import Upload
|
|
|
|
from .qommon import N_, _, misc
|
|
from .qommon.evalutils import make_datetime
|
|
from .qommon.publisher import get_cfg
|
|
from .qommon.storage import Contains, Intersects, Null, StorableObject
|
|
from .qommon.substitution import CompatibilityNamesDict, Substitutions, invalidate_substitution_cache
|
|
from .qommon.template import Template
|
|
|
|
|
|
def get_dict_with_varnames(fields, data, formdata=None, varnames_only=False):
    """Build a flat dictionary exposing field values under several names.

    For every field that has a ``get_view_value`` attribute the value is
    published as ``f<id>`` and ``field_<simplified label>`` (unless
    *varnames_only* is set) and, when the field has a varname, under the
    ``var_<varname>`` family of keys (``_raw``, ``_url``, ``_structured``,
    and one key per entry of the structured value).

    *data* maps field ids to values; when it is None every value is the
    empty string.  *formdata*, when given, is used to build the download
    URL of file fields.
    """
    result = {}
    has_data = data is not None

    for field in fields:
        if not hasattr(field, 'get_view_value'):
            continue

        raw_value = None
        if has_data:
            value = data.get(field.id)
            if field.convert_value_to_str:
                # keep the native value around, it is exposed as "_raw"
                raw_value = value
                value = field.convert_value_to_str(value)
            display_value = data.get('%s_display' % field.id)
        else:
            value = display_value = ''

        if not varnames_only:
            # f$n$ name, then 'field_' + normalized(field label)
            result['f%s' % field.id.replace('-', '_')] = value
            result['field_' + misc.simplify(field.label, space='_')] = value

        # the remaining names all derive from the manually defined varname
        varname = field.varname
        if not varname:
            continue

        if field.store_display_value:
            result['var_%s_raw' % varname] = value
            result['var_%s' % varname] = display_value
        else:
            result['var_%s' % varname] = value

        if field.key == 'file':
            result['var_%s_raw' % varname] = value
            result['var_%s_url' % varname] = None
            if value and hasattr(value, 'base_filename'):
                result['var_%s' % varname] = value.base_filename
                if formdata is not None:
                    result['var_%s_url' % varname] = '%sdownload?f=%s' % (
                        formdata.get_url(),
                        field.id,
                    )
        elif raw_value is not None:
            result['var_%s_raw' % varname] = raw_value

        if not has_data:
            continue

        structured_value = field.get_structured_value(data)
        if isinstance(structured_value, dict):
            for key, entry in structured_value.items():
                if key not in ('id', 'text'):
                    result['var_%s_%s' % (varname, key)] = entry
        if isinstance(structured_value, list):
            for index, struct_value in enumerate(structured_value):
                for key, entry in struct_value.items():
                    if key not in ('id', 'text'):
                        result['var_%s_%s_%s' % (varname, index, key)] = entry
        if field.store_structured_value:
            result['var_%s_structured_raw' % varname] = structured_value
            result['var_%s_structured' % varname] = structured_value

    return result
|
|
|
|
|
|
def flatten_dict(d):
    """Flatten nested dictionaries of *d* in place.

    Every value that is itself a dictionary is (recursively) flattened and
    replaced by one ``<key>_<subkey>`` entry per item it contains.
    """
    # work on a snapshot as the dictionary is modified along the way
    nested = [(key, value) for key, value in d.items() if isinstance(value, dict)]
    for key, child in nested:
        flatten_dict(child)
        for sub_key, sub_value in child.items():
            d['%s_%s' % (key, sub_key)] = sub_value
        del d[key]
|
|
|
|
|
|
class Evolution:
    """An entry of the ``evolution`` list (history) of a form data."""

    who = None
    status = None
    time = None
    last_jump_datetime = None
    comment = None
    parts = None

    _display_parts = None  # cache

    def __init__(self, formdata=None):
        self._formdata = formdata  # formdata cache

    @property
    def formdata(self):
        """Form data this evolution is attached to."""
        return self._formdata

    def get_author_name(self):
        """Return the display name of the author, or None if the user is not found."""
        user_id = self.formdata.user_id if self.who == '_submitter' else self.who
        try:
            return get_publisher().user_class.get(user_id).display_name
        except KeyError:
            return None

    def get_author_qualification(self):
        """Return a label when the author is the submitter but not the request user."""
        if self.who == '_submitter' and not self.formdata.is_submitter(get_request().user):
            return _('Original Submitter')
        return None

    def add_part(self, part):
        """Append *part* to the parts, creating the list when needed."""
        if not self.parts:
            self.parts = []
        self.parts.append(part)

    def display_parts(self):
        """Return the non-empty rendering of the parts visible to the current user.

        The result is cached on the object once computed from actual parts.
        """
        if self._display_parts is not None:
            return self._display_parts
        if not self.parts:
            return []

        rendered = []
        for part in self.parts:
            if not hasattr(part, 'view'):
                continue
            if hasattr(part, 'to') and not self.formdata.is_for_current_user(part.to):
                continue
            text = part.view()
            if text:
                rendered.append(text)
        self._display_parts = rendered
        return rendered

    def get_json_export_dict(self, user, anonymise=False, include_files=True):
        """Return a dictionary describing this evolution, for JSON exports.

        *user* is used as the author when ``who`` is ``'_submitter'``.
        """
        data = {
            'time': datetime.datetime(*self.time[:6]) if self.time else None,
            'last_jump_datetime': self.last_jump_datetime,
        }
        if self.status:
            # strip the 'wf-' prefix
            data['status'] = self.status[3:]

        if self.who != '_submitter':
            try:
                user = get_publisher().user_class.get(self.who)
            except KeyError:
                pass
            else:
                data['who'] = user.get_json_export_dict()
        elif not anonymise and user:
            data['who'] = user.get_json_export_dict()

        if self.comment and not anonymise:
            data['comment'] = self.comment

        exported_parts = []
        for part in self.parts or []:
            if not hasattr(part, 'get_json_export_dict'):
                continue
            part_data = part.get_json_export_dict(anonymise=anonymise, include_files=include_files)
            if part_data:
                exported_parts.append(part_data)
        if exported_parts:
            data['parts'] = exported_parts
        return data

    # don't pickle _formdata cache
    def __getstate__(self):
        state = self.__dict__.copy()
        state.pop('_formdata', None)
        state.pop('_display_parts', None)
        return state

    @property
    def datetime(self):
        """``time`` as a datetime object."""
        return datetime.datetime(*self.time[:6])

    def get_status(self):
        """Return the workflow status of this evolution.

        When the evolution has no status of its own, the status of the
        closest previous evolution that has one is used.
        """
        status = self.status
        if not status:
            history = self.formdata.evolution
            # look for the previous evolution with a status
            for previous in reversed(history[: history.index(self)]):
                status = previous.status
                if status:
                    break
        return self.formdata.get_status(status=status)

    def get_status_label(self):
        """Return the name of the status, or a translated 'Unknown'."""
        status = self.get_status()
        return status.name if status else _('Unknown')

    def is_hidden(self, user=None):
        """Tell if the evolution status is not visible to *user* (or the request user)."""
        status = self.get_status()
        if not status:
            return True
        return not status.is_visible(self.formdata, user or get_request().user)
|
|
|
|
|
|
class FormData(StorableObject):
|
|
_names = 'XX'
|
|
_hashed_indexes = ['user_id', 'status', 'workflow_roles', 'concerned_roles', 'actions_roles']
|
|
|
|
id_display = None
|
|
|
|
user_id = None
|
|
user_label = None # taken from data, for anonymous users
|
|
receipt_time = None
|
|
status = None
|
|
anonymised = None
|
|
page_no = 0 # page to use when restoring from draft
|
|
evolution = None
|
|
data = None
|
|
editable_by = None
|
|
tracking_code = None
|
|
backoffice_submission = False
|
|
submission_agent_id = None
|
|
submission_context = None
|
|
submission_channel = None
|
|
criticality_level = 0
|
|
digest = None
|
|
|
|
prefilling_data = None
|
|
workflow_data = None
|
|
workflow_roles = None
|
|
geolocations = None
|
|
|
|
_formdef = None
|
|
|
|
def get_formdef(self):
    """Return the form definition of this data, looking it up on first use.

    The url name of the formdef is the part of ``_names`` after the first
    dash; None is returned (and kept) when no formdef has that name.
    """
    if not self._formdef:
        from .formdef import FormDef

        url_name = self._names.split('-', 1)[1]
        try:
            self._formdef = FormDef.get_by_urlname(url_name)
        except KeyError:
            self._formdef = None
    return self._formdef


formdef = property(get_formdef)
|
|
|
|
def __init__(self, id=None):
    """Initialize the object with its identifier (None by default)."""
    self.id = id
|
|
|
|
def migrate(self):
    """Upgrade attributes stored in older formats, and store the object if needed.

    - statuses (of the object and of its evolutions) get their 'wf-' prefix;
    - evolutions are linked back to this form data;
    - ``submission_agent_id`` is taken from the submission context.
    """
    needs_store = False

    if self.status and self.status != 'draft' and not self.status.startswith('wf-'):
        self.status = 'wf-%s' % self.status
        needs_store = True

    for evo in self.evolution or []:
        evo._formdata = self  # link from evolution to formdata
        if evo.status and not evo.status.startswith('wf-'):
            evo.status = 'wf-%s' % evo.status
            needs_store = True

    context = self.submission_context
    if not self.submission_agent_id and context and context.get('agent_id'):
        self.submission_agent_id = str(context.get('agent_id'))
        needs_store = True

    if needs_store:
        self.store()
|
|
|
|
@invalidate_substitution_cache
def store(self, *args, **kwargs):
    """Store the object, keeping its automatically computed fields up to date.

    Auto fields are computed before storing when the object already has an
    id; otherwise the object is stored first (to get its id) and stored
    again if computing the auto fields changed something.
    """
    # make sure the class set under the formdef name in the sys.modules
    # namespaces is the exact one that was used when creating this
    # particular object, as it is required by pickle (or it will raise
    # "Can't pickle %r: it's not the same object as %s.%s" if the class
    # object has been changed in the course of the request).
    formdef = self._formdef
    for module_name in (formdef.pickle_module_name, 'wcs.%s' % formdef.pickle_module_name):
        setattr(sys.modules[module_name], formdef.data_class_name, self.__class__)

    if self.id is not None:
        self.set_auto_fields()
        super().store(*args, **kwargs)
        return

    super().store(*args, **kwargs)
    # the id is known now, auto fields can be computed
    if self.set_auto_fields():
        # store changes
        super().store(*args, **kwargs)
|
|
|
|
def refresh_from_storage(self):
    """Replace the attributes of this object with those of its stored version."""
    stored = self.get(self.id)
    self.__dict__ = stored.__dict__
|
|
|
|
def get_user(self):
    """Return the user object for ``user_id``.

    None is returned when there is no user id or when it is the special
    'ultra-user' value.
    """
    if not self.user_id or self.user_id == 'ultra-user':
        return None
    return get_publisher().user_class.get(self.user_id, ignore_errors=True)
|
|
|
|
def set_user(self, user):
    """Record the id of *user* as ``user_id`` (None when *user* is empty)."""
    self.user_id = user.id if user else None
|
|
|
|
user = property(get_user, set_user)
|
|
|
|
def get_user_label(self):
    """Return the display name of the user, falling back on ``user_label``."""
    user = self.user
    return user.get_display_name() if user else self.user_label
|
|
|
|
def has_empty_data(self):
    """Tell if all values of ``data`` are None (also true when there is no data)."""
    return all(self.data.get(key) is None for key in self.data or {})
|
|
|
|
@classmethod
def get_actionable_count(cls, user_roles):
    """Return the number of form data on which *user_roles* can act.

    With PostgreSQL this is a count of non-anonymised objects in a
    non-endpoint status whose action roles intersect *user_roles*;
    otherwise it is the length of ``get_actionable_ids()``.
    """
    if not get_publisher().is_using_postgresql():
        return len(cls.get_actionable_ids(user_roles))

    open_statuses = ['wf-%s' % x.id for x in cls._formdef.workflow.get_not_endpoint_status()]
    return cls.count(
        [
            Intersects('actions_roles_array', user_roles),
            Contains('status', open_statuses),
            Null('anonymised'),
        ]
    )
|
|
|
|
@classmethod
def get_actionable_ids(cls, user_roles):
    """Return the ids of form data in a non-endpoint status on which *user_roles* can act."""
    open_statuses = ['wf-%s' % x.id for x in cls._formdef.workflow.get_not_endpoint_status()]

    if get_publisher().is_using_postgresql():
        return cls.keys(
            [Intersects('actions_roles_array', user_roles), Contains('status', open_statuses)]
        )

    # no SQL: combine the "actions_roles" and "status" hashed indexes
    actionable_ids = set()
    for role in user_roles:
        actionable_ids.update(cls.get_ids_with_indexed_value('actions_roles', str(role)))
    open_ids = [
        object_id
        for status_id in open_statuses
        for object_id in cls.get_ids_with_indexed_value('status', status_id)
    ]
    return list(actionable_ids.intersection(open_ids))
|
|
|
|
@classmethod
def get_submission_channels(cls):
    """Return an ordered mapping of submission channel ids to their translated labels."""
    channels = collections.OrderedDict()
    channels['mail'] = _('Mail')
    channels['email'] = _('Email')
    channels['phone'] = _('Phone')
    channels['counter'] = _('Counter')
    channels['fax'] = _('Fax')
    channels['web'] = _('Web')
    channels['social-network'] = _('Social Network')
    return channels
|
|
|
|
def get_submission_channel_label(self):
    """Return the label of the submission channel, defaulting to a translated 'Web'."""
    label = self.get_submission_channels().get(self.submission_channel)
    return label or _('Web')
|
|
|
|
def get_parent(self):
    """Return the object referenced as origin in the submission context.

    The context keys ``orig_object_type`` (defaulting to 'formdef'),
    ``orig_formdef_id`` and ``orig_formdata_id`` identify it; None is
    returned when one of them is empty or when the lookup raises KeyError.
    """
    context = self.submission_context
    if not context:
        return None

    object_type = context.get('orig_object_type', 'formdef')
    objectdef_id = context.get('orig_formdef_id')
    objectdata_id = context.get('orig_formdata_id')
    if not all((object_type, objectdef_id, objectdata_id)):
        return None

    if object_type == 'carddef':
        from .carddef import CardDef as objectdef_class
    else:
        from .formdef import FormDef as objectdef_class

    try:
        return objectdef_class.get(objectdef_id).data_class().get(objectdata_id)
    except KeyError:
        return None
|
|
|
|
def just_created(self):
    """Set the receipt time, the initial status and the first evolution.

    The initial status is the first possible status of the workflow.
    """
    self.receipt_time = time.localtime()
    self.status = 'wf-%s' % self.formdef.workflow.possible_status[0].id

    # we add the initial status to the history, this makes it more readable
    # afterwards (also this gets the (previous_status) code to work in all
    # cases)
    initial_evolution = Evolution(self)
    initial_evolution.who = '_submitter'
    initial_evolution.time = self.receipt_time
    initial_evolution.status = self.status
    self.evolution = [initial_evolution]
|
|
|
|
def set_auto_fields(self, *args, **kwargs):
    """Compute the automatic fields and tell if something changed.

    - ``user_label``, when there is no user id, is built from the values of
      the fields prefilled with the user attributes listed in the
      'field_name' setting of the 'users' configuration;
    - ``digest`` and, if not already set, ``id_display`` are rendered from
      their formdef templates.
    """
    templates = {'digest': self.formdef.digest_template}
    if not self.id_display:
        # only set id_display once as it may have been set automatically
        # by interpreting a webservice response.
        templates['id_display'] = self.formdef.get_display_id_format().strip()

    changed = False

    users_cfg = get_cfg('users', {})
    if not self.user_id and users_cfg and users_cfg.get('field_name'):
        field_name_values = users_cfg.get('field_name')
        form_user_data = {}
        for field in self.formdef.fields:
            prefill = getattr(field, 'prefill', None)
            if prefill and prefill.get('type') == 'user':
                form_user_data[prefill['value']] = self.data.get(field.id)
        label_parts = [
            form_user_data.get(name)
            for name in field_name_values
            if isinstance(form_user_data.get(name), str)
        ]
        user_label = ' '.join(label_parts)
        if user_label != self.user_label:
            self.user_label = user_label
            changed = True

    if any(templates.values()):
        context = self.get_substitution_variables()
        context['formdef_id'] = self.formdef.id
        for attribute, template in templates.items():
            if template is None:
                new_value = None
            else:
                new_value = Template(template, autoescape=False).render(context)
            if new_value != getattr(self, attribute, None):
                setattr(self, attribute, new_value)
                changed = True

    return changed
|
|
|
|
def get_lateral_block(self):
    """Render the lateral template of the formdef (None when there is no template)."""
    context = get_publisher().substitutions.get_context_variables(mode='lazy')
    context['formdef_id'] = self.formdef.id
    template = self.formdef.lateral_template
    if template is None:
        return None
    return Template(template, autoescape=False).render(context)
|
|
|
|
# criticality levels are stored as [0, 101, 102, 103...], this makes it
|
|
# easier to group "uncritical" formdatas (=0) together when sorting.
|
|
def get_current_criticality_level(self):
    """Return the stored criticality level, capped to the highest workflow level."""
    level_count = len(self.formdef.workflow.criticality_levels or [0])
    current_level = self.criticality_level or 0
    if current_level < 100 + level_count:
        return current_level
    # too high, probably because the workflow was changed and there is
    # fewer levels than before
    return 100 + level_count - 1
|
|
|
|
def increase_criticality_level(self):
    """Move to the next criticality level and store, unless already at the highest one."""
    level_count = len(self.formdef.workflow.criticality_levels or [0])
    highest_level = 100 + level_count - 1
    # an uncritical form data (0) is counted as being at level 100
    current_level = self.get_current_criticality_level() or 100
    if current_level < highest_level:
        self.criticality_level = current_level + 1
        self.store()
|
|
|
|
def decrease_criticality_level(self):
    """Move to the previous criticality level and store; do nothing when at 0."""
    current_level = self.get_current_criticality_level()
    if current_level == 0:
        return
    lowered = current_level - 1
    # 100 and below all mean "not critical", stored as 0
    self.criticality_level = 0 if lowered <= 100 else lowered
    self.store()
|
|
|
|
def set_criticality_level(self, level):
    """Set the criticality level from a 0-based *level* index and store.

    *level* is capped to the last level of the workflow; positive indexes
    are stored as ``100 + level``, anything else as 0.
    """
    level_count = len(self.formdef.workflow.criticality_levels or [0])
    level = min(level_count - 1, level)
    self.criticality_level = 100 + level if level > 0 else 0
    self.store()
|
|
|
|
def get_criticality_level_object(self):
    """Return the workflow criticality level matching the current level.

    Raises IndexError when the workflow has no criticality levels.
    """
    levels = self.formdef.workflow.criticality_levels or []
    if not levels:
        raise IndexError()
    index = self.get_current_criticality_level()
    if index > 0:
        # stored levels start at 100
        index -= 100
    return levels[index]
|
|
|
|
def perform_workflow(self):
    """Perform the items of the current status and return the result of ``perform_items``."""
    get_publisher().substitutions.feed(self)
    current_status = self.get_status()
    from wcs.workflows import perform_items

    return perform_items(current_status.items, self)
|
|
|
|
def perform_global_action(self, action_id, user):
    """Perform the items of the global action *action_id*, if available to *user*."""
    from wcs.workflows import perform_items

    available_actions = self.formdef.workflow.get_global_actions_for_user(formdata=self, user=user)
    for action in available_actions:
        if action.id == action_id:
            perform_items(action.items, self)
            break
|
|
|
|
def get_workflow_messages(self, position='top'):
    """Return the non-empty messages of the items of the current status.

    Only items whose condition is met and that have a ``get_message``
    method are considered.
    """
    current_status = self.get_status()
    if not current_status:
        return []

    messages = []
    for item in current_status.items:
        if item.check_condition(self) and hasattr(item, 'get_message'):
            message = item.get_message(self, position=position)
            if message:
                messages.append(message)
    return messages
|
|
|
|
def get_status(self, status=None):
    """Return the workflow status object for *status* (default: current status).

    *status* may carry the 'wf-' prefix.  None is returned when there is no
    status, no formdef, or no workflow status with that id.
    """
    status = status or self.status
    if status is None or not self.formdef:
        return None
    if status.startswith('wf-'):
        status = status[3:]
    for candidate in self.formdef.workflow.possible_status:
        if candidate.id == status:
            return candidate
    return None
|
|
|
|
def get_status_label(self, status=None):
    """Return the label of *status*: 'Draft', the workflow status name, or 'Unknown'."""
    if self.is_draft(status):
        return _('Draft')
    wf_status = self.get_status(status)
    return wf_status.name if wf_status else _('Unknown')
|
|
|
|
def get_visible_status(self, user=None):
    """Return the most recent status visible to *user* (default: request user).

    Without any evolution the current status is returned; None is returned
    when no status of the history is visible.
    """
    if not self.evolution:
        return self.get_status()
    if not user and get_request():
        user = get_request().user

    for evo in reversed(self.evolution):
        wf_status = self.get_status(evo.status) if evo.status else None
        if wf_status and wf_status.is_visible(self, user):
            return wf_status
    return None
|
|
|
|
def get_visible_evolution_parts(self, user=None):
    """Yield the evolutions that are worth displaying to *user*.

    Hidden evolutions are skipped, as well as evolutions without comment
    nor displayable parts that change neither the status nor the author.
    """
    last_seen_status = None
    last_seen_author = None
    for evolution_part in self.evolution or []:
        if evolution_part.is_hidden(user=user):
            continue

        same_status = evolution_part.status is None or last_seen_status == evolution_part.status
        same_author = evolution_part.who is None or last_seen_author == evolution_part.who
        if same_status and same_author:
            # don't include empty evolution parts if status and author
            # didn't change.
            if not evolution_part.comment and not evolution_part.display_parts():
                continue

        last_seen_status = evolution_part.status or last_seen_status
        last_seen_author = evolution_part.who or last_seen_author
        yield evolution_part
|
|
|
|
def get_workflow_form(self, user, displayed_fields=None):
    """Return the action form of the current status for *user* (None without status)."""
    current_status = self.get_status()
    if current_status:
        return current_status.get_action_form(self, user, displayed_fields=displayed_fields)
    return None
|
|
|
|
def handle_workflow_form(self, user, form):
    """Have the current status handle *form* for *user* (None without status)."""
    current_status = self.get_status()
    if current_status:
        return current_status.handle_form(form, self, user)
    return None
|
|
|
|
def evaluate_live_workflow_form(self, user, form):
    """Have the current status evaluate the live *form* for *user*, if there is a status."""
    current_status = self.get_status()
    if current_status:
        current_status.evaluate_live_form(form, self, user)
    return None
|
|
|
|
def pop_previous_marked_status(self):
    """Pop the last marker of the '_markers_stack' workflow data and return its status.

    None is returned when there is no stack, when it is empty, or when the
    workflow lookup of the marked status raises KeyError.
    """
    workflow_data = self.workflow_data
    if not workflow_data or '_markers_stack' not in workflow_data:
        return None

    try:
        status_id = workflow_data['_markers_stack'].pop()['status_id']
    except IndexError:
        return None

    try:
        return self.formdef.workflow.get_status(status_id)
    except KeyError:
        return None
|
|
|
|
def jump_status(self, status_id, user_id=None):
    """Switch to the status *status_id*, record it in the history, and store.

    The special '_previous' id designates the last marked status; an error
    is recorded (and nothing changes) when it cannot be computed.
    """
    if status_id == '_previous':
        previous_status = self.pop_previous_marked_status()
        if not previous_status:
            summary = _('Failed to compute previous status')
            get_publisher().record_error(summary, formdata=self)
            return
        status_id = previous_status.id

    status = 'wf-%s' % status_id

    if not self.evolution:
        self.evolution = []
    else:
        last_evolution = self.evolution[-1]
        if (
            self.status == status
            and last_evolution.status == status
            and not last_evolution.comment
            and not last_evolution.display_parts()
        ):
            # if status do not change and last evolution is empty,
            # just update last jump time on last evolution, do not add one
            last_evolution.last_jump_datetime = datetime.datetime.now()
            self.store()
            return

    new_evolution = Evolution(self)
    new_evolution.time = time.localtime()
    new_evolution.status = status
    new_evolution.who = user_id
    self.evolution.append(new_evolution)
    self.status = status
    self.store()
|
|
|
|
def get_url(self, backoffice=False):
    """Return the URL of the form data: the formdef URL followed by the id and a slash."""
    base_url = self.formdef.get_url(backoffice=backoffice)
    return '%s%s/' % (base_url, self.id)
|
|
|
|
def get_backoffice_url(self):
    """Return the backoffice URL of the form data."""
    backoffice_url = self.get_url(backoffice=True)
    return backoffice_url
|
|
|
|
def get_api_url(self):
    """Return the API URL of the form data: the formdef API URL followed by the id and a slash."""
    base_url = self.formdef.get_api_url()
    return '%s%s/' % (base_url, self.id)
|
|
|
|
def get_display_id(self):
    """Return ``id_display`` (or the id when it is not set) as a string."""
    identifier = self.id_display or self.id
    return str(identifier)
|
|
|
|
def get_role_translation(self, role_name):
    """Return the role id (as a string) matching *role_name*.

    Names starting with an underscore are looked up in the workflow roles
    of the form data, then of the formdef (None when not found); other
    names are returned as strings.  '_submitter' is refused with an
    exception.
    """
    if role_name == '_submitter':
        raise Exception('_submitter is not a valid role')

    if not str(role_name).startswith('_'):
        return str(role_name)

    role_id = None
    if self.workflow_roles:
        role_id = self.workflow_roles.get(role_name)
    if not role_id and self.formdef.workflow_roles:
        role_id = self.formdef.workflow_roles.get(role_name)
    return None if role_id is None else str(role_id)
|
|
|
|
def get_handling_role_id(self):
    """Return the id of the role behind the '_receiver' function."""
    # TODO: look at current status and return the role(s) actually
    # concerned by the handling of the formdata
    receiver_role_id = self.get_role_translation('_receiver')
    return receiver_role_id
|
|
|
|
def get_handling_role(self):
    """Return the handling role object, or None when the lookup raises KeyError."""
    try:
        role = get_publisher().role_class.get(self.get_handling_role_id())
    except KeyError:
        return None
    return role
|
|
|
|
def get_field_view_value(self, field, max_length=None):
    """Return the value of *field* for this form data.

    "Fake" field types that are shortcuts to internal properties (id,
    display name, times, user label, status, submission channel and agent,
    anonymisation) are handled first.  For other fields the value is read
    from ``data``; an empty string is returned when there is no value,
    including when the form data has no data at all (``data`` is None).

    When *max_length* is given the display value (if any) is passed to the
    ``get_view_short_value`` method of the field, otherwise the raw value
    is returned.
    """
    field_type = field.type
    if field_type == 'id':
        return self.get_display_id()
    if field_type == 'display_name':
        return self.get_display_name()
    if field_type == 'time':
        return misc.localstrftime(self.receipt_time)
    if field_type == 'last_update_time':
        return misc.localstrftime(self.last_update_time)
    if field_type == 'user-label':
        return self.get_user_label() or '-'
    if field_type == 'status':
        return self.get_status_label()
    if field_type == 'submission_channel':
        return self.get_submission_channel_label()
    if field_type == 'submission_agent':
        try:
            agent_user = self.submission_agent_id
            return get_publisher().user_class.get(agent_user).display_name
        except (KeyError, TypeError):
            return '-'
    if field_type == 'anonymised':
        return _('Yes') if self.anonymised else _('No')

    # data defaults to None on the class, don't fail on such objects
    data = self.data or {}
    field_value = data.get(field.id)
    if field_value is None:
        return ''
    if max_length is not None:
        # if max_length is set the target is a backoffice listing/table,
        # return an html value, appropriately shortened.
        field_value = data.get('%s_display' % field.id, field_value)
        return field.get_view_short_value(field_value, max_length)
    # otherwise return the actual "raw" field value
    return field_value
|
|
|
|
def update_workflow_data(self, dict):
    """Merge *dict* into ``workflow_data``, creating the dictionary when needed."""
    if self.workflow_data:
        self.workflow_data.update(dict)
        return
    self.workflow_data = {}
    self.workflow_data.update(dict)
|
|
|
|
def get_as_dict(self):
    """Return the data as a dictionary keyed by the various names of all formdef fields."""
    all_fields = self.formdef.get_all_fields()
    return get_dict_with_varnames(all_fields, self.data, self)
|
|
|
|
def is_at_endpoint_status(self):
    """Tell if the current status is one of the endpoint statuses of the workflow."""
    endpoint_statuses = self.formdef.workflow.get_endpoint_status()
    return any('wf-%s' % x.id == self.status for x in endpoint_statuses)
|
|
|
|
def get_static_substitution_variables(self, minimal=False):
    """Return a flat dictionary of ``form_*`` substitution variables.

    With *minimal* set, only the identification, status, tracking code,
    submission and formdef variables are computed; otherwise user, field,
    workflow role, comment, history, workflow item, evolution part and
    geolocation variables are added, as well as the variables returned by
    ``get_substitution_variables()`` (minus its 'form' and 'attachments'
    entries).

    The result is a deep copy in which nested dictionaries have been
    flattened with ``flatten_dict``.
    """
    # NOTE(review): the nesting of this method was rebuilt from a source
    # that had lost its indentation -- confirm against the repository.
    d = {}

    if self.id:
        d.update(
            {
                'form_receipt_date': misc.strftime(misc.date_format(), self.receipt_time),
                'form_receipt_time': misc.strftime('%H:%M', self.receipt_time),
                'form_number': str(self.get_display_id()),
                'form_number_raw': '%s' % self.id,
                'form_url': self.get_url(),
                'form_url_backoffice': self.get_url(backoffice=True),
                'form_uri': '%s/%s/' % (self.formdef.url_name, self.id),
                'form_criticality_level': self.criticality_level,
                'form_digest': self.digest,
                'form_display_name': self.get_display_name(),
            }
        )
        if self.receipt_time:
            # always get receipt time as a datetime object, this handles
            # both normal formdata (where receipt_time is a time.struct_time)
            # and sql.AnyFormData where it's already a datetime object.
            d['form_receipt_datetime'] = make_datetime(self.receipt_time)
        if self.last_update_time:
            d['form_last_update_datetime'] = make_datetime(self.last_update_time)
        if self.formdef.workflow.criticality_levels:
            try:
                level = self.get_criticality_level_object()
            except IndexError:
                # no matching level object, the label is left out
                pass
            else:
                d['form_criticality_label'] = level.name

    d['form_status'] = self.get_status_label()

    if self.id and self.formdef.workflow and self.status:
        d['form_status_is_endpoint'] = self.is_at_endpoint_status()

    if self.tracking_code:
        d['form_tracking_code'] = self.tracking_code
    elif not self.status and self.data:
        # no status yet: look for a tracking code referenced in the data
        if 'future_tracking_code' in self.data:
            d['form_tracking_code'] = self.data['future_tracking_code']
        elif 'draft_formdata_id' in self.data:
            try:
                d['form_tracking_code'] = (
                    self.formdef.data_class().get(self.data['draft_formdata_id']).tracking_code
                )
            except KeyError:
                pass

    d['form_submission_backoffice'] = self.backoffice_submission
    d['form_submission_channel'] = self.submission_channel
    d['form_submission_channel_label'] = self.get_submission_channel_label()
    if self.submission_context:
        d['form_submission_context'] = self.submission_context

    # formdef and category variables
    d.update(self.formdef.get_static_substitution_variables(minimal=minimal))

    if minimal:
        # stop here, everything below is left out of the minimal set
        d = copy.deepcopy(d)
        flatten_dict(d)
        return d

    if self.id:
        d.update(
            {
                'form_status_url': '%sstatus' % self.get_url(),
                'form_details': self.formdef.get_detailed_email_form(self, self.get_url()),
            }
        )

    user = self.get_user()
    if user:
        d.update(user.get_substitution_variables(prefix='form_'))

    # field values, under all their names, prefixed with "form_"
    for k, v in self.get_as_dict().items():
        d['form_' + k] = v

    # include substitution variables for workflow roles; this will
    # typically give variables such as form_role_receiver_name and
    # form_role_receiver_emails.
    workflow_roles = {}
    if self.formdef.workflow_roles:
        workflow_roles.update(self.formdef.workflow_roles)
    if self.workflow_roles:
        # roles set on the form data take precedence over the formdef ones
        workflow_roles.update(self.workflow_roles)

    for role_type, role_id in workflow_roles.items():
        prefix = 'form_role_%s_' % role_type.replace('-', '_').strip('_')
        try:
            d.update(get_publisher().role_class.get(role_id).get_substitution_variables(prefix))
        except KeyError:
            pass

    # comment of the last evolution, if any
    if self.evolution and self.evolution[-1].comment:
        d['form_comment'] = self.evolution[-1].comment
    else:
        d['form_comment'] = ''

    d['form_previous_status'] = ''
    d['form_status_changed'] = False
    if self.evolution:
        first_evolution_in_current_status = None
        # walk the history backwards up to the first evolution with a
        # status different from the current one
        for evolution in reversed(self.evolution):
            if evolution.status and evolution.status != self.status:
                d['form_previous_status'] = self.get_status_label(evolution.status)
                break
            if evolution.status:
                first_evolution_in_current_status = evolution
        if (
            d['form_status'] != d['form_previous_status']
            and self.evolution[-1].status
            and first_evolution_in_current_status is self.evolution[-1]
            and not self.evolution[-1].last_jump_datetime
        ):
            # mark status has changed if the previous status was different
            # and we are not on a change done on the same status.
            d['form_status_changed'] = True

    d['form_evolution'] = self.formdef.get_detailed_evolution(self)

    if self.formdef.workflow and self.status:
        wf_status = self.get_status()
        if wf_status:
            # variables contributed by the items of the current status
            for item in wf_status.items:
                d.update(item.get_substitution_variables(self))

    # Add variables from evolution parts classes
    evolution_parts_classes = set(
        part.__class__ for evolution in self.evolution or [] for part in evolution.parts or []
    )
    for klass in evolution_parts_classes:
        if hasattr(klass, 'get_substitution_variables'):
            # called on the class (not on a part) with the form data
            d.update(klass.get_substitution_variables(self))

    if self.geolocations:
        for k, v in self.geolocations.items():
            d['form_geoloc_%s_lat' % k] = v.get('lat')
            d['form_geoloc_%s_lon' % k] = v.get('lon')
            d['form_geoloc_%s' % k] = v

    # add the non-static variables, except the 'form' and 'attachments'
    # entries
    lazy = self.get_substitution_variables()
    del lazy['form']
    del lazy['attachments']
    d.update(lazy)

    # flatten a copy, so that dictionaries referenced above are not altered
    d = copy.deepcopy(d)
    flatten_dict(d)

    return d
|
|
|
|
def get_as_lazy(self):
    """Return this form data wrapped in a ``LazyFormData`` object."""
    from wcs import variables

    return variables.LazyFormData(self)
|
|
|
|
def get_substitution_variables(self, minimal=False):
    """Return the substitution variables of the form data.

    The result always has the 'form' (lazy form data) and 'attachments'
    entries, plus the category variables when the formdef has a category.
    Unless *minimal* is set, flattened workflow data are added under the
    keys accepted by ``CompatibilityNamesDict.valid_key_regex``.
    """
    from wcs.workflows import AttachmentsSubstitutionProxy

    variables = CompatibilityNamesDict(
        {
            'form': self.get_as_lazy(),
            'attachments': AttachmentsSubstitutionProxy(self),
        }
    )
    category = self.formdef.category
    if category:
        variables.update(category.get_substitution_variables(minimal=minimal))

    if minimal or not self.workflow_data:
        return variables

    # pass over workflow data to:
    #  - attach an extra url attribute to uploaded files
    #  - ignore "private" attributes
    #  - ignore attributes that will conflict with (parts of) the
    #    "form" namespace
    exposed = {}
    for key, value in self.workflow_data.items():
        if key[0] == '_' or key.startswith('form_var_') or key == 'form':
            continue
        exposed[key] = value

    # recompute _url variable of attached files
    form_url = self.get_url()
    for key, value in self.workflow_data.items():
        if not isinstance(value, Upload):
            continue
        match = re.match('(.*)_var_(.*)_raw$', key)
        if match is None:
            continue
        formvar, fieldvar = match.groups()
        exposed[key.rsplit('_', 1)[0] + '_url'] = '%sfiles/form-%s-%s/%s' % (
            form_url,
            formvar,
            fieldvar,
            self.workflow_data['%s_var_%s' % (formvar, fieldvar)],
        )

    exposed = copy.deepcopy(exposed)
    flatten_dict(exposed)
    variables.update(
        {k: v for k, v in exposed.items() if CompatibilityNamesDict.valid_key_regex.match(k)}
    )
    return variables
|
|
|
|
@classmethod
def get_substitution_variables_list(cls):
    """Return (category, name, description) tuples of the advertised variables.

    Only the user variables (with the 'form_' prefix) are listed.
    """
    variables = []
    # we can't advertise fields, as this is a metaclass that will be used
    # in FormDef.data_class() to create a real class
    for field in []:  # cls.formdef.fields:
        # we only advertise fields with a varname, as they can be
        # considered stable
        if field.varname:
            variables.append((_('Form'), 'form_var_' + field.varname, _('Form Field: %s') % field.label))

    user_variables = get_publisher().user_class.get_substitution_variables_list(prefix='form_')
    for user_variable in user_variables:
        dummy, name, dummy = user_variable
        variables.append((_('Form'), name, _('Form Submitter Field')))
    return variables
|
|
|
|
@classmethod
def rebuild_security(cls):
    """Rebuild the 'concerned_roles' and 'actions_roles' indexes, with the formdef fed to substitutions."""
    substitutions = get_publisher().substitutions
    with substitutions.temporary_feed(cls._formdef):
        cls.rebuild_indexes(indexes=['concerned_roles', 'actions_roles'])
|
|
|
|
def is_submitter(self, user):
    """Tell if *user*, or the anonymous user of the current session, submitted this form data."""
    if self.user_id and user and str(self.user_id) == str(user.id):
        return True
    # anonymous submitters are tracked by the session
    if get_session() and get_session().is_anonymous_submitter(self):
        return True
    return False
|
|
|
|
def is_for_current_user(self, to):
    """Tell whether the user of the current request matches one of `to`.

    `to` is a collection of role identifiers; the special value
    ``'_submitter'`` stands for the submitter of this form data.  An empty
    or missing `to` matches everybody.  Without a current request nothing
    matches.
    """
    if not to:
        return True
    request = get_request()
    if not request:
        return False
    current_user = request.user
    for target in to:
        if target == '_submitter':
            if self.is_submitter(current_user):
                return True
            continue
        if not current_user:
            # other targets can only match an actual user
            continue
        if self.get_role_translation(target) in current_user.get_roles():
            return True
    return False
|
|
|
|
def is_draft(self, status=None):
    """Tell whether `status` (by default the current status) is ``'draft'``."""
    checked_status = self.status if status is None else status
    return checked_status == 'draft'
|
|
|
|
def get_concerned_roles(self):
    """Return the roles concerned by this form data.

    For a draft this is the list ``['_submitter']``.  Otherwise it is a set
    made of the translation of every workflow function that has one, plus
    either ``'_submitter'`` (when the form data has no workflow status) or
    the roles returned by get_actions_roles().
    """
    if self.is_draft():
        # drafts are only visible to submitter
        return ['_submitter']

    # The handling roles always get access to the formdata, till the very
    # end (where it may be that there is no workflow status item at all).
    translations = (self.get_role_translation(key) for key in self.formdef.workflow.roles.keys())
    concerned = {role for role in translations if role}

    if self.get_status():
        concerned.update(self.get_actions_roles())
    else:
        concerned.add('_submitter')
    return concerned


concerned_roles = property(get_concerned_roles)
|
|
|
|
def get_actions_roles(self):
    """Return the roles that can act on this form data in its current status.

    Drafts and form data without a workflow status give an empty list.
    Otherwise the result is a set built from the ``by`` attribute of every
    status item whose condition holds: ``'_submitter'`` is kept as is, other
    entries are replaced by their role translation when there is one.
    """
    if self.is_draft():
        return []

    current_status = self.get_status()
    if not current_status:
        return []

    roles = set()
    for item in current_status.items or []:
        if not getattr(item, 'by', None):
            # item is not restricted to some roles
            continue
        # the condition is evaluated with this form data fed to substitutions
        with get_publisher().substitutions.temporary_feed(self):
            condition_holds = item.check_condition(self)
        if not condition_holds:
            continue
        for role in item.by:
            if role != '_submitter':
                role = self.get_role_translation(role)
                if not role:
                    continue
            roles.add(role)

    return roles


actions_roles = property(get_actions_roles)
|
|
|
|
def get_last_update_time(self):
    """Return the time of the last update of this form data.

    An explicitly set value (see set_last_update_time) wins.  Otherwise the
    last evolution entry is used: its ``last_jump_datetime`` converted with
    ``timetuple()`` if set, else its ``time`` if set.  When none of these is
    available the receipt time is returned.
    """
    try:
        return self._last_update_time
    except AttributeError:
        pass
    if self.evolution:
        latest = self.evolution[-1]
        if latest.last_jump_datetime:
            return latest.last_jump_datetime.timetuple()
        if latest.time:
            return latest.time
    return self.receipt_time


def set_last_update_time(self, value):
    """Force the value reported by get_last_update_time()."""
    setattr(self, '_last_update_time', value)


last_update_time = property(get_last_update_time, set_last_update_time)
|
|
|
|
def anonymise(self):
    """Remove personal information from this form data and store it.

    - the value of every field flagged ``anonymise`` is set to None;
    - ``anonymised`` is set to the current date and time;
    - submitter, edition rights, workflow data, workflow roles and
      submission context are reset to None;
    - author, parts and comment of every evolution entry are reset to None.
    """
    for field in self.formdef.get_all_fields():
        if field.anonymise:
            field.set_value(self.data, None)

    self.anonymised = datetime.datetime.now()
    self.user_id = None
    self.user_label = None
    self.editable_by = None
    self.workflow_data = None
    self.workflow_roles = None
    self.submission_context = None

    # evolution may be None or empty, there is nothing to clean in that case
    for evo in self.evolution or []:
        evo.who = None
        evo.parts = None
        evo.comment = None
    self.store()
|
|
|
|
def get_display_name(self):
    """Return the translated "<formdef name> #<display id>" label."""
    template = _('%(name)s #%(id)s')
    substitutions = {'name': self.formdef.name, 'id': self.get_display_id()}
    return template % substitutions
|
|
|
|
def get_display_label(self):
    """Return the display name, followed by the digest in parentheses if any."""
    digest = self.digest
    display_name = self.get_display_name()
    if not digest:
        return display_name
    return '%s (%s)' % (display_name, digest)
|
|
|
|
def get_auto_geoloc(self):
    """Return a geolocation for this form data, or None.

    The first non-empty value of ``self.geolocations`` is preferred; failing
    that, the JSON value of the first ``map`` field holding data is used.
    """
    # use proper geolocation if it exists
    stored = next((loc for loc in (self.geolocations or {}).values() if loc), None)
    if stored:
        return stored
    # fallback to 1st map field
    for field in self.formdef.get_all_fields():
        if field.key != 'map':
            continue
        raw_value = self.data.get(field.id)
        if raw_value:
            return field.get_json_value(raw_value)
    return None
|
|
|
|
@classmethod
def get_json_data_dict(cls, data, fields, formdata=None, include_files=True, anonymise=False):
    """Build the JSON-exportable dictionary of field values.

    Parameters:
      data: dictionary of stored values, keyed by field id (display and
        structured values are read from the ``<id>_display`` and
        ``<id>_structured`` keys); None is handled like an empty dictionary.
      fields: fields to export; fields without a varname are ignored.
      formdata: passed to the ``get_json_value`` method of fields.
      include_files: passed to ``get_json_value`` as ``include_file_content``.
      anonymise: when true, fields flagged ``anonymise`` are left out.

    Returns a dictionary keyed by varname; fields storing a display value
    get the display value under ``<varname>`` and the value itself under
    ``<varname>_raw``, fields storing a structured value get an additional
    ``<varname>_structured`` key when that value is not empty.
    """
    if data is None:
        # every field is exported as unset; display and structured lookups
        # below must not fail on a missing data dictionary
        data = {}
    new_data = {}
    seen = set()
    for field in fields:
        if anonymise and field.anonymise:
            continue
        if not field.varname:  # exports only named fields
            continue
        if field.varname in seen:
            # skip fields with a varname that is used by another non-empty
            # field.
            continue
        value = data.get(field.id)
        if value and hasattr(field, 'get_json_value'):
            value = field.get_json_value(value, formdata=formdata, include_file_content=include_files)
        if value:
            seen.add(field.varname)
        if field.store_display_value:
            new_data[field.varname + '_raw'] = value
            new_data[field.varname] = data.get('%s_display' % field.id)
        else:
            new_data[field.varname] = value
        if field.store_structured_value:
            structured_value = data.get('%s_structured' % field.id)
            if structured_value:
                new_data[field.varname + '_structured'] = structured_value
    return new_data
|
|
|
|
def get_json_dict(self, fields, include_files=True, anonymise=False):
    """Export the values of `fields` for this form data.

    Thin wrapper around get_json_data_dict(), called with the data of this
    form data and the form data itself.
    """
    export_options = {
        'formdata': self,
        'include_files': include_files,
        'anonymise': anonymise,
    }
    return self.get_json_data_dict(self.data, fields, **export_options)
|
|
|
|
def get_json_export_dict(self, include_files=True, anonymise=False, user=None):
    """Return a dictionary describing this form data, ready for JSON export.

    Parameters:
      include_files: forwarded to the export of fields and evolution entries.
      anonymise: when true the submitter and the workflow data are left
        out, and fields and evolution entries are exported in anonymised
        mode.
      user: user passed to get_visible_status() to select the exported
        ``status``.  When it is not given, the submitter looked up for a
        non-anonymised export is used instead, as was always the case
        before the parameter was honoured.

    The result has ``id``, ``digest``, ``display_id``, ``display_name``,
    ``text``, ``receipt_time``, ``last_update_time``, ``criticality_level``,
    ``url``, ``fields``, ``workflow``, ``roles`` and ``submission`` keys,
    plus ``user``, ``evolution`` and ``geolocations`` when available.
    """
    data = {}
    data['id'] = str(self.id)
    data['digest'] = self.digest
    data['display_id'] = self.get_display_id()
    data['display_name'] = self.get_display_name()
    data['text'] = self.get_display_label()
    data['receipt_time'] = datetime.datetime(*self.receipt_time[:6])
    data['last_update_time'] = datetime.datetime(*self.last_update_time[:6])
    data['criticality_level'] = self.criticality_level
    data['url'] = self.get_url()

    # The submitter is kept in its own variable so that it does not
    # overwrite the `user` argument.
    formdata_user = None
    if not anonymise:
        try:
            formdata_user = get_publisher().user_class.get(self.user_id)
        except KeyError:
            formdata_user = None
        if formdata_user:
            data['user'] = formdata_user.get_json_export_dict()

    data['fields'] = self.get_json_dict(
        self.formdef.fields, include_files=include_files, anonymise=anonymise
    )

    data['workflow'] = {}
    # honour the user given by the caller, fall back on the submitter
    status_user = user if user is not None else formdata_user
    wf_status = self.get_visible_status(status_user)
    if wf_status:
        data['workflow']['status'] = {'id': wf_status.id, 'name': wf_status.name}
    wf_real_status = self.get_status()
    if wf_real_status:
        data['workflow']['real_status'] = {'id': wf_real_status.id, 'name': wf_real_status.name}
    # Workflow data have unknown purpose, do not store them in anonymised export
    if self.workflow_data and not anonymise:
        data['workflow']['data'] = self.workflow_data
    if self.formdef.workflow.get_backoffice_fields():
        data['workflow']['fields'] = self.get_json_dict(
            self.formdef.workflow.get_backoffice_fields(),
            include_files=include_files,
            anonymise=anonymise,
        )

    # add a roles dictionary, with workflow functions and two special
    # entries for concerned/actions roles.
    data['roles'] = {}
    workflow_roles = {}
    if self.formdef.workflow_roles:
        workflow_roles.update(self.formdef.workflow_roles)
    if self.workflow_roles:
        # roles set on the form data take precedence over the formdef ones
        workflow_roles.update(self.workflow_roles)
    for workflow_role in workflow_roles:
        data['roles'][workflow_role] = [workflow_roles.get(workflow_role)]
    data['roles']['concerned'] = self.get_concerned_roles()
    data['roles']['actions'] = self.get_actions_roles()

    for role_key in data['roles']:
        # exclude special _submitter value
        role_list = [x for x in data['roles'][role_key] if x != '_submitter']
        # get role objects
        role_list = [get_publisher().role_class.get(x, ignore_errors=True) for x in role_list]
        # export as json dicts
        role_list = [x.get_json_export_dict() for x in role_list if x is not None]
        data['roles'][role_key] = role_list

    data['submission'] = {
        'backoffice': self.backoffice_submission,
        'channel': self.submission_channel or 'web',
    }

    if self.evolution:
        evolution = data['evolution'] = []
        for evo in self.evolution:
            evolution.append(
                evo.get_json_export_dict(
                    # evolution entries get the submitter (None when
                    # anonymising), exactly as before
                    user=None if anonymise else formdata_user,
                    anonymise=anonymise,
                    include_files=include_files,
                )
            )

    if self.geolocations:
        data['geolocations'] = {}
        for k, v in self.geolocations.items():
            data['geolocations'][k] = v.copy()

    return data
|
|
|
|
def export_to_json(self, include_files=True, anonymise=False):
    """Serialize the result of get_json_export_dict() as a JSON string."""
    export_options = {'include_files': include_files, 'anonymise': anonymise}
    export = self.get_json_export_dict(**export_options)
    return json.dumps(export, cls=misc.JSONEncoder)
|
|
|
|
def get_object_key(self):
    """Return "<formdef xml root node>-<formdef url name>-<id>" for this form data."""
    key_parts = (self.formdef.xml_root_node, self.formdef.url_name, self.id)
    return '%s-%s-%s' % key_parts
|
|
|
|
def feed_session(self):
    """Call ``feed_session`` on every formdef field with its stored values.

    This gives fields a chance to initialize things that would rely on the
    current data ahead of time.  Each field receives its value and its
    display value (the ``<id>_display`` entry), None when unset.
    """
    for field in self.formdef.fields:
        value = self.data.get(field.id)
        display_value = self.data.get('%s_display' % field.id)
        field.feed_session(value, display_value)
|
|
|
|
def get_summary_field_details(self, fields=None, include_unset_required_fields=False):
    """Return the entries to display in the summary of this form data.

    Parameters:
      fields: fields to consider, the formdef fields by default.
      include_unset_required_fields: also list required fields that have
        no value.

    Returns a list of dictionaries.  Titles, subtitles and comments give
    ``{'field': f}``, data fields give ``{'field', 'value',
    'value_details'}``.  When the fields contain pages, each page holding at
    least one value gives ``{'field': page, 'fields': [...]}`` followed by
    its entries; other pages are dropped.
    """
    if fields is None:
        fields = self.formdef.fields

    page_field = None
    entries = []
    page_groups = []
    for field in fields:
        kind = field.type
        if kind == 'page':
            # start collecting entries for a new page
            page_field = field
            entries = []
            page_groups.append({'field': field, 'fields': entries})
            continue

        if kind == 'title' and page_field and not entries and page_field.label == field.label:
            # don't include first title of a page if that title has the
            # same text as the page.
            continue

        if kind in ('title', 'subtitle', 'comment') and field.include_in_summary_page:
            entries.append({'field': field})
            continue

        if not hasattr(field, 'get_view_value') or not field.include_in_summary_page:
            continue

        value, value_details = field.get_value_info(self.data)
        if value is None and (not field.required or not include_unset_required_fields):
            continue

        entries.append({'field': field, 'value': value, 'value_details': value_details})

    if not page_groups:
        return entries

    summary = []
    for group in page_groups:
        # ignore empty pages
        if any('value' in entry for entry in group['fields']):
            summary.append(group)
            summary.extend(group['fields'])
    return summary
|
|
|
|
def iter_evolution_parts(self):
    """Yield every part of every evolution entry, in order."""
    for evo in self.evolution or []:
        yield from evo.parts or []
|
|
|
|
def iter_target_datas(self, objectdef=None, object_type=None, status_item=None):
    """Iterate over the form/card data linked to this form data.

    Linked objects are the parent (see get_parent()), the objects referenced
    by fields with a data source, and the objects recorded in
    LinkedFormdataEvolutionPart evolution parts.

    There are two modes:

    - workflow action: `objectdef`, `object_type` (a "<kind>:<slug>"
      identifier) and `status_item` are given; only objects of that type
      are considered and the data objects themselves are yielded.  Missing
      objects are reported with record_error() and skipped.
    - inspect page: no `object_type`; ``(object, origin)`` tuples are
      yielded for all linked objects, with a descriptive string in place of
      the object when it cannot be found.
    """
    # objectdef, object_type and status_item are provided when called from a workflow action
    from wcs.wf.create_formdata import LinkedFormdataEvolutionPart

    from .carddef import CardDef
    from .formdef import FormDef

    parent = self.get_parent()
    if parent and object_type:
        # looking for a parent of a specific type (workflow action)
        parent_identifier = '%s:%s' % (parent.formdef.xml_root_node, parent.formdef.url_name)
        if parent_identifier == object_type:
            yield parent
    elif parent:
        # looking for any parent (inspect page)
        yield (parent, _('Parent'))

    data_ids = []
    # search linked objects in data sources
    for field in self.get_formdef().get_all_fields():
        linked_id = self.data.get(field.id)
        if not linked_id:
            continue
        data_source = getattr(field, 'data_source', None)
        if not data_source:
            continue
        origin = _('Data Source')
        if field.varname:
            origin = '%s - varname "%s"' % (origin, field.varname)
        # any data source for the inspect page, only the ones of the
        # requested type for a workflow action
        if not object_type or data_source['type'] == object_type:
            data_ids.append((data_source['type'], linked_id, origin))

    # search in evolution
    for part in self.iter_evolution_parts():
        if not isinstance(part, LinkedFormdataEvolutionPart):
            continue
        part_identifier = '%s:%s' % (part.formdef.xml_root_node, part.formdef.url_name)
        # same filtering as for data sources
        if not object_type or part_identifier == object_type:
            data_ids.append((part_identifier, part.formdata_id, _('Evolution')))

    objectdef_classes = {'formdef': FormDef, 'carddef': CardDef}
    for (slug, target_id, origin) in data_ids:
        if object_type:
            # workflow action
            try:
                yield objectdef.data_class().get(target_id)
            except KeyError as e:
                # use custom error message depending on target type
                get_publisher().record_error(
                    _('Could not find linked "%(object_name)s" object by id %(object_id)s')
                    % {'object_name': objectdef.name, 'object_id': target_id},
                    formdata=self,
                    status_item=status_item,
                    exception=e,
                )
        else:
            # inspect page
            try:
                obj_type, slug = slug.split(':')
                obj_class = objectdef_classes.get(obj_type)
                if obj_class is None:
                    # not a form or card reference: skip it, like
                    # identifiers that are not of the "<kind>:<slug>" form,
                    # instead of using an unbound or stale class
                    continue
                try:
                    _objectdef = obj_class.get_by_urlname(slug)
                except KeyError:
                    yield (
                        _('Linked object def by id %(object_id)s') % {'object_id': slug},
                        _('%s - not found') % origin,
                    )
                else:
                    yield (_objectdef.data_class().get(target_id), origin)
            except ValueError:
                pass
            except KeyError:
                yield (
                    _('Linked "%(object_name)s" object by id %(object_id)s')
                    % {'object_name': _objectdef.name, 'object_id': target_id},
                    _('%s - not found') % origin,
                )
|
|
|
|
def __getattr__(self, attr):
    """Give attribute access to field values, as ``f<field id>``.

    Python calls this only when regular attribute lookup failed.  For a
    name made of ``f`` followed by a field id, the value stored in the data
    dictionary is returned, or None when the field exists in the formdef but
    holds no data yet.  Any other name raises AttributeError, and only
    AttributeError, so that hasattr() and getattr() with a default keep
    working even when the instance has no data dictionary.
    """
    try:
        return self.__dict__[attr]
    except KeyError:
        pass

    # give direct access to values from the data dictionary
    if attr.startswith('f'):
        field_id = attr[1:]
        # "data" may be missing from the instance dictionary, or be None
        data = self.__dict__.get('data') or {}
        if field_id in data:
            return data[field_id]
        # if field id is not in data dictionary it may still be a valid
        # field, never initialized, check requested field id against
        # existing fields ids.
        if any(field.id == field_id for field in self.formdef.get_all_fields()):
            return None
    raise AttributeError(attr)
|
|
|
|
# the _formdef cache must not end up in pickles
def __getstate__(self):
    """Return a copy of the instance dictionary without its ``_formdef`` entry."""
    state = dict(self.__dict__)
    state.pop('_formdef', None)
    return state
|
|
|
|
def __setstate__(self, dict):
|
|
if '_formdef' in dict:
|
|
# there was a time, before October 2007 and 48e46bf0, and pickled
|
|
# objects had a _formdef, in case these objects still exists, we
|
|
# remove the _formdef beforehand, so it doesn't interfere with the
|
|
# cached _formdef already set in data_class()
|
|
del dict['_formdef']
|
|
self.__dict__ = dict
|
|
|
|
|
|
# Static substitution variables of the "Form" category, registered in this
# order as (variable name, comment) pairs; FormData itself is then registered
# as a dynamic source.
for _variable_name, _variable_comment in (
    ('form_receipt_date', N_('Form Receipt Date')),
    ('form_receipt_time', N_('Form Receipt Time')),
    ('form_number', N_('Form Number')),
    ('form_details', N_('Form Details')),
    ('form_url', N_('Form URL')),
    ('form_url_backoffice', N_('Form URL (backoffice)')),
    ('form_status_url', N_('Form Status URL')),
    ('form_tracking_code', N_('Form Tracking Code')),
    ('form_user_display_name', N_('Form Submitter Name')),
    ('form_user_email', N_('Form Submitter Email')),
):
    Substitutions.register(_variable_name, category=N_('Form'), comment=_variable_comment)
# do not leave the loop variables in the module namespace
del _variable_name, _variable_comment
Substitutions.register_dynamic_source(FormData)
|