add new action create-formdata (#33186)

This commit is contained in:
Benjamin Dauvergne 2019-05-18 14:17:33 +02:00
parent 9bbe4b48bb
commit 7f421d1587
9 changed files with 887 additions and 8 deletions

View File

@ -38,13 +38,16 @@ from wcs.categories import Category
from wcs.data_sources import NamedDataSource
from wcs.wscalls import NamedWsCall
from wcs.roles import Role
from wcs.workflows import (Workflow, DisplayMessageWorkflowStatusItem,
WorkflowCriticalityLevel, WorkflowBackofficeFieldsFormDef,
CommentableWorkflowStatusItem)
from wcs.workflows import (
Workflow, DisplayMessageWorkflowStatusItem, WorkflowCriticalityLevel,
WorkflowBackofficeFieldsFormDef, CommentableWorkflowStatusItem,
ChoiceWorkflowStatusItem, JumpOnSubmitWorkflowStatusItem)
from wcs.wf.export_to_model import ExportToModel
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
from wcs.wf.wscall import WebserviceCallStatusItem
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
from wcs.wf.create_formdata import CreateFormdataWorkflowStatusItem, Mapping
from wcs.formdef import FormDef
from wcs.carddef import CardDef
from wcs import fields
@ -5571,3 +5574,81 @@ def test_card_workflow_change(pub, studio):
assert resp.location == 'http://example.net/backoffice/cards/1/workflow-status-remapping?new=%s' % '_carddef_default'
resp = resp.follow()
resp = resp.form.submit('submit').follow()
def test_create_formdata(pub):
create_superuser(pub)
create_role()
FormDef.wipe()
target_formdef = FormDef()
target_formdef.name = 'target form'
target_formdef.enable_tracking_codes = True
target_formdef.fields = [
fields.StringField(id='0', label='string', varname='foo_string'),
fields.FileField(id='1', label='file', type='file', varname='foo_file'),
]
target_formdef.store()
Workflow.wipe()
wf = Workflow(name='create-formdata')
st1 = wf.add_status('New')
st2 = wf.add_status('Resubmit')
jump = ChoiceWorkflowStatusItem()
jump.id = '_resubmit'
jump.label = 'Resubmit'
jump.by = ['_submitter']
jump.status = st2.id
jump.parent = st1
st1.items.append(jump)
create_formdata = CreateFormdataWorkflowStatusItem()
create_formdata.id = '_create_formdata'
create_formdata.formdef_slug = target_formdef.url_name
create_formdata.mappings = [
Mapping(field_id='0', expression='=form_var_toto_string'),
Mapping(field_id='1', expression='=form_var_toto_file_raw'),
]
create_formdata.parent = st2
st2.items.append(create_formdata)
redirect = RedirectToUrlWorkflowStatusItem()
redirect.id = '_redirect'
redirect.url = '{{ form_links_resubmitted.form_url }}'
redirect.parent = st2
st2.items.append(redirect)
jump = JumpOnSubmitWorkflowStatusItem()
jump.id = '_jump'
jump.status = st1.id
jump.parent = st2
st2.items.append(jump)
wf.store()
app = login(get_app(pub))
resp = app.get('/backoffice/workflows/%s/status/%s/' % (wf.id, st2.id))
pq = resp.pyquery.remove_namespaces()
assert pq('option[value="New Form Creation"]').text() == 'New Form Creation'
assert pq('#itemId__create_formdata a')[0].text == 'New Form Creation'
resp = resp.click('Edit', href='items/_create_formdata/', )
resp.form.set('varname', 'resubmitted')
resp = resp.form.submit(name='submit')
resp = resp.follow()
# checks that nothing changed after submit
wf2 = Workflow.select()[0]
item = wf2.get_status('2').items[0]
assert item.varname == 'resubmitted'
assert isinstance(item, CreateFormdataWorkflowStatusItem)
wf.get_status('2').items[0].label = 'really resubmit'
# duplicate
resp = app.get('/backoffice/workflows/%s/status/%s/items/_create_formdata/' % (wf.id, st2.id))
resp.form.set('mappings$element1$field_id', '0')
resp = resp.form.submit(name='submit')
pq = resp.pyquery.remove_namespaces()
assert pq('.error').text() == 'Some destination fields are duplicated'

View File

@ -44,6 +44,7 @@ from wcs.wf.wscall import WebserviceCallStatusItem
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
from wcs.wf.resubmit import ResubmitWorkflowStatusItem
from wcs.wf.create_formdata import CreateFormdataWorkflowStatusItem, Mapping
from wcs.carddef import CardDef
from wcs.categories import Category
from wcs.formdef import FormDef
@ -5669,3 +5670,132 @@ def test_lazy_eval_with_conditional_workflow_form(pub):
context = pub.substitutions.get_context_variables(mode='lazy')
assert context['form_var_foo_bar'] == 'go'
@pytest.fixture
def create_formdata(pub):
admin = create_user(pub, is_admin=True)
FormDef.wipe()
source_formdef = FormDef()
source_formdef.name = 'source form'
source_formdef.workflow_roles = {'_receiver': 1}
source_formdef.fields = [
fields.StringField(id='0', label='string', varname='toto_string'),
fields.FileField(id='1', label='file', type='file', varname='toto_file'),
]
source_formdef.store()
target_formdef = FormDef()
target_formdef.name = 'target form'
target_formdef.workflow_roles = {'_receiver': 1}
target_formdef.backoffice_submission_roles = admin.roles[:]
target_formdef.fields = [
fields.StringField(id='0', label='string', varname='foo_string'),
fields.FileField(id='1', label='file', type='file', varname='foo_file'),
]
target_formdef.store()
wf = Workflow(name='create-formdata')
st1 = wf.add_status('New')
st2 = wf.add_status('Resubmit')
jump = ChoiceWorkflowStatusItem()
jump.id = '_resubmit'
jump.label = 'Resubmit'
jump.by = ['_receiver']
jump.status = st2.id
jump.parent = st1
st1.items.append(jump)
create_formdata = CreateFormdataWorkflowStatusItem()
create_formdata.id = '_create_formdata'
create_formdata.varname = 'resubmitted'
create_formdata.formdef_slug = target_formdef.url_name
create_formdata.keep_user = True
create_formdata.backoffice_submission = True
create_formdata.mappings = [
Mapping(field_id='0', expression='=form_var_toto_string'),
Mapping(field_id='1', expression='=form_var_toto_file_raw'),
]
create_formdata.parent = st2
st2.items.append(create_formdata)
redirect = RedirectToUrlWorkflowStatusItem()
redirect.id = '_redirect'
redirect.url = '{{ form_links_resubmitted.form_backoffice_url }}'
redirect.parent = st2
st2.items.append(redirect)
jump = JumpOnSubmitWorkflowStatusItem()
jump.id = '_jump'
jump.status = st1.id
jump.parent = st2
st2.items.append(jump)
wf.store()
source_formdef.workflow_id = wf.id
source_formdef.store()
return locals()
def test_backoffice_create_formdata_backoffice_submission(create_formdata):
create_formdata['source_formdef'].data_class().wipe()
create_formdata['target_formdef'].data_class().wipe()
# create submitting user
user = create_formdata['pub'].user_class()
user.name = 'Jean Darmette'
user.email = 'jean.darmette@triffouilis.fr'
user.store()
# create source formdata
formdata = create_formdata['source_formdef'].data_class()()
upload = PicklableUpload('/foo/bar', content_type='text/plain')
upload.receive([b'hello world'])
formdata.data = {
'0': 'coucou',
'1': upload,
}
formdata.user = user
formdata.just_created()
formdata.store()
formdata.perform_workflow()
# agent login and go to backoffice management pages
app = get_app(create_formdata['pub'])
app = login(app)
resp = app.get(create_formdata['source_formdef'].get_url(backoffice=True))
# click on first available formdata
resp = resp.click(href='%s/' % formdata.id)
target_data_class = create_formdata['target_formdef'].data_class()
assert target_data_class.count() == 0
# resubmit it through backoffice submission
resp = resp.form.submit(name='button_resubmit')
assert target_data_class.count() == 1
target_formdata = target_data_class.select()[0]
assert target_formdata.submission_context == {
'agent_id': str(create_formdata['admin'].id),
'orig_formdata_id': '1',
'orig_formdef_id': '1'
}
assert target_formdata.user.id == user.id
assert target_formdata.status == 'draft'
assert resp.location == 'http://example.net/backoffice/management/target-form/%s/' % target_formdata.id
resp = resp.follow()
assert resp.location == 'http://example.net/backoffice/submission/target-form/%s/' % target_formdata.id
resp = resp.follow()
# second redirect with magic-token
resp = resp.follow()
resp = resp.form.submit(name='submit') # -> validation
resp = resp.form.submit(name='submit') # -> submission
target_formdata = target_data_class.get(id=target_formdata.id)
assert target_formdata.user.id == user.id
assert target_formdata.status == 'wf-new'
resp = resp.follow()
pq = resp.pyquery.remove_namespaces()
assert pq('.field-type-string .value').text() == 'coucou'
assert pq('.field-type-file .value').text() == 'bar'

View File

@ -42,6 +42,8 @@ from wcs.wf.attachment import AddAttachmentWorkflowStatusItem
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
from wcs.wf.resubmit import ResubmitWorkflowStatusItem
from wcs.wf.create_formdata import CreateFormdataWorkflowStatusItem, Mapping
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
from wcs.categories import Category
from wcs.roles import Role, logged_users_role
from wcs.tracking_code import TrackingCode
@ -7478,3 +7480,170 @@ def test_form_comment_is_hidden_attribute(pub):
resp.forms[0]['f1'] = '2'
resp = resp.forms[0].submit('submit')
assert 'style="display: none"' in comment.search(resp.forms[0].text).group(0)
@pytest.fixture
def create_formdata(pub):
pub = pub
FormDef.wipe()
ds = {
'type': 'formula',
'value': repr([
{'id': '1', 'text': 'un', 'more': 'foo'},
{'id': '2', 'text': 'deux', 'more': 'bar'}]),
}
source_formdef = FormDef()
source_formdef.name = 'source form'
source_formdef.fields = [
fields.StringField(id='0', label='string', varname='toto_string'),
fields.FileField(id='1', label='file', type='file', varname='toto_file'),
fields.ItemField(id='2', label='item', data_source=ds, varname='toto_item'),
]
source_formdef.store()
target_formdef = FormDef()
target_formdef.name = 'target form'
target_formdef.enable_tracking_codes = True
target_formdef.fields = [
fields.StringField(id='0', label='string', varname='foo_string'),
fields.FileField(id='1', label='file', type='file', varname='foo_file'),
fields.ItemField(id='2', label='item', data_source=ds, varname='foo_item'),
]
target_formdef.store()
wf = Workflow(name='create-formdata')
st1 = wf.add_status('New')
st2 = wf.add_status('Resubmit')
jump = ChoiceWorkflowStatusItem()
jump.id = '_resubmit'
jump.label = 'Resubmit'
jump.by = ['_submitter']
jump.status = st2.id
jump.parent = st1
st1.items.append(jump)
create_formdata = CreateFormdataWorkflowStatusItem()
create_formdata.varname = 'resubmitted'
create_formdata.id = '_create_formdata'
create_formdata.formdef_slug = target_formdef.url_name
create_formdata.mappings = [
Mapping(field_id='0', expression='=form_var_toto_string'),
Mapping(field_id='1', expression='=form_var_toto_file_raw'),
Mapping(field_id='2', expression='=form_var_toto_item_raw'),
]
create_formdata.parent = st2
st2.items.append(create_formdata)
redirect = RedirectToUrlWorkflowStatusItem()
redirect.id = '_redirect'
redirect.url = '{{ form_links_resubmitted.form_url }}'
redirect.parent = st2
st2.items.append(redirect)
display = DisplayMessageWorkflowStatusItem()
display.id = '_display'
display.message = '''<div class="linked">{% if form_links_resubmitted %}
<p>Linked status: <span class="status">{{ form_links_resubmitted.form_status }}</span></p>
<p>Target formdata field: <span class="foo_string">{{ form_links_resubmitted.form_var_foo_string }}</span></p>
{% endif %}</div>'''
display.to = []
st2.items.append(display)
wf.store()
source_formdef.workflow_id = wf.id
source_formdef.store()
return locals()
def test_create_formdata_anonymous_draft(create_formdata):
create_formdata['source_formdef'].data_class().wipe()
create_formdata['target_formdef'].data_class().wipe()
app = get_app(create_formdata['pub'])
resp = app.get('/source-form/')
resp.form['f0'] = 'zob'
resp.form['f1$file'] = Upload('test.txt', b'foobar', 'text/plain')
resp.form['f2'] = '2'
resp = resp.form.submit('submit') # -> validation
resp = resp.form.submit('submit') # -> submission
resp = resp.follow()
assert create_formdata['target_formdef'].data_class().count() == 0
resp = resp.form.submit('button_resubmit')
assert create_formdata['target_formdef'].data_class().count() == 1
target_formdata = create_formdata['target_formdef'].data_class().select()[0]
assert target_formdata.data.get('0') == 'zob'
assert target_formdata.data.get('1').get_content() == b'foobar'
assert target_formdata.status == 'draft'
assert target_formdata.submission_context == {
'orig_formdata_id': str(create_formdata['source_formdef'].data_class().select()[0].id),
'orig_formdef_id': str(create_formdata['source_formdef'].id),
}
resp = resp.follow()
resp = resp.follow()
assert 'zob' in resp
resp = resp.forms[1].submit('submit') # -> validation
resp = resp.forms[1].submit('submit') # -> submission
assert create_formdata['target_formdef'].data_class().count() == 1
target_formdata = create_formdata['target_formdef'].data_class().select()[0]
assert target_formdata.data.get('0') == 'zob'
assert target_formdata.data.get('1').get_content() == b'foobar'
assert target_formdata.data.get('1').get_content() == b'foobar'
assert target_formdata.data.get('2') == '2'
assert target_formdata.data.get('2_display') == 'deux'
assert target_formdata.data.get('2_structured') == {'text': 'deux', 'id': '2', 'more': 'bar'}
assert target_formdata.status == 'wf-new'
source_formdata = create_formdata['source_formdef'].data_class().select()[0]
resp = app.get(source_formdata.get_url())
pq = resp.pyquery.remove_namespaces()
assert pq('.linked .status').text() == 'New'
assert pq('.linked .foo_string').text() == 'zob'
def test_create_formdata_anonymous_submitted(create_formdata):
create_formdata['source_formdef'].data_class().wipe()
create_formdata['target_formdef'].data_class().wipe()
# submit directly
create_formdata['wf'].get_status('2').items[0].draft = False
create_formdata['wf'].store()
app = get_app(create_formdata['pub'])
resp = app.get('/source-form/')
resp.form['f0'] = 'zob'
resp.form['f1$file'] = Upload('test.txt', b'foobar', 'text/plain')
resp.form['f2'] = '2'
resp = resp.form.submit('submit') # -> validation
resp = resp.form.submit('submit') # -> submission
resp = resp.follow()
assert create_formdata['target_formdef'].data_class().count() == 0
resp = resp.form.submit('button_resubmit')
assert create_formdata['target_formdef'].data_class().count() == 1
target_formdata = create_formdata['target_formdef'].data_class().select()[0]
assert target_formdata.data.get('0') == 'zob'
assert target_formdata.data.get('1').get_content() == b'foobar'
assert target_formdata.status == 'wf-new'
assert target_formdata.submission_context == {
'orig_formdata_id': str(create_formdata['source_formdef'].data_class().select()[0].id),
'orig_formdef_id': str(create_formdata['source_formdef'].id),
}
resp = resp.follow()
assert 'New' in resp
assert 'zob' in resp
target_formdata = create_formdata['target_formdef'].data_class().select()[0]
assert target_formdata.data.get('1').get_content() == b'foobar'
assert target_formdata.data.get('1').get_content() == b'foobar'
assert target_formdata.status == 'wf-new'
source_formdata = create_formdata['source_formdef'].data_class().select()[0]
resp = app.get(source_formdata.get_url())
pq = resp.pyquery.remove_namespaces()
assert pq('.linked .status').text() == 'New'
assert pq('.linked .foo_string').text() == 'zob'

View File

@ -9,17 +9,21 @@ from django.utils.six import BytesIO
from quixote import cleanup
from wcs import publisher
from wcs.workflows import (Workflow, CommentableWorkflowStatusItem,
WorkflowCriticalityLevel, WorkflowBackofficeFieldsFormDef,
SendmailWorkflowStatusItem, SendSMSWorkflowStatusItem,
WorkflowImportError)
from wcs.formdef import FormDef
from wcs.workflows import (
Workflow, CommentableWorkflowStatusItem, WorkflowCriticalityLevel,
WorkflowBackofficeFieldsFormDef, SendmailWorkflowStatusItem,
SendSMSWorkflowStatusItem, WorkflowImportError, ChoiceWorkflowStatusItem,
JumpOnSubmitWorkflowStatusItem)
from wcs.wf.wscall import WebserviceCallStatusItem
from wcs.wf.dispatch import DispatchWorkflowStatusItem
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
from wcs.wf.profile import UpdateUserProfileStatusItem
from wcs.wf.backoffice_fields import SetBackofficeFieldsWorkflowStatusItem
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
from wcs.wf.create_formdata import CreateFormdataWorkflowStatusItem, Mapping
from wcs.roles import Role
from wcs.fields import StringField
from wcs.fields import StringField, FileField
from wcs.qommon.misc import indent_xml as indent
@ -699,3 +703,53 @@ def test_action_condition(pub):
sendmail.condition = {'type': 'python', 'value': 'True'}
wf2 = assert_import_export_works(wf)
def test_create_formdata(pub):
target_formdef = FormDef()
target_formdef.name = 'target form'
target_formdef.fields = [
StringField(id='0', label='string', varname='foo_string'),
FileField(id='1', label='file', type='file', varname='foo_file'),
]
target_formdef.store()
wf = Workflow(name='create-formdata')
st1 = wf.add_status('New')
st2 = wf.add_status('Resubmit')
jump = ChoiceWorkflowStatusItem()
jump.id = '_resubmit'
jump.label = 'Resubmit'
jump.by = ['_submitter']
jump.status = st2.id
jump.parent = st1
st1.items.append(jump)
create_formdata = CreateFormdataWorkflowStatusItem()
create_formdata.id = '_create_formdata'
create_formdata.varname = 'resubmitted'
create_formdata.formdef_slug = target_formdef.url_name
create_formdata.mappings = [
Mapping(field_id='0', expression='=form_var_toto_string'),
Mapping(field_id='1', expression='=form_var_toto_file_raw'),
]
create_formdata.parent = st2
st2.items.append(create_formdata)
redirect = RedirectToUrlWorkflowStatusItem()
redirect.id = '_redirect'
redirect.url = '{{ form_links_resubmitted.form_url }}'
redirect.parent = st2
st2.items.append(redirect)
jump = JumpOnSubmitWorkflowStatusItem()
jump.id = '_jump'
jump.status = st1.id
jump.parent = st2
st2.items.append(jump)
wf.store()
assert_import_export_works(wf, include_id=True)

View File

@ -51,6 +51,8 @@ from wcs.wf.geolocate import GeolocateWorkflowStatusItem
from wcs.wf.backoffice_fields import SetBackofficeFieldsWorkflowStatusItem
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
from wcs.wf.notification import SendNotificationWorkflowStatusItem
from wcs.wf.create_formdata import CreateFormdataWorkflowStatusItem, Mapping
from utilities import (create_temporary_pub, MockSubstitutionVariables,
clean_temporary_pub)
@ -4355,3 +4357,57 @@ def test_aggregation_email(pub, emails):
assert AggregationEmail.count() == 0
assert 'New arrivals' in emails.emails
assert 'http://example.net/foobar/%s/status (New)' % formdata.id in emails.emails['New arrivals']['payload']
def test_create_formdata(pub):
FormDef.wipe()
LoggedError.wipe()
target_formdef = FormDef()
target_formdef.name = 'target form'
target_formdef.fields = [
StringField(id='0', label='string', varname='foo_string'),
]
target_formdef.store()
wf = Workflow(name='create-formdata')
wf.possible_status = Workflow.get_default_workflow().possible_status[:]
create = CreateFormdataWorkflowStatusItem()
create.label = 'create a new linked form'
create.varname = 'resubmitted'
create.id = '_create'
create.mappings = [
Mapping(field_id='0', expression='=form_var_toto_string'),
Mapping(field_id='1', expression='=form_var_toto_file_raw'),
Mapping(field_id='2', expression='=form_var_toto_item_raw'),
]
create.parent = wf.possible_status[1]
wf.possible_status[1].items.insert(0, create)
wf.store()
source_formdef = FormDef()
source_formdef.name = 'source form'
source_formdef.fields = []
source_formdef.workflow_id = wf.id
source_formdef.store()
formdata = source_formdef.data_class()()
formdata.data = {}
formdata.just_created()
assert target_formdef.data_class().count() == 0
assert LoggedError.count() == 0
# check unconfigure action do nothing
formdata.perform_workflow()
assert target_formdef.data_class().count() == 0
create.formdef_slug = target_formdef.url_name
wf.store()
del source_formdef._workflow
formdata.perform_workflow()
assert target_formdef.data_class().count() == 1
errors = LoggedError.select()
assert len(errors) == 2
assert any('form_var_toto_string' in (error.exception_message or '') for error in errors)
assert any('Missing field' in error.summary for error in errors)

View File

@ -1024,6 +1024,7 @@ div.ComputedExpressionWidget div.content input[type=text],
div.ConditionWidget div.content input[type=text] {
margin: 0;
padding-right: 10ex;
width: calc(100% - 2em - 8px);
}
div.ComputedExpressionWidget div.content span,
@ -1859,3 +1860,21 @@ a.button.button-paragraph:hover p {
table#listing tr.checked td {
background: #ddf;
}
.MappingsWidget table {
width: 100%;
}
.MappingsWidget td:first-child select {
width: 100%;
}
.MappingsWidget th {
text-align: left;
font-weight: normal;
}
.MappingsWidget th:first-child {
width: 40%;
min-width: 10em;
}

View File

@ -415,6 +415,12 @@ class LazyFormData(LazyFormDef):
def evolution(self):
return self._formdef.get_detailed_evolution(self._formdata)
@property
def links(self):
from .wf.create_formdata import LazyFormDataLinks
return LazyFormDataLinks(self._formdata)
def export_to_json(self, include_files=True):
# this gets used to generate an email attachment :/
return self._formdata.export_to_json(include_files=include_files)

363
wcs/wf/create_formdata.py Normal file
View File

@ -0,0 +1,363 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2016 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import time
import xml.etree.ElementTree as ET
from quixote import get_request, get_session, get_publisher
from django.utils.functional import cached_property
from wcs.qommon import _
from wcs.qommon.form import (WidgetListAsTable, CompositeWidget,
SingleSelectWidget, ComputedExpressionWidget,
CheckboxWidget, StringWidget, VarnameWidget)
from wcs.logged_errors import LoggedError
from wcs.workflows import WorkflowStatusItem, register_item_class
from wcs.formdef import FormDef
class Mapping(object):
def __init__(self, field_id, expression):
self.field_id = field_id
self.expression = expression
class MappingWidget(CompositeWidget):
def __init__(self, name, value=None, to_formdef=None, **kwargs):
value = value or Mapping(None, '')
super(MappingWidget, self).__init__(name, value, **kwargs)
to_fields = self._fields_to_options(to_formdef)
self.add(SingleSelectWidget,
name='field_id',
title=_('Field'),
value=value.field_id,
options=to_fields)
self.add(ComputedExpressionWidget,
name='expression',
title=_('Expression'),
value=value.expression)
def _fields_to_options(self, formdef):
return [(None, '---', '')] + [
(field.id, field.label, str(field.id)) for field in formdef.get_widget_fields()]
def _parse(self, request):
super(MappingWidget, self)._parse(request)
if self.get('field_id') is not None and self.get('expression') is not None:
self.value = Mapping(field_id=self.get('field_id'), expression=self.get('expression'))
else:
self.value = None
class MappingsWidget(WidgetListAsTable):
readonly = False
# widget_list.js does not work with ComputedExpressionWidget,
# so we revert to quixote behaviour for adding a line
def add_media(self):
pass
def __init__(self, name, to_formdef=None, **kwargs):
self.to_formdef = to_formdef
value = kwargs.get('value')
if value:
# reorder mappings based on to_formdef fields order
value.sort(key=lambda mapping: self.ranks.get(str(mapping.field_id), 9999))
super(MappingsWidget, self).__init__(
name,
element_type=MappingWidget,
element_kwargs={
'to_formdef': to_formdef,
},
**kwargs)
@cached_property
def ranks(self):
return {str(field.id): i for i, field in enumerate(
field for field in self.to_formdef.get_widget_fields())}
def _parse(self, request):
super(MappingsWidget, self)._parse(request)
if self.value:
# prevent many mappings to the same field
if len(set(mapping.field_id for mapping in self.value)) != len(self.value):
self.error = _('Some destination fields are duplicated')
return
# reorder mappings based on to_formdef fields order
self.value.sort(key=lambda mapping: self.ranks.get(str(mapping.field_id), 9999))
class LinkedFormdataEvolutionPart(object):
formdef_class = FormDef
def __init__(self, formdata, varname=None):
self._formdef = formdata.formdef
self._formdata = formdata
self.formdef_id = formdata.formdef.id
self.formdata_id = formdata.id
self.varname = varname
@property
def formdef(self):
if not hasattr(self, '_formdef'):
self._formdef = self.formdef_class.get(self.formdef_id)
return self._formdef
@property
def formdata(self):
if not hasattr(self, '_formdata'):
self._formdata = self.formdef.data_class().get(self.formdata_id, ignore_errors=True)
return self._formdata
def __getstate__(self):
# Forget cached values
return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
def __repr__(self):
return '<Link "%s-%s">' % (self.formdef_id, self.formdata_id)
@classmethod
def get_substitution_variables(cls, formdata):
d = {}
for part in formdata.iter_evolution_parts():
if not isinstance(part, LinkedFormdataEvolutionPart):
continue
if part.formdata:
d['form_links_%s' % (part.varname or '*')] = part
return d
class LazyFormDataLinks(object):
def __init__(self, formdata):
self._formdata = formdata
def __getattr__(self, varname):
for part in self._formdata.iter_evolution_parts():
if not isinstance(part, LinkedFormdataEvolutionPart):
continue
if part.varname == varname and part.formdata:
return part.formdata.get_substitution_variables()
raise AttributeError(varname)
class CreateFormdataWorkflowStatusItem(WorkflowStatusItem):
description = N_('New Form Creation')
key = 'create_formdata'
category = 'formdata-action'
support_substitution_variables = True
formdef_class = FormDef
evolution_part_class = LinkedFormdataEvolutionPart
formdef_slug = None
draft = True
backoffice_submission = False
keep_user = True
keep_submission_context = False
mappings = None
varname = None
def _resolve_formdef_slug(self, formdef_slug):
if formdef_slug:
try:
return self.formdef_class.get_by_urlname(self.formdef_slug)
except KeyError:
pass
return None
@property
def formdef(self):
return self._resolve_formdef_slug(self.formdef_slug)
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
super(CreateFormdataWorkflowStatusItem, self).add_parameters_widgets(
form, parameters, prefix=prefix, formdef=formdef)
if 'formdef_slug' in parameters:
list_forms = [(None, '---', '')]
list_forms += [(x.url_name, x.name, x.url_name) for x in self.formdef_class.select(order_by='name')]
form.add(SingleSelectWidget, 'formdef_slug',
title=_('Form'),
value=self.formdef_slug,
options=list_forms)
if 'draft' in parameters:
form.add(CheckboxWidget, '%sdraft' % prefix,
title=_('Draft'),
value=self.draft,
advanced=(self.draft == CreateFormdataWorkflowStatusItem.draft))
if 'backoffice_submission' in parameters:
form.add(CheckboxWidget, '%sbackoffice_submission' % prefix,
title=_('Backoffice submission'),
value=self.backoffice_submission,
advanced=(self.backoffice_submission == CreateFormdataWorkflowStatusItem.backoffice_submission))
if 'keep_user' in parameters:
form.add(CheckboxWidget, '%skeep_user' % prefix,
title=_('Keep user'),
value=self.keep_user,
advanced=(self.keep_user == CreateFormdataWorkflowStatusItem.keep_user))
if 'keep_submission_context' in parameters and self.keep_user:
form.add(CheckboxWidget, '%skeep_submission_context' % prefix,
title=_('Keep submission context'),
value=self.keep_submission_context,
advanced=(
self.keep_submission_context == CreateFormdataWorkflowStatusItem.keep_submission_context))
formdef = self._resolve_formdef_slug(form.get('formdef_slug'))
if 'mappings' in parameters and formdef:
form.add(MappingsWidget, '%smappings' % prefix,
title=_('Mappings to new form fields'),
to_formdef=formdef,
value=self.mappings)
if 'varname' in parameters:
form.add(VarnameWidget, '%svarname' % prefix,
title=_('Identifier'), value=self.varname,
hint=_('This is used to get linked forms in expressions.'),
advanced=not(bool(self.varname)))
def submit_admin_form(self, form):
self.mappings = []
super(CreateFormdataWorkflowStatusItem, self).submit_admin_form(form)
def get_parameters(self):
return ('formdef_slug', 'mappings', 'draft', 'backoffice_submission',
'keep_user', 'keep_submission_context', 'varname')
def perform(self, formdata):
formdef = self.formdef
if not formdef:
return
new_formdata = formdef.data_class()()
new_formdata.receipt_time = time.localtime()
if self.keep_user:
new_formdata.user_id = formdata.user_id
if self.keep_submission_context and self.keep_user:
new_formdata.submission_context = formdata.submission_context or {}
new_formdata.submission_channel = formdata.submission_channel
else:
new_formdata.submission_context = {}
new_formdata.backoffice_submission = self.backoffice_submission
if self.backoffice_submission and get_request() and get_request().user is not None:
new_formdata.submission_context['agent_id'] = str(get_request().user.id)
new_formdata.submission_context['orig_formdef_id'] = str(formdata.formdef.id)
new_formdata.submission_context['orig_formdata_id'] = str(formdata.id)
new_formdata.data = {}
self.apply_mappings(dest=new_formdata, src=formdata)
if self.draft:
new_formdata.status = 'draft'
new_formdata.store()
else:
# freeze substitutions during submission, as it has side effects
with get_publisher().substitutions.freeze():
new_formdata.just_created()
new_formdata.store()
new_formdata.perform_workflow()
new_formdata.store()
if new_formdata.user_id is None and not new_formdata.backoffice_submission:
get_session().mark_anonymous_formdata(new_formdata)
evo = formdata.evolution[-1]
evo.add_part(self.evolution_part_class(new_formdata, varname=self.varname))
formdata.store()
def apply_mappings(self, dest, src):
if not self.mappings:
return
# field.id can be serialized to xml, so we must alwat convert them to str when matching
to_id_fields = {str(field.id): field for field in self.formdef.get_widget_fields()}
missing_fields = []
for mapping in self.mappings:
try:
dest_field = to_id_fields[str(mapping.field_id)]
except KeyError:
missing_fields.append(mapping.field_id)
continue
try:
value = self.compute(mapping.expression, formdata=src, raises=True, status_item=self)
except Exception:
# already logged by self.compute
continue
try:
self._set_value(
formdata=dest,
field=dest_field,
value=value)
except Exception as e:
expression = self.get_expression(mapping.expression)
LoggedError.record('Could not assign value to field %s' % dest_field,
formdata=src, status_item=self,
expression=expression['value'], expression_type=expression['type'],
exception=e)
if missing_fields:
summary = _('Missing field %r') % missing_fields
LoggedError.record(summary, formdata=src, status_item=self)
def _set_value(self, formdata, field, value):
if field.convert_value_from_anything:
old_value = value # noqa: F841, copy value for debug
value = field.convert_value_from_anything(value)
formdata.data['%s' % field.id] = value
if field.store_display_value:
display_value = field.store_display_value(
formdata.data, field.id)
if display_value:
formdata.data['%s_display' % field.id] = display_value
if value and field.store_structured_value:
structured_value = field.store_structured_value(
formdata.data, field.id)
if structured_value:
if isinstance(structured_value, dict) and structured_value.get('id'):
# in case of list field, override id
formdata.data['%s' % field.id] = str(structured_value.get('id'))
formdata.data['%s_structured' % field.id] = structured_value
def mappings_export_to_xml(self, parent, charset, include_id=False):
container = ET.SubElement(parent, 'mappings')
for mapping in self.mappings or []:
item = ET.SubElement(container, 'mapping')
item.attrib['field_id'] = str(mapping.field_id)
item.text = mapping.expression
def mappings_init_with_xml(self, container, charset, include_id=False):
self.mappings = []
for child in container:
field_id = child.attrib.get('field_id', '')
expression = child.text
if field_id:
self.mappings.append(Mapping(field_id=field_id, expression=expression))
register_item_class(CreateFormdataWorkflowStatusItem)

View File

@ -2916,5 +2916,6 @@ def load_extra():
from .wf import backoffice_fields
from .wf import redirect_to_url
from .wf import notification
from .wf import create_formdata
from .wf.export_to_model import ExportToModel