wcs/wcs/wf/create_formdata.py

455 lines
19 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2016 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import time
import xml.etree.ElementTree as ET
from quixote import get_request, get_session, get_publisher
from quixote.html import htmltext
from django.utils.functional import cached_property
from wcs.qommon import _, N_
from wcs.qommon.form import (WidgetListAsTable, CompositeWidget,
SingleSelectWidget, ComputedExpressionWidget,
CheckboxWidget, VarnameWidget, HtmlWidget)
from wcs.logged_errors import LoggedError
from wcs.workflows import WorkflowStatusItem, register_item_class
from wcs.formdef import FormDef
class Mapping(object):
field_id = None
expression = None
def __init__(self, field_id, expression):
self.field_id = field_id
self.expression = expression
class MappingWidget(CompositeWidget):
def __init__(self, name, value=None, to_formdef=None, **kwargs):
value = value or Mapping(None, '')
super(MappingWidget, self).__init__(name, value, **kwargs)
to_fields = self._fields_to_options(to_formdef)
self.add(SingleSelectWidget,
name='field_id',
title=_('Field'),
value=value.field_id,
options=to_fields)
self.add(ComputedExpressionWidget,
name='expression',
title=_('Expression'),
value=value.expression)
def _fields_to_options(self, formdef):
return [(None, '---', '')] + [
(field.id, field.label, str(field.id)) for field in formdef.get_widget_fields()]
def _parse(self, request):
super(MappingWidget, self)._parse(request)
if self.get('field_id') is not None and self.get('expression') is not None:
self.value = Mapping(field_id=self.get('field_id'), expression=self.get('expression'))
else:
self.value = None
class MappingsWidget(WidgetListAsTable):
readonly = False
# widget_list.js does not work with ComputedExpressionWidget,
# so we revert to quixote behaviour for adding a line
def add_media(self):
pass
def __init__(self, name, to_formdef=None, **kwargs):
self.to_formdef = to_formdef
value = kwargs.get('value')
if value:
# reorder mappings based on to_formdef fields order
value.sort(key=lambda mapping: self.ranks.get(str(mapping.field_id), 9999))
super(MappingsWidget, self).__init__(
name,
element_type=MappingWidget,
element_kwargs={
'to_formdef': to_formdef,
},
**kwargs)
@cached_property
def ranks(self):
return {str(field.id): i for i, field in enumerate(
field for field in self.to_formdef.get_widget_fields())}
def _parse(self, request):
super(MappingsWidget, self)._parse(request)
if self.value:
# prevent many mappings to the same field
if len(set(mapping.field_id for mapping in self.value)) != len(self.value):
self.error = _('Some destination fields are duplicated')
return
# reorder mappings based on to_formdef fields order
self.value.sort(key=lambda mapping: self.ranks.get(str(mapping.field_id), 9999))
class LinkedFormdataEvolutionPart(object):
formdef_class = FormDef
attach_to_history = False
def __init__(self, formdata, varname, attach_to_history):
self._formdef = formdata.formdef
self._formdata = formdata
self.formdef_id = formdata.formdef.id
self.formdata_id = formdata.id
self.varname = varname
self.attach_to_history = attach_to_history
@property
def formdef(self):
if not hasattr(self, '_formdef'):
self._formdef = self.formdef_class.get(self.formdef_id)
return self._formdef
@property
def formdata(self):
if not hasattr(self, '_formdata'):
self._formdata = self.formdef.data_class().get(self.formdata_id, ignore_errors=True)
return self._formdata
def __getstate__(self):
# Forget cached values
return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
def __repr__(self):
return '<Linked %s "%s-%s">' % (self.formdef_class.__name__,
self.formdef_id, self.formdata_id)
@classmethod
def get_substitution_variables(cls, formdata):
d = {}
for part in formdata.iter_evolution_parts():
if not isinstance(part, cls):
continue
if part.formdata:
d['form_links_%s' % (part.varname or '*')] = part
return d
def view(self):
if self.attach_to_history:
return htmltext('<p class="wf-links">%s <a href="%s">%s %s</a>') % (
_('Created new form'),
self.formdata and self.formdata.get_url(
backoffice=bool(get_request() and get_request().is_in_backoffice())),
self.formdef.name if self.formdef else _('Deleted'),
self.formdata_id)
else:
return ''
class LazyFormDataLinks(object):
def __init__(self, formdata):
self._formdata = formdata
def __getattr__(self, varname):
for part in self._formdata.iter_evolution_parts():
if not isinstance(part, LinkedFormdataEvolutionPart):
continue
if part.varname == varname and part.formdata:
return part.formdata.get_substitution_variables()
raise AttributeError(varname)
def inspect_keys(self):
for part in self._formdata.iter_evolution_parts():
if isinstance(part, LinkedFormdataEvolutionPart) and part.varname and part.formdata:
yield part.varname
class CreateFormdataWorkflowStatusItem(WorkflowStatusItem):
description = N_('New Form Creation')
key = 'create_formdata'
category = 'formdata-action'
support_substitution_variables = True
formdef_class = FormDef
evolution_part_class = LinkedFormdataEvolutionPart
formdef_slug = None
formdef_label = N_('Form')
mappings_label = N_('Mappings to new form fields')
varname_hint = N_('This is used to get linked forms in expressions.')
draft = False
backoffice_submission = False
keep_user = True
keep_submission_context = False
mappings = None
varname = None
map_fields_by_varname = False
attach_to_history = False
def _resolve_formdef_slug(self, formdef_slug):
if formdef_slug:
try:
return self.formdef_class.get_by_urlname(formdef_slug)
except KeyError:
pass
return None
@property
def formdef(self):
return self._resolve_formdef_slug(self.formdef_slug)
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
super(CreateFormdataWorkflowStatusItem, self).add_parameters_widgets(
form, parameters, prefix=prefix, formdef=formdef)
if 'formdef_slug' in parameters:
list_forms = [(None, '---', '')]
list_forms += [(x.url_name, x.name, x.url_name) for x in self.formdef_class.select(order_by='name')
if not x.disabled or x.url_name == self.formdef_slug]
form.add(SingleSelectWidget, '%sformdef_slug' % prefix,
title=_('Form'),
value=self.formdef_slug,
options=list_forms)
if 'draft' in parameters:
form.add(CheckboxWidget, '%sdraft' % prefix,
title=_('Create new draft'),
value=self.draft)
if 'backoffice_submission' in parameters:
form.add(CheckboxWidget, '%sbackoffice_submission' % prefix,
title=_('Backoffice submission'),
value=self.backoffice_submission,
advanced=(self.backoffice_submission == CreateFormdataWorkflowStatusItem.backoffice_submission))
if 'keep_user' in parameters:
form.add(CheckboxWidget, '%skeep_user' % prefix,
title=_('Keep user'),
value=self.keep_user,
advanced=(self.keep_user == CreateFormdataWorkflowStatusItem.keep_user))
if 'keep_submission_context' in parameters and self.keep_user:
form.add(CheckboxWidget, '%skeep_submission_context' % prefix,
title=_('Keep submission context'),
value=self.keep_submission_context,
advanced=(
self.keep_submission_context == CreateFormdataWorkflowStatusItem.keep_submission_context))
formdef_slug = form.get('%sformdef_slug' % prefix)
formdef = self._resolve_formdef_slug(formdef_slug)
if 'mappings' in parameters and formdef:
widget = form.add(MappingsWidget, '%smappings' % prefix,
title=_(self.mappings_label),
to_formdef=formdef,
value=self.mappings)
if form.is_submitted() and get_request().form.get('map_fields_by_varname') != 'yes':
# do not validate form if formdef is changed and there is no mappings
if formdef_slug != self.formdef_slug and not widget.parse():
form.get_widget('%smappings' % prefix).set_error(_('Please define new mappings'))
if 'varname' in parameters:
form.add(VarnameWidget, '%svarname' % prefix,
title=_('Identifier'), value=self.varname,
hint=_(self.varname_hint),
advanced=not(bool(self.varname)))
if 'map_fields_by_varname' in parameters and formdef:
form.add(CheckboxWidget, '%smap_fields_by_varname' % prefix,
title=_('Map fields by varname'),
value=self.map_fields_by_varname,
advanced=(
self.map_fields_by_varname == CreateFormdataWorkflowStatusItem.map_fields_by_varname))
if self.map_fields_by_varname:
common_varnames = [htmltext('<tt>%s</tt>') % varname for varname in self._common_varnames()]
common_varnames = htmltext(', ').join(common_varnames)
form.widgets.append(
HtmlWidget(
htmltext(
'<div class="infonotice">%s %s</div>') % (_('Common varnames:'), common_varnames)))
if 'attach_to_history' in parameters:
form.add(CheckboxWidget, '%sattach_to_history' % prefix,
title=_('Include new form in the form history'),
value=self.attach_to_history)
def get_mappings_parameter_view_value(self):
to_id_fields = {str(field.id): field for field in self.formdef.get_widget_fields()}
result = []
for mapping in self.mappings or []:
try:
dest_field = to_id_fields[str(mapping.field_id)]
result.append(htmltext('<li>%s%s</li>') % (dest_field.label, mapping.expression))
except KeyError:
result.append(htmltext('<li>#%s%s</li>') % (mapping.field_id, mapping.expression))
return htmltext('<ul class="mappings">%s</ul>') % htmltext('').join(result)
def _common_varnames(self):
'''Compute common varnames between the targeted formdef and all formdefs related to the parent workflow.'''
assert self.formdef
varnames = set(field.varname
for formdef in self.parent.parent.formdefs()
for field in formdef.get_widget_fields() if field.varname)
return sorted(varnames & set(field.varname for field in self.formdef.get_widget_fields() if field.varname))
def get_parameters(self):
return ('draft', 'formdef_slug', 'map_fields_by_varname', 'mappings', 'backoffice_submission',
'keep_user', 'keep_submission_context', 'varname', 'attach_to_history', 'condition')
def get_line_details(self):
if not self.formdef or not (self.mappings or self.map_fields_by_varname):
return _('not configured')
return self.formdef.name
def perform(self, formdata):
formdef = self.formdef
if not formdef or not (self.mappings or self.map_fields_by_varname):
return
new_formdata = formdef.data_class()()
new_formdata.receipt_time = time.localtime()
if self.keep_user:
new_formdata.user_id = formdata.user_id
if self.keep_submission_context and self.keep_user:
new_formdata.submission_context = formdata.submission_context or {}
new_formdata.submission_channel = formdata.submission_channel
new_formdata.submission_agent_id = formdata.submission_agent_id
else:
new_formdata.submission_context = {}
new_formdata.backoffice_submission = self.backoffice_submission
if self.backoffice_submission and get_request() and get_request().user is not None:
new_formdata.submission_agent_id = str(get_request().user.id)
new_formdata.submission_context['orig_object_type'] = formdata.formdef.xml_root_node
new_formdata.submission_context['orig_formdef_id'] = str(formdata.formdef.id)
new_formdata.submission_context['orig_formdata_id'] = str(formdata.id)
new_formdata.data = {}
self.apply_mappings(dest=new_formdata, src=formdata)
if self.draft:
new_formdata.status = 'draft'
new_formdata.store()
else:
# freeze substitutions during submission, as it has side effects
with get_publisher().substitutions.freeze():
new_formdata.just_created()
new_formdata.store()
new_formdata.perform_workflow()
new_formdata.store()
if new_formdata.user_id is None and not new_formdata.backoffice_submission:
get_session().mark_anonymous_formdata(new_formdata)
evo = formdata.evolution[-1]
evo.add_part(
self.evolution_part_class(
new_formdata,
varname=self.varname,
attach_to_history=self.attach_to_history))
formdata.store()
def apply_mappings(self, dest, src):
if self.map_fields_by_varname:
fields_by_varname = {field.varname: field for field in self.formdef.get_widget_fields() if field.varname}
for field in src.formdef.get_widget_fields():
dest_field = fields_by_varname.get(field.varname)
if dest_field is None:
continue
try:
self._set_value(
formdata=dest,
field=dest_field,
value=src.data.get(field.id))
except Exception as e:
LoggedError.record(_('Could not copy field by varname for "%s"') % field.varname,
formdata=src, status_item=self, exception=e)
# field.id can be serialized to xml, so we must always convert them to
# str when matching
to_id_fields = {str(field.id): field for field in self.formdef.get_widget_fields()}
missing_fields = []
for mapping in self.mappings or []:
try:
dest_field = to_id_fields[str(mapping.field_id)]
except KeyError:
missing_fields.append(mapping.field_id)
continue
try:
value = self.compute(mapping.expression, formdata=src, raises=True, status_item=self)
except Exception:
# already logged by self.compute
continue
try:
self._set_value(
formdata=dest,
field=dest_field,
value=value)
except Exception as e:
expression = self.get_expression(mapping.expression)
LoggedError.record(_('Could not assign value to field "%s"') % dest_field.label,
formdata=src, status_item=self,
expression=expression['value'], expression_type=expression['type'],
exception=e)
if missing_fields:
summary = _('Missing field %r') % missing_fields
LoggedError.record(summary, formdata=src, status_item=self)
def _set_value(self, formdata, field, value):
if field.convert_value_from_anything:
old_value = value # noqa: F841, copy value for debug
value = field.convert_value_from_anything(value)
formdata.data['%s' % field.id] = value
if value is None:
return
if field.store_display_value:
display_value = field.store_display_value(
formdata.data, field.id)
if display_value:
formdata.data['%s_display' % field.id] = display_value
if field.store_structured_value:
structured_value = field.store_structured_value(
formdata.data, field.id)
if structured_value:
if isinstance(structured_value, dict) and structured_value.get('id'):
# in case of list field, override id
formdata.data['%s' % field.id] = str(structured_value.get('id'))
formdata.data['%s_structured' % field.id] = structured_value
def mappings_export_to_xml(self, parent, charset, include_id=False):
container = ET.SubElement(parent, 'mappings')
for mapping in self.mappings or []:
item = ET.SubElement(container, 'mapping')
item.attrib['field_id'] = str(mapping.field_id)
item.text = mapping.expression
def mappings_init_with_xml(self, container, charset, include_id=False):
self.mappings = []
for child in container:
field_id = child.attrib.get('field_id', '')
expression = child.text
if field_id:
self.mappings.append(Mapping(field_id=field_id, expression=expression))
register_item_class(CreateFormdataWorkflowStatusItem)