wcs/wcs/wf/create_formdata.py

596 lines
23 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2016 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import time
import xml.etree.ElementTree as ET
from django.utils.functional import cached_property
from quixote import get_publisher
from quixote import get_request
from quixote import get_session
from quixote.html import htmltext
from wcs.formdef import FormDef
from wcs.qommon import N_
from wcs.qommon import _
from wcs.qommon.form import CheckboxWidget
from wcs.qommon.form import CompositeWidget
from wcs.qommon.form import ComputedExpressionWidget
from wcs.qommon.form import Form
from wcs.qommon.form import HtmlWidget
from wcs.qommon.form import RadiobuttonsWidget
from wcs.qommon.form import SingleSelectWidget
from wcs.qommon.form import VarnameWidget
from wcs.qommon.form import WidgetListAsTable
from wcs.workflows import WorkflowStatusItem
from wcs.workflows import register_item_class
class Mapping:
field_id = None
expression = None
def __init__(self, field_id, expression):
self.field_id = field_id
self.expression = expression
class MappingWidget(CompositeWidget):
def __init__(self, name, value=None, to_formdef=None, **kwargs):
value = value or Mapping(None, '')
self.accept_empty_value = kwargs.pop('accept_empty_value', False)
super().__init__(name, value, **kwargs)
to_fields = self._fields_to_options(to_formdef)
self.add(
SingleSelectWidget, name='field_id', title=_('Field'), value=value.field_id, options=to_fields
)
placeholder = ''
if self.accept_empty_value:
placeholder = _('Leaving the field blank will empty the value.')
self.add(
ComputedExpressionWidget,
name='expression',
title=_('Expression'),
value=value.expression,
value_placeholder=placeholder,
)
def _fields_to_options(self, formdef):
return [(None, '---', '')] + [
(field.id, field.label, str(field.id)) for field in formdef.get_widget_fields()
]
def _parse(self, request):
super()._parse(request)
if self.get('field_id') is not None and self.accept_empty_value or self.get('expression') is not None:
self.value = Mapping(field_id=self.get('field_id'), expression=self.get('expression'))
else:
self.value = None
class MappingsWidget(WidgetListAsTable):
readonly = False
# widget_list.js does not work with ComputedExpressionWidget,
# so we revert to quixote behaviour for adding a line
def add_media(self):
pass
def __init__(self, name, to_formdef=None, **kwargs):
self.to_formdef = to_formdef
value = kwargs.get('value')
if value:
# reorder mappings based on to_formdef fields order
value.sort(key=lambda mapping: self.ranks.get(str(mapping.field_id), 9999))
accept_empty_value = kwargs.pop('accept_empty_value', False)
super().__init__(
name,
element_type=MappingWidget,
element_kwargs={
'to_formdef': to_formdef,
'accept_empty_value': accept_empty_value,
},
**kwargs,
)
@cached_property
def ranks(self):
return {
str(field.id): i for i, field in enumerate(field for field in self.to_formdef.get_widget_fields())
}
def _parse(self, request):
super()._parse(request)
if self.value:
# prevent many mappings to the same field
if len(set(mapping.field_id for mapping in self.value)) != len(self.value):
self.error = _('Some destination fields are duplicated')
return
# reorder mappings based on to_formdef fields order
self.value.sort(key=lambda mapping: self.ranks.get(str(mapping.field_id), 9999))
class LinkedFormdataEvolutionPart:
formdef_class = FormDef
attach_to_history = False
def __init__(self, formdata, varname, attach_to_history):
self._formdef = formdata.formdef
self._formdata = formdata
self.formdef_id = formdata.formdef.id
self.formdata_id = formdata.id
self.varname = varname
self.attach_to_history = attach_to_history
@property
def formdef(self):
if not hasattr(self, '_formdef'):
self._formdef = self.formdef_class.get(self.formdef_id)
return self._formdef
@property
def formdata(self):
if not hasattr(self, '_formdata'):
self._formdata = self.formdef.data_class().get(self.formdata_id, ignore_errors=True)
return self._formdata
def __getstate__(self):
# Forget cached values
return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
def __repr__(self):
return '<Linked %s "%s-%s">' % (self.formdef_class.__name__, self.formdef_id, self.formdata_id)
@classmethod
def get_substitution_variables(cls, formdata):
d = {}
for part in formdata.iter_evolution_parts():
if not isinstance(part, cls):
continue
if part.formdata:
d['form_links_%s' % (part.varname or '*')] = part
return d
def view(self):
if self.attach_to_history:
return htmltext('<p class="wf-links">%s <a href="%s">%s %s</a>') % (
_('Created new form'),
self.formdata
and self.formdata.get_url(
backoffice=bool(get_request() and get_request().is_in_backoffice())
),
self.formdef.name if self.formdef else _('Deleted'),
self.formdata_id,
)
else:
return ''
class LazyFormDataLinks:
def __init__(self, formdata):
self._formdata = formdata
def __getattr__(self, varname):
for part in self._formdata.iter_evolution_parts():
if not isinstance(part, LinkedFormdataEvolutionPart):
continue
if part.varname == varname and part.formdata:
return part.formdata.get_substitution_variables()
raise AttributeError(varname)
def inspect_keys(self):
for part in self._formdata.iter_evolution_parts():
if isinstance(part, LinkedFormdataEvolutionPart) and part.varname and part.formdata:
yield part.varname
class CreateFormdataWorkflowStatusItem(WorkflowStatusItem):
description = N_('New Form Creation')
key = 'create_formdata'
category = 'formdata-action'
support_substitution_variables = True
formdef_class = FormDef
evolution_part_class = LinkedFormdataEvolutionPart
formdef_slug = None
formdef_label = N_('Form')
mappings_label = N_('Mappings to new form fields')
accept_empty_value = False
varname_hint = N_('This is used to get linked forms in expressions.')
user_association_option_label = N_('User to associate to form')
draft = False
backoffice_submission = False
user_association_mode = None
user_association_template = None
keep_submission_context = False
mappings = None
varname = None
map_fields_by_varname = False
attach_to_history = False
def migrate(self):
changed = super().migrate()
if getattr(self, 'keep_user', False) is True:
self.user_association_mode = 'keep-user'
delattr(self, 'keep_user')
changed = True
return changed
def _resolve_formdef_slug(self, formdef_slug):
if formdef_slug:
try:
return self.formdef_class.get_by_urlname(formdef_slug)
except KeyError:
pass
return None
@property
def formdef(self):
return self._resolve_formdef_slug(self.formdef_slug)
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
if 'formdef_slug' in parameters:
list_forms = [(None, '---', '')]
list_forms += [
(x.url_name, x.name, x.url_name)
for x in self.formdef_class.select(order_by='name')
if not x.disabled or x.url_name == self.formdef_slug
]
form.add(
SingleSelectWidget,
'%sformdef_slug' % prefix,
title=_(self.formdef_label),
value=self.formdef_slug,
options=list_forms,
)
if 'draft' in parameters:
form.add(CheckboxWidget, '%sdraft' % prefix, title=_('Create new draft'), value=self.draft)
if 'backoffice_submission' in parameters:
form.add(
CheckboxWidget,
'%sbackoffice_submission' % prefix,
title=_('Backoffice submission'),
value=self.backoffice_submission,
advanced=(
self.backoffice_submission == CreateFormdataWorkflowStatusItem.backoffice_submission
),
)
if 'user_association_mode' in parameters:
form.add(
RadiobuttonsWidget,
'%suser_association_mode' % prefix,
title=_(self.user_association_option_label),
options=[
(None, _('None'), 'none'),
('keep-user', _('Keep Current User'), 'keep-user'),
('custom', _('Custom (user will come from template value)'), 'custom'),
],
value=self.user_association_mode,
attrs={'data-dynamic-display-parent': 'true'},
advanced=bool(self.user_association_mode),
)
if 'user_association_template' in parameters:
form.add(
ComputedExpressionWidget,
'%suser_association_template' % prefix,
title=_('Template for user association (via email or NameID)'),
value=self.user_association_template,
attrs={
'data-dynamic-display-child-of': '%suser_association_mode' % prefix,
'data-dynamic-display-value': 'custom',
},
advanced=bool(self.user_association_mode),
)
if 'keep_submission_context' in parameters:
form.add(
CheckboxWidget,
'%skeep_submission_context' % prefix,
title=_('Keep submission context'),
value=self.keep_submission_context,
advanced=(
self.keep_submission_context == CreateFormdataWorkflowStatusItem.keep_submission_context
),
)
formdef_slug = form.get('%sformdef_slug' % prefix)
formdef = self._resolve_formdef_slug(formdef_slug)
if 'mappings' in parameters and formdef:
widget = form.add(
MappingsWidget,
'%smappings' % prefix,
title=_(self.mappings_label),
accept_empty_value=self.accept_empty_value,
to_formdef=formdef,
value=self.mappings,
)
if form.is_submitted() and get_request().form.get('map_fields_by_varname') != 'yes':
# do not validate form if formdef is changed and there is no mappings
if formdef_slug != self.formdef_slug and not widget.parse():
form.get_widget('%smappings' % prefix).set_error(_('Please define new mappings'))
if 'varname' in parameters:
form.add(
VarnameWidget,
'%svarname' % prefix,
title=_('Identifier'),
value=self.varname,
hint=_(self.varname_hint),
advanced=not (bool(self.varname)),
)
if 'map_fields_by_varname' in parameters and formdef:
form.add(
CheckboxWidget,
'%smap_fields_by_varname' % prefix,
title=_('Map fields by varname'),
value=self.map_fields_by_varname,
advanced=(
self.map_fields_by_varname == CreateFormdataWorkflowStatusItem.map_fields_by_varname
),
)
if self.map_fields_by_varname and self.formdef:
common_varnames = [htmltext('<tt>%s</tt>') % varname for varname in self._common_varnames()]
common_varnames = htmltext(', ').join(common_varnames)
form.widgets.append(
HtmlWidget(
htmltext('<div class="infonotice">%s %s</div>')
% (_('Common varnames:'), common_varnames)
)
)
if 'attach_to_history' in parameters:
form.add(
CheckboxWidget,
'%sattach_to_history' % prefix,
title=_('Include new form in the form history'),
value=self.attach_to_history,
)
if kwargs.get('orig') == 'variable_widget':
return
errors = [w.name for w in form.get_all_widgets() if w.has_error()]
if set(errors) == set(['%smappings' % prefix]):
form.ERROR_NOTICE = _('This action is configured in two steps. See below for details.')
else:
form.ERROR_NOTICE = Form.ERROR_NOTICE
def get_mappings_parameter_view_value(self):
to_id_fields = {str(field.id): field for field in self.formdef.get_widget_fields()}
result = []
for mapping in self.mappings or []:
try:
dest_field = to_id_fields[str(mapping.field_id)]
result.append(htmltext('<li>%s%s</li>') % (dest_field.label, mapping.expression))
except KeyError:
result.append(htmltext('<li>#%s%s</li>') % (mapping.field_id, mapping.expression))
return htmltext('<ul class="mappings">%s</ul>') % htmltext('').join(result)
def _common_varnames(self):
'''Compute common varnames between the targeted formdef and all formdefs related to the parent workflow.'''
assert self.formdef
varnames = set(
field.varname
for formdef in self.parent.parent.formdefs()
for field in formdef.get_widget_fields()
if field.varname
)
return sorted(
varnames & set(field.varname for field in self.formdef.get_widget_fields() if field.varname)
)
def get_parameters(self):
return (
'draft',
'formdef_slug',
'map_fields_by_varname',
'mappings',
'backoffice_submission',
'user_association_mode',
'user_association_template',
'keep_submission_context',
'varname',
'attach_to_history',
'condition',
)
def get_line_details(self):
if not self.formdef or not (self.mappings or self.map_fields_by_varname):
return _('not configured')
return self.formdef.name
def perform(self, formdata):
formdef = self.formdef
if not formdef or not (self.mappings or self.map_fields_by_varname):
return
new_formdata = formdef.data_class()()
new_formdata.receipt_time = time.localtime()
if self.user_association_mode == 'keep-user':
new_formdata.user_id = formdata.user_id
elif self.user_association_mode == 'custom' and self.user_association_template:
with get_publisher().complex_data():
try:
value = self.compute(
self.user_association_template,
formdata=formdata,
raises=True,
allow_complex=True,
status_item=self,
)
except Exception:
# already logged by self.compute
value = None
else:
value = get_publisher().get_cached_complex_data(value)
from wcs.variables import LazyUser
if isinstance(value, LazyUser):
value = value._user
if isinstance(value, get_publisher().user_class):
new_formdata.user = value
else:
new_formdata.user = get_publisher().user_class.lookup_by_string(value)
if self.keep_submission_context:
new_formdata.submission_context = formdata.submission_context or {}
new_formdata.submission_channel = formdata.submission_channel
new_formdata.submission_agent_id = formdata.submission_agent_id
else:
new_formdata.submission_context = {}
new_formdata.backoffice_submission = self.backoffice_submission
if self.backoffice_submission and get_request() and get_request().user is not None:
new_formdata.submission_agent_id = str(get_request().user.id)
new_formdata.submission_context['orig_object_type'] = formdata.formdef.xml_root_node
new_formdata.submission_context['orig_formdef_id'] = str(formdata.formdef.id)
new_formdata.submission_context['orig_formdata_id'] = str(formdata.id)
new_formdata.data = {}
self.apply_mappings(dest=new_formdata, src=formdata)
if formdef.enable_tracking_codes:
code = get_publisher().tracking_code_class()
if self.draft:
new_formdata.status = 'draft'
new_formdata.store()
if formdef.enable_tracking_codes:
code.formdata = new_formdata # this will .store() the code
else:
formdata.store()
# freeze substitutions during submission, as it has side effects
with get_publisher().substitutions.freeze():
new_formdata.just_created()
new_formdata.store()
if formdef.enable_tracking_codes:
code.formdata = new_formdata # this will .store() the code
new_formdata.perform_workflow()
new_formdata.store()
# update local object as it may have been modified by new_formdata
# workflow execution.
formdata.refresh_from_storage()
if new_formdata.user_id is None and not new_formdata.backoffice_submission and get_session():
get_session().mark_anonymous_formdata(new_formdata)
evo = formdata.evolution[-1]
evo.add_part(
self.evolution_part_class(
new_formdata, varname=self.varname, attach_to_history=self.attach_to_history
)
)
formdata.store()
def apply_mappings(self, dest, src):
if self.map_fields_by_varname:
fields_by_varname = {
field.varname: field for field in self.formdef.get_widget_fields() if field.varname
}
for field in src.formdef.get_widget_fields():
dest_field = fields_by_varname.get(field.varname)
if dest_field is None:
continue
try:
self._set_value(formdata=dest, field=dest_field, value=src.data.get(field.id))
except Exception as e:
get_publisher().record_error(
_('Could not copy field by varname for "%s"') % field.varname,
formdata=src,
status_item=self,
exception=e,
)
# field.id can be serialized to xml, so we must always convert them to
# str when matching
to_id_fields = {str(field.id): field for field in self.formdef.get_widget_fields()}
missing_fields = []
for mapping in self.mappings or []:
try:
dest_field = to_id_fields[str(mapping.field_id)]
except KeyError:
missing_fields.append(mapping.field_id)
continue
with get_publisher().complex_data():
try:
value = self.compute(
mapping.expression,
formdata=src,
raises=True,
allow_complex=dest_field.allow_complex,
status_item=self,
)
except Exception:
# already logged by self.compute
continue
if dest_field.allow_complex:
value = get_publisher().get_cached_complex_data(value)
try:
self._set_value(formdata=dest, field=dest_field, value=value)
except Exception as e:
expression = self.get_expression(mapping.expression)
get_publisher().record_error(
_('Could not assign value to field "%s"') % dest_field.label,
formdata=src,
status_item=self,
expression=expression['value'],
expression_type=expression['type'],
exception=e,
)
if missing_fields:
summary = _('Missing field %r') % missing_fields
get_publisher().record_error(summary, formdata=src, status_item=self)
def _set_value(self, formdata, field, value):
if field.convert_value_from_anything:
dummy = value # noqa: F841, copy value for debug
value = field.convert_value_from_anything(value)
field.set_value(formdata.data, value)
def mappings_export_to_xml(self, parent, charset, include_id=False):
container = ET.SubElement(parent, 'mappings')
for mapping in self.mappings or []:
item = ET.SubElement(container, 'mapping')
item.attrib['field_id'] = str(mapping.field_id)
item.text = mapping.expression
def mappings_init_with_xml(self, container, charset, include_id=False, snapshot=False):
self.mappings = []
for child in container:
field_id = child.attrib.get('field_id', '')
expression = child.text
if field_id:
self.mappings.append(Mapping(field_id=field_id, expression=expression))
register_item_class(CreateFormdataWorkflowStatusItem)