wcs/tests/test_workflows.py

3868 lines
135 KiB
Python

import json
import datetime
import os
import pytest
import shutil
import time
import urllib2
import urlparse
import zipfile
import mock
from django.utils.six import StringIO
from quixote import cleanup, get_response
from qommon.errors import ConnectionError
from quixote.http_request import Upload as QuixoteUpload
from wcs.qommon.http_request import HTTPRequest
from qommon.form import *
from wcs.formdef import FormDef
from wcs import sessions
from wcs.fields import (StringField, DateField, MapField, FileField, ItemField,
ItemsField, CommentField)
from wcs.logged_errors import LoggedError
from wcs.roles import Role
from wcs.workflows import (Workflow, WorkflowStatusItem,
SendmailWorkflowStatusItem, SendSMSWorkflowStatusItem,
CommentableWorkflowStatusItem, ChoiceWorkflowStatusItem,
DisplayMessageWorkflowStatusItem,
AbortActionException, WorkflowCriticalityLevel,
AttachmentEvolutionPart, WorkflowBackofficeFieldsFormDef)
from wcs.wf.anonymise import AnonymiseWorkflowStatusItem
from wcs.wf.criticality import ModifyCriticalityWorkflowStatusItem, MODE_INC, MODE_DEC, MODE_SET
from wcs.wf.dispatch import DispatchWorkflowStatusItem
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
from wcs.wf.jump import JumpWorkflowStatusItem, _apply_timeouts
from wcs.wf.timeout_jump import TimeoutWorkflowStatusItem
from wcs.wf.profile import UpdateUserProfileStatusItem
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
from wcs.wf.remove import RemoveWorkflowStatusItem
from wcs.wf.roles import AddRoleWorkflowStatusItem, RemoveRoleWorkflowStatusItem
from wcs.wf.wscall import WebserviceCallStatusItem
from wcs.wf.export_to_model import ExportToModel, transform_to_pdf
from wcs.wf.geolocate import GeolocateWorkflowStatusItem
from wcs.wf.backoffice_fields import SetBackofficeFieldsWorkflowStatusItem
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
from utilities import (create_temporary_pub, MockSubstitutionVariables,
clean_temporary_pub)
def setup_module(module):
cleanup()
def teardown_module(module):
clean_temporary_pub()
def pytest_generate_tests(metafunc):
if 'two_pubs' in metafunc.fixturenames:
metafunc.parametrize('two_pubs', ['pickle', 'sql'], indirect=True)
@pytest.fixture
def pub(request):
pub = create_temporary_pub()
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
req = HTTPRequest(None, {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''})
req.response.filter = {}
req.user = None
pub._set_request(req)
req.session = sessions.BasicSession(id=1)
pub.set_config(req)
return pub
@pytest.fixture
def two_pubs(request):
pub = create_temporary_pub(sql_mode=(request.param == 'sql'))
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
req = HTTPRequest(None, {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''})
req.response.filter = {}
req.user = None
pub._set_request(req)
req.session = sessions.BasicSession(id=1)
pub.set_config(req)
return pub
def test_get_json_export_dict(pub):
workflow = Workflow(name='wf')
st1 = workflow.add_status('Status1', 'st1')
st2 = workflow.add_status('Status2', 'st2')
st2.forced_endpoint = True
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
jump.by = ['_submitter', '_receiver']
jump.timeout = 0.1
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
workflow.roles['_other'] = 'Other Function'
root = workflow.get_json_export_dict()
assert set(root.keys()) >= set(['statuses', 'name', 'functions'])
assert root['name'] == 'wf'
assert len(root['statuses']) == 2
assert set(st['id'] for st in root['statuses']) == set(['st1', 'st2'])
assert all(set(status.keys()) >= set(['id', 'name', 'forced_endpoint']) for status in
root['statuses'])
assert root['statuses'][0]['id'] == 'st1'
assert root['statuses'][0]['name'] == 'Status1'
assert root['statuses'][0]['forced_endpoint'] is False
assert root['statuses'][0]['endpoint'] is False
assert root['statuses'][1]['id'] == 'st2'
assert root['statuses'][1]['name'] == 'Status2'
assert root['statuses'][1]['forced_endpoint'] is True
assert root['statuses'][1]['endpoint'] is True
def test_variable_compute(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo'),]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'1': 'hello'}
formdata.store()
pub.substitutions.feed(formdata)
item = JumpWorkflowStatusItem()
# straight string
assert item.compute('blah') == 'blah'
# django template
assert item.compute('{{ form_var_foo }}') == 'hello'
assert item.compute('{{ form_var_foo }}', render=False) == '{{ form_var_foo }}'
assert item.compute('{% if form_var_foo %}its here{% endif %}') == 'its here'
assert item.compute('{% if form_var_foo %}') == '{% if form_var_foo %}'
with pytest.raises(Exception):
item.compute('{% if form_var_foo %}', raises=True)
# ezt string
assert item.compute('[form_var_foo]') == 'hello'
# ezt string, but not ezt asked
assert item.compute('[form_var_foo]', render=False) == '[form_var_foo]'
# ezt string, with an error
assert item.compute('[end]', raises=False) == '[end]'
with pytest.raises(Exception):
item.compute('[end]', raises=True)
# python expression
assert item.compute('=form_var_foo') == 'hello'
# python expression, with an error
assert item.compute('=1/0', raises=False) == '=1/0'
with pytest.raises(Exception):
item.compute('=1/0', raises=True)
# with context
assert item.compute('{{ form_var_foo }} {{ bar }}', context={'bar': 'world'}) == 'hello world'
assert item.compute('[form_var_foo] [bar]', context={'bar': 'world'}) == 'hello world'
assert item.compute('=form_var_foo + " " + bar', context={'bar': 'world'}) == 'hello world'
# django wins
assert item.compute('{{ form_var_foo }} [bar]', context={'bar': 'world'}) == 'hello [bar]'
# django template, no escaping by default
formdata.data = {'1': '<b>hello</b>'}
formdata.store()
assert item.compute('{{ form_var_foo }}') == '<b>hello</b>' # autoescape off by default
assert item.compute('{{ form_var_foo|safe }}') == '<b>hello</b>' # no escaping (implicit |safe)
assert item.compute('{{ form_var_foo|escape }}') == '&lt;b&gt;hello&lt;/b&gt;' #escaping
def test_variable_compute_dates(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo'),]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'1': '2017-07-17'}
formdata.store()
pub.substitutions.feed(formdata)
item = JumpWorkflowStatusItem()
assert item.compute('=date(form_var_foo)') == datetime.date(2017, 7, 17)
assert item.compute('=date(form_var_foo) + days(1)') == datetime.date(2017, 7, 18)
assert item.compute('=date(2017, 7, 18)') == datetime.date(2017, 7, 18)
def test_jump_nothing(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.store()
formdata = formdef.data_class()()
item = JumpWorkflowStatusItem()
assert item.must_jump(formdata) is True
def test_jump_datetime_condition(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.store()
formdata = formdef.data_class()()
item = JumpWorkflowStatusItem()
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
item.condition = {'type': 'python', 'value': 'datetime.datetime.now() > datetime.datetime(%s, %s, %s)' % \
yesterday.timetuple()[:3]}
assert item.must_jump(formdata) is True
tomorrow = datetime.datetime.now() + datetime.timedelta(days=1)
item.condition = {'type': 'python', 'value': 'datetime.datetime.now() > datetime.datetime(%s, %s, %s)' % \
tomorrow.timetuple()[:3]}
assert item.must_jump(formdata) is False
def test_jump_date_conditions(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = [DateField(id='2', label='Date', type='date', varname='date')]
formdef.store()
# create/store/get, to make sure the date format is acceptable
formdata = formdef.data_class()()
formdata.data = {'2': DateField().convert_value_from_str('2015-01-04')}
formdata.store()
formdata = formdef.data_class().get(formdata.id)
pub.substitutions.feed(formdata)
item = JumpWorkflowStatusItem()
item.condition = {
'type': 'python',
'value': 'utils.make_date(form_var_date) == utils.make_date("2015-01-04")'}
assert item.must_jump(formdata) is True
item = JumpWorkflowStatusItem()
item.condition = {
'type': 'python',
'value': 'utils.time_delta(form_var_date, "2015-01-04").days == 0'}
assert item.must_jump(formdata) is True
item = JumpWorkflowStatusItem()
item.condition = {
'type': 'python',
'value': 'utils.time_delta(utils.today(), "2015-01-04").days > 0'}
assert item.must_jump(formdata) is True
item = JumpWorkflowStatusItem()
item.condition = {
'type': 'python',
'value': 'utils.time_delta(datetime.datetime.now(), "2015-01-04").days > 0'}
assert item.must_jump(formdata) is True
item = JumpWorkflowStatusItem()
item.condition = {
'type': 'python',
'value': 'utils.time_delta(utils.time.localtime(), "2015-01-04").days > 0'}
assert item.must_jump(formdata) is True
def test_jump_count_condition(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.store()
pub.substitutions.feed(formdef)
formdef.data_class().wipe()
formdata = formdef.data_class()()
item = JumpWorkflowStatusItem()
item.condition = {'type': 'python', 'value': 'form_objects.count < 2'}
assert item.must_jump(formdata) is True
for i in range(10):
formdata = formdef.data_class()()
formdata.store()
item.condition = {'type': 'python', 'value': 'form_objects.count < 2'}
assert item.must_jump(formdata) is False
def test_jump_bad_python_condition(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.store()
pub.substitutions.feed(formdef)
formdef.data_class().wipe()
formdata = formdef.data_class()()
item = JumpWorkflowStatusItem()
LoggedError.wipe()
item.condition = {'type': 'python', 'value': 'form_var_foobar == 0'}
assert item.must_jump(formdata) is False
assert LoggedError.count() == 1
logged_error = LoggedError.select()[0]
assert logged_error.summary == 'Failed to evaluate condition'
assert logged_error.exception_class == 'NameError'
assert logged_error.exception_message == "name 'form_var_foobar' is not defined"
assert logged_error.expression == 'form_var_foobar == 0'
assert logged_error.expression_type == 'python'
LoggedError.wipe()
item.condition = {'type': 'python', 'value': '~ invalid ~'}
assert item.must_jump(formdata) is False
assert LoggedError.count() == 1
logged_error = LoggedError.select()[0]
assert logged_error.summary == 'Failed to evaluate condition'
assert logged_error.exception_class == 'SyntaxError'
assert logged_error.exception_message == 'unexpected EOF while parsing (<string>, line 1)'
assert logged_error.expression == '~ invalid ~'
assert logged_error.expression_type == 'python'
def test_jump_django_conditions(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo'),]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'1': 'hello'}
pub.substitutions.feed(formdata)
item = JumpWorkflowStatusItem()
LoggedError.wipe()
item.condition = {'type': 'django', 'value': '1 < 2'}
assert item.must_jump(formdata) is True
item.condition = {'type': 'django', 'value': 'form_var_foo == "hello"'}
assert item.must_jump(formdata) is True
item.condition = {'type': 'django', 'value': 'form_var_foo|first|upper == "H"'}
assert item.must_jump(formdata) is True
item.condition = {'type': 'django', 'value': 'form_var_foo|first|upper == "X"'}
assert item.must_jump(formdata) is False
assert LoggedError.count() == 0
item.condition = {'type': 'django', 'value': '~ invalid ~'}
assert item.must_jump(formdata) is False
assert LoggedError.count() == 1
logged_error = LoggedError.select()[0]
assert logged_error.summary == 'Failed to evaluate condition'
assert logged_error.exception_class == 'TemplateSyntaxError'
assert logged_error.exception_message == "Could not parse the remainder: '~' from '~'"
assert logged_error.expression == '~ invalid ~'
assert logged_error.expression_type == 'django'
def test_check_auth(pub):
user = pub.user_class(name='foo')
user.store()
role = Role(name='bar1')
role.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
formdata = formdef.data_class()()
status_item = WorkflowStatusItem()
assert status_item.check_auth(formdata, user) is True
status_item.by = []
assert status_item.check_auth(formdata, user) is False
status_item.by = ['logged-users']
assert status_item.check_auth(formdata, user) is True
status_item.by = [role.id]
assert status_item.check_auth(formdata, user) is False
status_item.by = [int(role.id)]
assert status_item.check_auth(formdata, user) is False
user.roles = [role.id]
status_item.by = [role.id]
assert status_item.check_auth(formdata, user) is True
status_item.by = [int(role.id)]
assert status_item.check_auth(formdata, user) is True
status_item.by = ['_submitter']
assert status_item.check_auth(formdata, user) is False
formdata.user_id = user.id
assert status_item.check_auth(formdata, user) is True
formdata.user_id = None
status_item.by = ['_receiver']
assert status_item.check_auth(formdata, user) is False
formdata.workflow_roles = {'_receiver': user.id}
assert status_item.check_auth(formdata, user) is True
formdef.workflow_roles = {'_receiver': user.id}
formdata.workflow_roles = None
assert status_item.check_auth(formdata, user) is True
def test_dispatch(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
Role.wipe()
role = Role(name='xxx')
role.store()
item = DispatchWorkflowStatusItem()
formdata = formdef.data_class()()
item.perform(formdata)
assert not formdata.workflow_roles
formdata = formdef.data_class()()
item.role_key = '_receiver'
item.role_id = role.id
item.perform(formdata)
assert formdata.workflow_roles == {'_receiver': role.id}
def test_dispatch_auto(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
StringField(id='1', label='Test', type='string', varname='foo'),
]
formdef.store()
item = DispatchWorkflowStatusItem()
item.role_key = '_receiver'
item.dispatch_type = 'automatic'
formdata = formdef.data_class()()
pub.substitutions.reset()
pub.substitutions.feed(formdata)
item.perform(formdata)
assert not formdata.workflow_roles
Role.wipe()
role1 = Role('xxx1')
role1.store()
role2 = Role('xxx2')
role2.store()
item.variable = 'form_var_foo'
item.rules = [
{'role_id': role1.id, 'value': 'foo'},
{'role_id': role2.id, 'value': 'bar'},
]
pub.substitutions.reset()
pub.substitutions.feed(formdata)
item.perform(formdata)
assert not formdata.workflow_roles
# no match
formdata.data = {'1': 'XXX'}
pub.substitutions.reset()
pub.substitutions.feed(formdata)
item.perform(formdata)
assert not formdata.workflow_roles
# match
formdata.data = {'1': 'foo'}
pub.substitutions.reset()
pub.substitutions.feed(formdata)
item.perform(formdata)
assert formdata.workflow_roles == {'_receiver': role1.id}
# other match
formdata.data = {'1': 'bar'}
pub.substitutions.reset()
pub.substitutions.feed(formdata)
item.perform(formdata)
assert formdata.workflow_roles == {'_receiver': role2.id}
def test_dispatch_computed(pub, caplog):
pub.cfg['debug'] = {'logger': True}
pub.write_cfg()
pub.get_app_logger(force=True) # force new logger
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
Role.wipe()
role = Role(name='xxx')
role.slug = 'yyy'
role.store()
item = DispatchWorkflowStatusItem()
formdata = formdef.data_class()()
item.perform(formdata)
assert not formdata.workflow_roles
formdata = formdef.data_class()()
item.role_key = '_receiver'
item.role_id = '="yyy"' # slug
item.perform(formdata)
assert formdata.workflow_roles == {'_receiver': role.id}
formdata = formdef.data_class()()
item.role_key = '_receiver'
item.role_id = '="xxx"' # name
item.perform(formdata)
assert formdata.workflow_roles == {'_receiver': role.id}
# unknown role
formdata = formdef.data_class()()
item.role_key = '_receiver'
item.role_id = '="foobar"'
item.perform(formdata)
assert not formdata.workflow_roles
assert caplog.records[-1].message == 'error in dispatch, missing role (="foobar")'
def test_roles(pub):
user = pub.user_class()
user.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
formdata = formdef.data_class()()
formdata.user_id = user.id
item = AddRoleWorkflowStatusItem()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
item.role_id = '1'
item.perform(formdata)
assert pub.user_class.get(user.id).roles == ['1']
user.roles = None
user.store()
item = RemoveRoleWorkflowStatusItem()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
item.role_id = '1'
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
user.roles = ['1']
user.store()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
user.roles = ['2', '1']
user.store()
item.perform(formdata)
assert pub.user_class.get(user.id).roles == ['2']
def test_add_remove_computed_roles(pub):
user = pub.user_class()
user.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
formdata = formdef.data_class()()
formdata.user_id = user.id
role = Role(name='plop')
role.store()
role2 = Role(name='xxx')
role2.store()
item = AddRoleWorkflowStatusItem()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
item.role_id = role.name
item.perform(formdata)
assert pub.user_class.get(user.id).roles == [role.id]
user.roles = None
user.store()
item = RemoveRoleWorkflowStatusItem()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
item.role_id = role.name
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
user.roles = [role.id]
user.store()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
user.roles = [role2.id, role.id]
user.store()
item.perform(formdata)
assert pub.user_class.get(user.id).roles == [role2.id]
def test_roles_idp(pub):
pub.cfg['sp'] = {'idp-manage-user-attributes': True}
pub.cfg['idp'] = {'xxx': {'metadata_url': 'http://idp.example.net/idp/saml2/metadata'}}
pub.write_cfg()
user = pub.user_class()
user.name_identifiers = ['xxx']
user.store()
role = Role(name='bar1')
role.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
formdata = formdef.data_class()()
formdata.user_id = user.id
item = AddRoleWorkflowStatusItem()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
with mock.patch('wcs.wf.roles.http_post_request') as http_post_request:
http_post_request.return_value = (None, 201, '', None)
get_response().process_after_jobs()
assert http_post_request.call_count == 0
item.role_id = role.id
item.perform(formdata)
assert pub.user_class.get(user.id).roles == [role.id]
with mock.patch('wcs.wf.roles.http_post_request') as http_post_request:
http_post_request.return_value = (None, 201, '', None)
get_response().process_after_jobs()
assert http_post_request.call_count == 1
assert http_post_request.call_args[0][0].startswith(
'http://idp.example.net/api/roles/bar1/members/xxx/')
assert 'signature=' in http_post_request.call_args[0][0]
user.roles = None
user.store()
item2 = RemoveRoleWorkflowStatusItem()
item2.perform(formdata)
assert not pub.user_class.get(user.id).roles
with mock.patch('wcs.wf.roles.http_delete_request') as http_delete_request:
http_delete_request.return_value = (None, 200, '', None)
get_response().process_after_jobs()
assert http_delete_request.call_count == 0
item2.role_id = role.id
user.roles = [role.id]
user.store()
item2.perform(formdata)
assert not pub.user_class.get(user.id).roles
with mock.patch('wcs.wf.roles.http_delete_request') as http_delete_request:
http_delete_request.return_value = (None, 200, '', None)
get_response().process_after_jobs()
assert http_delete_request.call_count == 1
assert http_delete_request.call_args[0][0].startswith(
'http://idp.example.net/api/roles/bar1/members/xxx/')
assert 'signature=' in http_delete_request.call_args[0][0]
# out of http request/response cycle
pub._set_request(None)
with mock.patch('wcs.wf.roles.http_post_request') as http_post_request:
http_post_request.return_value = (None, 201, '', None)
item.perform(formdata)
assert pub.user_class.get(user.id).roles == [role.id]
with mock.patch('wcs.wf.roles.http_delete_request') as http_delete_request:
http_delete_request.return_value = (None, 200, '', None)
item2.perform(formdata)
assert pub.user_class.get(user.id).roles == []
def test_anonymise(two_pubs):
# build a backoffice field
Workflow.wipe()
wf = Workflow(name='wf with backoffice field')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
StringField(id='bo1', label='bo field 1', type='string'),
ItemField(id='bo2', label='list', type='item', items=['bofoo', 'bobar']),
]
wf.add_status('Status1')
wf.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
StringField(id='1', label='field 1', type='string'),
ItemField(id='2', label='list', type='item', items=['abc', 'def']),
]
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.user_id = '1'
formdata.data = {'1': 'foo',
'2': 'abc', '2_display': 'abc',
'bo1': 'bar',
'bo2': 'foo', 'bo2_display': 'foo'}
formdata.workflow_data = {'e': 'mc2'}
formdata.submission_context = {'foo': 'bar'}
formdata.store()
item = AnonymiseWorkflowStatusItem()
item.perform(formdata)
assert formdef.data_class().get(formdata.id).user_id is None
assert formdef.data_class().get(formdata.id).anonymised
assert formdef.data_class().get(formdata.id).submission_context is None
assert formdef.data_class().get(formdata.id).data == {'1': None,
'2': 'abc', '2_display': 'abc',
'bo1': None,
'bo2': 'foo', 'bo2_display': 'foo'}
assert formdef.data_class().get(formdata.id).workflow_data is None
assert formdef.data_class().get(formdata.id).evolution[0].who is None
def test_remove(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
formdata = formdef.data_class()()
formdata.store()
item = RemoveWorkflowStatusItem()
assert formdef.data_class().has_key(formdata.id)
assert item.perform(formdata) == 'http://example.net'
assert not formdef.data_class().has_key(formdata.id)
formdata = formdef.data_class()()
formdata.store()
item = RemoveWorkflowStatusItem()
req = pub.get_request()
req.response.filter['in_backoffice'] = True
assert formdef.data_class().has_key(formdata.id)
assert item.perform(formdata) == '..'
assert not formdef.data_class().has_key(formdata.id)
req.response.filter = {}
assert req.session.message
def test_register_comment(pub):
pub.substitutions.feed(MockSubstitutionVariables())
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
item = RegisterCommenterWorkflowStatusItem()
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == ''
item.comment = 'Hello world'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<p>Hello world</p>'
item.comment = '<div>Hello world</div>'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<div>Hello world</div>'
item.comment = '{{ test }}'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == ''
item.comment = '[test]'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<p>[test]</p>'
item.comment = '{{ bar }}'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<div>Foobar</div>'
item.comment = '[bar]'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<p>Foobar</p>'
item.comment = '<p>{{ foo }}</p>'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<p>1 &lt; 3</p>'
item.comment = '<p>{{ foo|safe }}</p>'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<p>1 < 3</p>'
item.comment = '{{ foo }}'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<div>1 &lt; 3</div>'
item.comment = '{{ foo|safe }}'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<div>1 < 3</div>'
item.comment = '[foo]'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<p>1 &lt; 3</p>'
item.comment = '<div>{{ foo }}</div>'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<div>1 &lt; 3</div>'
item.comment = '<div>[foo]</div>'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<div>1 &lt; 3</div>'
def test_register_comment_django_escaping(pub, emails):
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo'),]
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.data = {'1': '<p>hello</p>'}
formdata.store()
pub.substitutions.feed(formdata)
item = RegisterCommenterWorkflowStatusItem()
item.comment = '<div>{{form_var_foo}}</div>'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<div>&lt;p&gt;hello&lt;/p&gt;</div>'
# |safe
item = RegisterCommenterWorkflowStatusItem()
item.comment = '<div>{{form_var_foo|safe}}</div>'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<div><p>hello</p></div>'
def test_register_comment_attachment(pub):
pub.substitutions.feed(MockSubstitutionVariables())
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
item = RegisterCommenterWorkflowStatusItem()
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == ''
if os.path.exists(os.path.join(get_publisher().app_dir, 'attachments')):
shutil.rmtree(os.path.join(get_publisher().app_dir, 'attachments'))
formdata.evolution[-1].parts = [AttachmentEvolutionPart('hello.txt',
fp=StringIO('hello world'), varname='testfile')]
formdata.store()
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments'))) == 1
for subdir in os.listdir(os.path.join(get_publisher().app_dir, 'attachments')):
assert len(subdir) == 4
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments', subdir))) == 1
item.comment = '{{ attachments.testfile.url }}'
pub.substitutions.feed(formdata)
item.perform(formdata)
url1 = formdata.evolution[-1].parts[-1].content
pub.substitutions.feed(formdata)
item.comment = '{{ form_attachments.testfile.url }}'
item.perform(formdata)
url2 = formdata.evolution[-1].parts[-1].content
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments'))) == 1
for subdir in os.listdir(os.path.join(get_publisher().app_dir, 'attachments')):
assert len(subdir) == 4
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments', subdir))) == 1
assert url1 == url2
# test with a condition
item.comment = '{% if form_attachments.testfile %}file is there{% endif %}'
item.perform(formdata)
assert formdata.evolution[-1].parts[-1].content == '<div>file is there</div>'
item.comment = '{% if form_attachments.nope %}file is there{% endif %}'
item.perform(formdata)
assert formdata.evolution[-1].parts[-1].content == ''
# test with an action condition
item.condition = {'type': 'django', 'value': 'form_attachments.testfile'}
assert item.check_condition(formdata) is True
item.condition = {'type': 'django', 'value': 'form_attachments.missing'}
assert item.check_condition(formdata) is False
pub.substitutions.feed(formdata)
item.comment = '[attachments.testfile.url]'
item.perform(formdata)
url3 = formdata.evolution[-1].parts[-1].content
pub.substitutions.feed(formdata)
item.comment = '[form_attachments.testfile.url]'
item.perform(formdata)
url4 = formdata.evolution[-1].parts[-1].content
assert url3 == url4
def test_email(pub, emails):
pub.substitutions.feed(MockSubstitutionVariables())
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
user = pub.user_class(name='foo')
user.email = 'zorg@localhost'
user.store()
Role.wipe()
role1 = Role(name='foo')
role1.emails = ['foo@localhost']
role1.store()
role2 = Role(name='bar')
role2.emails = ['bar@localhost', 'baz@localhost']
role2.store()
# send using an uncompleted element
item = SendmailWorkflowStatusItem()
item.perform(formdata) # nothing
get_response().process_after_jobs()
assert emails.count() == 0
item.to = [role1.id]
item.perform(formdata) # no subject nor body
get_response().process_after_jobs()
assert emails.count() == 0
item.subject = 'foobar'
item.perform(formdata) # no body
get_response().process_after_jobs()
assert emails.count() == 0
# send for real
item.body = 'baz'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')
assert emails.get('foobar')['email_rcpt'] == ['foo@localhost']
assert 'baz' in emails.get('foobar')['payload']
# template for subject or body (Django)
emails.empty()
item.subject = '{{ bar }}'
item.body = '{{ foo }}'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('Foobar')
assert '1 < 3' in emails.get('Foobar')['payload']
# template for subject or body (ezt)
emails.empty()
item.subject = '[bar]'
item.body = '[foo]'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('Foobar')
assert '1 < 3' in emails.get('Foobar')['payload']
# two recipients
emails.empty()
item.subject = 'foobar'
item.to = [role1.id, role2.id]
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['to'] == 'Undisclosed recipients:;'
assert emails.get('foobar')['email_rcpt'] == ['foo@localhost',
'bar@localhost', 'baz@localhost']
# submitter as recipient, no known email address
emails.empty()
item.to = ['_submitter']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 0
# submitter as recipient, known email address
emails.empty()
formdata.user_id = user.id
formdata.store()
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['email_rcpt'] == ['zorg@localhost']
# computed recipient
emails.empty()
item.to = ['=email']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['email_rcpt'] == ['sub@localhost']
# computed list of recipients
emails.empty()
item.to = ['=["foo@localhost", "bar@localhost"]']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['email_rcpt'] == ['foo@localhost', 'bar@localhost']
# multiple recipients in a single computed string
emails.empty()
item.to = ['="foo@localhost, bar@localhost"']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['email_rcpt'] == ['foo@localhost', 'bar@localhost']
# string as recipient
emails.empty()
item.to = 'xyz@localhost'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['email_rcpt'] == ['xyz@localhost']
# string as recipient (but correctly set in a list)
emails.empty()
item.to = ['xyz@localhost']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['email_rcpt'] == ['xyz@localhost']
# multiple recipients in a static string
emails.empty()
item.to = ['foo@localhost, bar@localhost']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['email_rcpt'] == ['foo@localhost', 'bar@localhost']
# invalid recipient
emails.empty()
item.to = ['=foobar']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 0
# empty recipient
emails.empty()
item.to = ['=None']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 0
# custom from email
emails.empty()
item.to = [role1.id]
item.custom_from = 'foobar@localhost'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar').get('from') == 'foobar@localhost'
# custom from email (computed)
emails.empty()
item.to = [role1.id]
item.custom_from = '="foobar@localhost"'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar').get('from') == 'foobar@localhost'
def test_email_django_escaping(pub, emails):
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo'),]
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = SendmailWorkflowStatusItem()
item.to = ['foo@localhost']
item.subject = 'Foobar'
# explicit safe strings
emails.empty()
formdata.data = {'1': '1 < 3'}
item.body = '{{ form_var_foo|safe }}'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('Foobar')['payload'].strip() == '1 < 3'
# automatic no-escaping (because text/plain)
emails.empty()
formdata.data = {'1': '1 < 3'}
item.body = '{{ form_var_foo }}'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('Foobar')['payload'].strip() == '1 < 3'
# automatic escaping (because mail body is HTML)
emails.empty()
formdata.data = {'1': '1 < 3'}
item.body = '<p>{{ form_var_foo }}</p>'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('Foobar')
assert '<p>1 &lt; 3</p>' in emails.get('Foobar')['payload'].strip()
# no automatic escaping for subject (even if mail body is HTML)
emails.empty()
formdata.data = {'1': '1 < 3'}
item.subject = '{{ form_var_foo }}'
item.body = '<p>{{ form_var_foo }}</p>'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('1 < 3')
def test_email_attachments(pub, emails):
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
FileField(id='3', label='File', type='file', varname='file'),
]
formdef.store()
upload = PicklableUpload('test.jpeg', 'image/jpeg')
jpg = open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg')).read()
upload.receive([jpg])
formdata = formdef.data_class()()
formdata.data = {'3': upload}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
sendmail = SendmailWorkflowStatusItem()
sendmail.subject = 'foobar'
sendmail.body = '<p>force html</p>'
sendmail.to = ['to@example.net']
sendmail.attachments = ['form_var_file_raw']
sendmail.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.emails['foobar']['msg'].is_multipart()
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html'
assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg'
# build a backoffice field
Workflow.wipe()
FormDef.wipe()
wf = Workflow(name='email with attachments')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
FileField(id='bo1', label='bo field 1', type='file', varname='backoffice_file1'),
FileField(id='bo2', label='bo field 2', type='file', varname='backoffice_file2'),
]
st1 = wf.add_status('Status1')
wf.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
FileField(id='1', label='File', type='file', varname='frontoffice_file'),
]
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'1': upload}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
# store file in backoffice field form_fbo1 / form_var_backoffice_file_raw
setbo = SetBackofficeFieldsWorkflowStatusItem()
setbo.parent = st1
setbo.fields = [{'field_id': 'bo1', 'value': '=form_var_frontoffice_file_raw'}]
setbo.perform(formdata)
emails.empty()
sendmail.attachments = ['form_fbo1']
sendmail.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.emails['foobar']['msg'].is_multipart()
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html'
assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg'
emails.empty()
sendmail.attachments = ['form_var_backoffice_file1_raw']
sendmail.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.emails['foobar']['msg'].is_multipart()
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html'
assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg'
emails.empty()
sendmail.attachments = ['form_var_backoffice_file1_raw', 'form_var_backoffice_file2_raw']
sendmail.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.emails['foobar']['msg'].is_multipart()
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html'
assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg'
# backoffice_file2 is unset, no more parts :
assert len(emails.emails['foobar']['msg'].get_payload()) == 2
# set backoffice_file2 and retry
setbo.fields = [{'field_id': 'bo2',
'value': '={"content": "blah", "filename": "hello.txt", '
'"content_type": "text/plain"}'}]
setbo.perform(formdata)
emails.empty()
sendmail.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.emails['foobar']['msg'].is_multipart()
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html'
assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg'
assert emails.emails['foobar']['msg'].get_payload()[2].get_content_type() == 'text/plain'
assert emails.emails['foobar']['msg'].get_payload()[2].get_payload() == 'blah'
assert len(emails.emails['foobar']['msg'].get_payload()) == 3
emails.empty()
sendmail.attachments = [
'utils.attachment("Hello world")',
'utils.attachment(\'{"hello": "world"}\', content_type=\'application/json\')',
]
sendmail.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.emails['foobar']['msg'].is_multipart()
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
assert emails.emails['foobar']['msg'].get_payload(0).get_content_type() == 'text/html'
assert emails.emails['foobar']['msg'].get_payload(1).get_content_type() == 'application/octet-stream'
assert emails.emails['foobar']['msg'].get_payload(2).get_content_type() == 'application/json'
payload1 = emails.emails['foobar']['msg'].get_payload(1)
payload2 = emails.emails['foobar']['msg'].get_payload(2)
assert payload1.get_payload(decode=True) == "Hello world"
assert json.loads(payload2.get_payload(decode=True)) == {'hello': 'world'}
def test_webservice_call(http_requests, pub):
pub.substitutions.feed(MockSubstitutionVariables())
wf = Workflow(name='wf1')
st1 = wf.add_status('Status1', 'st1')
sterr = wf.add_status('StatusErr', 'sterr')
wf.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.perform(formdata)
assert http_requests.get_last('url') == 'http://remote.example.net'
assert http_requests.get_last('method') == 'GET'
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.post = True
item.perform(formdata)
assert http_requests.get_last('url') == 'http://remote.example.net'
assert http_requests.get_last('method') == 'POST'
payload = json.loads(http_requests.get_last('body'))
assert payload['url'] == 'http://example.net/baz/%s/' % formdata.id
assert payload['display_id'] == formdata.get_display_id()
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.post_data = {'str': 'abcd', 'one': '=1', 'django': '{{ form_number }}',
'evalme': '=form_number', 'error':'=1=3'}
pub.substitutions.feed(formdata)
item.perform(formdata)
assert http_requests.get_last('url') == 'http://remote.example.net'
assert http_requests.get_last('method') == 'POST'
payload = json.loads(http_requests.get_last('body'))
assert payload == {'one': 1, 'str': 'abcd',
'evalme': formdata.get_display_id(),
'django': formdata.get_display_id()}
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.post = True
item.post_data = {'str': 'abcd', 'one': '=1', 'decimal': '=Decimal(2)',
'evalme': '=form_number', 'error':'=1=3'}
pub.substitutions.feed(formdata)
item.perform(formdata)
assert http_requests.get_last('url') == 'http://remote.example.net'
assert http_requests.get_last('method') == 'POST'
payload = json.loads(http_requests.get_last('body'))
assert payload['extra'] == {'one': 1, 'str': 'abcd',
'decimal': '2', 'evalme': formdata.get_display_id()}
assert payload['url'] == 'http://example.net/baz/%s/' % formdata.id
assert payload['display_id'] == formdata.get_display_id()
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json'
item.varname = 'xxx'
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_response'] == {'foo': 'bar'}
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.request_signature_key = 'xxx'
item.perform(formdata)
assert 'signature=' in http_requests.get_last('url')
assert http_requests.get_last('method') == 'GET'
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.request_signature_key = '{{ doesntexist }}'
item.perform(formdata)
assert not 'signature=' in http_requests.get_last('url')
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.request_signature_key = '{{ empty }}'
item.perform(formdata)
assert not 'signature=' in http_requests.get_last('url')
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.request_signature_key = '[empty]'
item.perform(formdata)
assert not 'signature=' in http_requests.get_last('url')
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.request_signature_key = '{{ bar }}'
item.perform(formdata)
assert 'signature=' in http_requests.get_last('url')
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.request_signature_key = '[bar]'
item.perform(formdata)
assert 'signature=' in http_requests.get_last('url')
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/204'
item.varname = 'xxx'
item.perform(formdata)
assert formdata.workflow_data.get('xxx_status') == 204
assert formdata.workflow_data.get('xxx_time')
assert 'xxx_response' not in formdata.workflow_data
assert 'xxx_error_response' not in formdata.workflow_data
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/404'
item.varname = 'xxx'
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.workflow_data.get('xxx_status') == 404
assert formdata.workflow_data.get('xxx_time')
assert 'xxx_error_response' not in formdata.workflow_data
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/404-json'
item.varname = 'xxx'
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.workflow_data.get('xxx_status') == 404
assert formdata.workflow_data.get('xxx_error_response') == {'err': 1}
assert formdata.workflow_data.get('xxx_time')
assert 'xxx_response' not in formdata.workflow_data
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/404'
item.action_on_4xx = ':pass'
item.perform(formdata)
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/500'
with pytest.raises(AbortActionException):
item.perform(formdata)
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/500'
item.action_on_5xx = ':pass'
item.perform(formdata)
item = WebserviceCallStatusItem()
item.parent = st1
assert item.get_jump_label(st1.id) == 'Webservice'
assert item.get_jump_label('sterr') == 'Error calling webservice'
item.label = 'Plop'
assert item.get_jump_label(st1.id) == 'Webservice "Plop"'
assert item.get_jump_label('sterr') == 'Error calling webservice "Plop"'
item.url = 'http://remote.example.net/500'
item.action_on_5xx = 'sterr' # jump to status
formdata.status = 'wf-st1'
formdata.store()
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.status == 'wf-sterr'
item.action_on_5xx = 'stdeleted' # removed status
formdata.status = 'wf-st1'
formdata.store()
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.status == 'wf-st1' # unknown status acts like :stop
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml'
item.perform(formdata)
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml'
item.varname = 'xxx'
item.action_on_bad_data = ':stop'
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.workflow_data.get('xxx_status') == 200
assert 'xxx_response' not in formdata.workflow_data
assert 'xxx_error_response' not in formdata.workflow_data
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/404'
item.record_errors = True
item.action_on_4xx = ':stop'
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.evolution[-1].parts[-1].summary == '404 whatever'
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml'
item.varname = 'xxx'
item.action_on_bad_data = ':stop'
item.record_errors = True
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.evolution[-1].parts[-1].summary == 'ValueError: No JSON object could be decoded\n'
assert formdata.workflow_data.get('xxx_status') == 200
assert formdata.workflow_data.get('xxx_time')
assert 'xxx_error_response' not in formdata.workflow_data
formdata.workflow_data = None
# check storing response as attachment
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml'
item.varname = 'xxx'
item.response_type = 'attachment'
item.record_errors = True
item.perform(formdata)
assert formdata.workflow_data.get('xxx_status') == 200
assert formdata.workflow_data.get('xxx_content_type') == 'text/xml'
attachment = formdata.evolution[-1].parts[-1]
assert isinstance(attachment, AttachmentEvolutionPart)
assert attachment.base_filename == 'xxx.xml'
assert attachment.content_type == 'text/xml'
attachment.fp.seek(0)
assert attachment.fp.read(5) == '<?xml'
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/400-json'
item.record_errors = True
item.action_on_4xx = ':stop'
with pytest.raises(AbortActionException):
item.perform(formdata)
rendered = formdata.evolution[-1].parts[-1].view()
assert not rendered # empty if not in backoffice
req = HTTPRequest(None, {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': '/backoffice/'})
pub._set_request(req)
rendered = formdata.evolution[-1].parts[-1].view()
assert 'Error during webservice call' in str(rendered)
assert 'Error Code: 1' in str(rendered)
assert 'Error Description: :(' in str(rendered)
item.label = 'do that'
with pytest.raises(AbortActionException):
item.perform(formdata)
rendered = formdata.evolution[-1].parts[-1].view()
assert 'Error during webservice call &quot;do that&quot;' in str(rendered)
item = WebserviceCallStatusItem()
item.method = 'GET'
item.url = 'http://remote.example.net?in_url=1'
item.qs_data = {
'str': 'abcd',
'one': '=1',
'evalme': '=form_number',
'django': '{{ form_number }}',
'ezt': '[form_number]',
'error': '=1=3',
'in_url': '2',
}
pub.substitutions.feed(formdata)
item.perform(formdata)
assert http_requests.get_last('method') == 'GET'
qs = urlparse.parse_qs(http_requests.get_last('url').split('?')[1])
assert set(qs.keys()) == set(['in_url', 'str', 'one', 'evalme', 'django', 'ezt'])
assert qs['in_url'] == ['1', '2']
assert qs['one'] == ['1']
assert qs['evalme'] == [formdata.get_display_id()]
assert qs['django'] == [formdata.get_display_id()]
assert qs['ezt'] == [formdata.get_display_id()]
assert qs['str'] == ['abcd']
item = WebserviceCallStatusItem()
item.method = 'DELETE'
item.post = False
item.url = 'http://remote.example.net/json'
pub.substitutions.feed(formdata)
item.perform(formdata)
assert http_requests.get_last('method') == 'DELETE'
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.method = 'PUT'
item.post = False
item.post_data = {'str': 'abcd', 'one': '=1',
'evalme': '=form_number', 'error':'=1=3'}
pub.substitutions.feed(formdata)
item.perform(formdata)
assert http_requests.get_last('url') == 'http://remote.example.net'
assert http_requests.get_last('method') == 'PUT'
payload = json.loads(http_requests.get_last('body'))
assert payload == {'one': 1, 'str': 'abcd', 'evalme': formdata.get_display_id()}
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.method = 'PATCH'
item.post = False
item.post_data = {'str': 'abcd', 'one': '=1',
'evalme': '=form_number', 'error':'=1=3'}
pub.substitutions.feed(formdata)
item.perform(formdata)
assert http_requests.get_last('url') == 'http://remote.example.net'
assert http_requests.get_last('method') == 'PATCH'
payload = json.loads(http_requests.get_last('body'))
assert payload == {'one': 1, 'str': 'abcd', 'evalme': formdata.get_display_id()}
def test_webservice_waitpoint(pub):
item = WebserviceCallStatusItem()
assert item.waitpoint
item.action_on_app_error = ':pass'
item.action_on_4xx = ':pass'
item.action_on_5xx = ':pass'
item.action_on_bad_data = ':pass'
item.action_on_network_errors = ':pass'
assert not item.waitpoint
item.action_on_network_errors = ':stop'
assert item.waitpoint
def test_webservice_call_error_handling(http_requests, pub):
pub.substitutions.feed(MockSubstitutionVariables())
FormDef.wipe()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-err1'
item.action_on_app_error = ':stop'
item.action_on_4xx = ':pass'
item.action_on_5xx = ':pass'
item.action_on_network_errors = ':pass'
with pytest.raises(AbortActionException):
item.perform(formdata)
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-errheader1'
item.action_on_app_error = ':stop'
item.action_on_4xx = ':pass'
item.action_on_5xx = ':pass'
item.action_on_network_errors = ':pass'
with pytest.raises(AbortActionException):
item.perform(formdata)
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-errheaderstr'
item.action_on_app_error = ':stop'
item.action_on_4xx = ':pass'
item.action_on_5xx = ':pass'
item.action_on_network_errors = ':pass'
with pytest.raises(AbortActionException):
item.perform(formdata)
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-err0'
item.varname = 'xxx'
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 0
assert formdata.workflow_data['xxx_response'] == {'data': 'foo', 'err': 0}
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-err0'
item.varname = 'xxx'
item.action_on_app_error = ':stop'
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 0
assert formdata.workflow_data['xxx_response'] == {'data': 'foo', 'err': 0}
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-err1'
item.varname = 'xxx'
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 1
assert 'xxx_response' not in formdata.workflow_data
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-errstr'
item.varname = 'xxx'
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 'bug'
assert 'xxx_response' not in formdata.workflow_data
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-err1'
item.varname = 'xxx'
item.action_on_app_error = ':stop'
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 1
assert formdata.workflow_data['xxx_error_response'] == {'data': '', 'err': 1}
assert 'xxx_response' not in formdata.workflow_data
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-errheader0'
item.varname = 'xxx'
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 0
assert formdata.workflow_data['xxx_app_error_header'] == '0'
assert formdata.workflow_data['xxx_response'] == {'foo': 'bar'}
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-errheader1'
item.varname = 'xxx'
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 1
assert formdata.workflow_data['xxx_app_error_header'] == '1'
assert formdata.workflow_data['xxx_error_response'] == {'foo': 'bar'}
assert 'xxx_response' not in formdata.workflow_data
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-errheaderstr'
item.varname = 'xxx'
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 'bug'
assert formdata.workflow_data['xxx_app_error_header'] == 'bug'
assert formdata.workflow_data['xxx_error_response'] == {'foo': 'bar'}
assert 'xxx_response' not in formdata.workflow_data
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-errheader1'
item.varname = 'xxx'
item.action_on_app_error = ':stop'
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 1
assert formdata.workflow_data['xxx_app_error_header'] == '1'
assert formdata.workflow_data['xxx_error_response'] == {'foo': 'bar'}
assert 'xxx_response' not in formdata.workflow_data
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml-errheader'
item.varname = 'xxx'
item.response_type = 'attachment'
item.record_errors = True
item.perform(formdata)
assert formdata.workflow_data.get('xxx_status') == 200
assert formdata.workflow_data.get('xxx_app_error_code') == 1
assert formdata.workflow_data.get('xxx_app_error_header') == '1'
assert 'xxx_response' not in formdata.workflow_data
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml-errheader'
item.varname = 'xxx'
item.response_type = 'attachment'
item.record_errors = True
item.action_on_app_error = ':stop'
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.workflow_data.get('xxx_status') == 200
assert formdata.workflow_data.get('xxx_app_error_code') == 1
assert formdata.workflow_data.get('xxx_app_error_header') == '1'
assert 'xxx_response' not in formdata.workflow_data
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml-errheader'
item.varname = 'xxx'
item.response_type = 'json' # wait for json but receive xml
item.record_errors = True
item.perform(formdata)
assert formdata.workflow_data.get('xxx_status') == 200
assert formdata.workflow_data.get('xxx_app_error_code') == 1
assert formdata.workflow_data.get('xxx_app_error_header') == '1'
assert 'xxx_response' not in formdata.workflow_data
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-err1'
item.action_on_app_error = ':stop'
item.response_type = 'attachment' # err value is not an error
item.perform(formdata) # so, everything is "ok" here
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-errheaderstr'
item.action_on_app_error = ':stop'
item.response_type = 'attachment'
with pytest.raises(AbortActionException):
item.perform(formdata)
formdata.workflow_data = None
# xml instead of json is not a app_error
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml'
item.varname = 'xxx'
item.action_on_app_error = ':stop'
item.action_on_4xx = ':pass'
item.action_on_5xx = ':pass'
item.action_on_network_errors = ':pass'
item.action_on_bad_data = ':pass'
item.perform(formdata)
formdata.workflow_data = None
# connection error
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/connection-error'
item.record_errors = True
item.action_on_network_errors = ':pass'
item.perform(formdata)
assert not formdata.workflow_data
# connection error, with varname
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/connection-error'
item.varname = 'plop'
item.record_errors = True
item.action_on_network_errors = ':pass'
item.perform(formdata)
assert formdata.evolution[-1].parts[-1].summary == 'ConnectionError: error\n'
assert formdata.workflow_data['plop_connection_error'] == 'error'
def test_webservice_call_store_in_backoffice_filefield(http_requests, pub):
wf = Workflow(name='wscall to backoffice file field')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
FileField(id='bo1', label='bo field 1', type='file', varname='backoffice_file1'),
]
st1 = wf.add_status('Status1')
wf.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.data = {}
formdata.store()
# check storing response in backoffice file field
item = WebserviceCallStatusItem()
item.parent = st1
item.backoffice_filefield_id = 'bo1'
item.url = 'http://remote.example.net/xml'
item.response_type = 'attachment'
item.record_errors = True
item.perform(formdata)
assert 'bo1' in formdata.data
fbo1 = formdata.data['bo1']
assert fbo1.base_filename == 'file-bo1.xml'
assert fbo1.content_type == 'text/xml'
assert fbo1.get_content().startswith('<?xml')
# nothing else is stored
assert formdata.workflow_data is None
assert not formdata.evolution[-1].parts
# store in backoffice file field + varname
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
item.varname = 'xxx'
item.perform(formdata)
# backoffice file field
assert 'bo1' in formdata.data
fbo1 = formdata.data['bo1']
assert fbo1.base_filename == 'xxx.xml'
assert fbo1.content_type == 'text/xml'
assert fbo1.get_content().startswith('<?xml')
# varname => workflow_data and AttachmentEvolutionPart
assert formdata.workflow_data.get('xxx_status') == 200
assert formdata.workflow_data.get('xxx_content_type') == 'text/xml'
attachment = formdata.evolution[-1].parts[-1]
assert isinstance(attachment, AttachmentEvolutionPart)
assert attachment.base_filename == 'xxx.xml'
assert attachment.content_type == 'text/xml'
attachment.fp.seek(0)
assert attachment.fp.read(5) == '<?xml'
# no more 'bo1' backoffice field: do nothing
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
wf.backoffice_fields_formdef.fields = [
FileField(id='bo2', label='bo field 2', type='file'), # id != 'bo1'
]
item.perform(formdata)
assert formdata.data == {}
# backoffice field is not a field file:
wf.backoffice_fields_formdef.fields = [
StringField(id='bo1', label='bo field 1', type='string'),
]
item.perform(formdata)
assert formdata.data == {}
# no field at all:
wf.backoffice_fields_formdef.fields = []
item.perform(formdata)
assert formdata.data == {}
def test_webservice_target_status(pub):
wf = Workflow(name='boo')
status1 = wf.add_status('Status1', 'st1')
status2 = wf.add_status('Status2', 'st2')
wf.store()
item = WebserviceCallStatusItem()
item.parent = status1
assert item.get_target_status() == [status1.id]
item.action_on_app_error = status1.id
item.action_on_4xx = status2.id
item.action_on_5xx = status2.id
targets = item.get_target_status()
assert len(item.get_target_status()) == 4
assert targets.count(status1) == 2
assert targets.count(status2) == 2
item.action_on_bad_data = 'st3' # doesn't exist
targets = item.get_target_status()
assert len(item.get_target_status()) == 4
assert targets.count(status1) == 2
assert targets.count(status2) == 2
def test_timeout(two_pubs):
workflow = Workflow(name='timeout')
st1 = workflow.add_status('Status1', 'st1')
st2 = workflow.add_status('Status2', 'st2')
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
jump.by = ['_submitter', '_receiver']
jump.timeout = 0.1
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
workflow.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = workflow.id
assert formdef.get_workflow().id == workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
formdata_id = formdata.id
time.sleep(0.3)
_apply_timeouts(two_pubs)
assert formdef.data_class().get(formdata_id).status == 'wf-st2'
# check there's no crash on workflow without jumps
formdef = FormDef()
formdef.name = 'xxx'
formdef.store()
_apply_timeouts(two_pubs)
def test_legacy_timeout(pub):
workflow = Workflow(name='timeout')
st1 = workflow.add_status('Status1', 'st1')
st2 = workflow.add_status('Status2', 'st2')
jump = TimeoutWorkflowStatusItem()
jump.id = '_jump'
jump.timeout = 0.1
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
workflow.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = workflow.id
assert formdef.get_workflow().id == workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
formdata_id = formdata.id
time.sleep(0.3)
_apply_timeouts(pub)
assert formdef.data_class().get(formdata_id).status == 'wf-st2'
def test_timeout_then_remove(two_pubs):
workflow = Workflow(name='timeout-then-remove')
st1 = workflow.add_status('Status1', 'st1')
st2 = workflow.add_status('Status2', 'st2')
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
jump.by = ['_submitter', '_receiver']
jump.timeout = 0.1
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
remove = RemoveWorkflowStatusItem()
st2.items.append(remove)
remove.parent = st2
workflow.store()
formdef = FormDef()
formdef.name = 'baz%s' % id(two_pubs)
formdef.fields = []
formdef.workflow_id = workflow.id
assert formdef.get_workflow().id == workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
formdata_id = formdata.id
assert str(formdata_id) in [str(x) for x in formdef.data_class().keys()]
time.sleep(0.2)
_apply_timeouts(two_pubs)
assert not str(formdata_id) in [str(x) for x in formdef.data_class().keys()]
def test_sms(pub, sms_mocking):
pub.cfg['sms'] = {'mode': 'xxx'}
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
item = SendSMSWorkflowStatusItem()
item.to = ['000']
item.body = 'XXX'
assert len(sms_mocking.sms) == 0
item.perform(formdata)
assert len(sms_mocking.sms) == 1
assert sms_mocking.sms[0]['destinations'] == ['000']
assert sms_mocking.sms[0]['text'] == 'XXX'
# check None as recipient is not passed to the SMS backend
item.to = [None]
item.perform(formdata) # nothing
assert len(sms_mocking.sms) == 1
item.to = ['000', None]
item.perform(formdata)
assert len(sms_mocking.sms) == 2
assert sms_mocking.sms[1]['destinations'] == ['000']
assert sms_mocking.sms[1]['text'] == 'XXX'
pub.substitutions.feed(MockSubstitutionVariables())
item.body = 'dj{{ bar }}'
item.perform(formdata)
assert len(sms_mocking.sms) == 3
assert sms_mocking.sms[2]['destinations'] == ['000']
assert sms_mocking.sms[2]['text'] == 'djFoobar'
item.body = 'ezt[bar]'
item.perform(formdata)
assert len(sms_mocking.sms) == 4
assert sms_mocking.sms[3]['destinations'] == ['000']
assert sms_mocking.sms[3]['text'] == 'eztFoobar'
# disable SMS system
pub.cfg['sms'] = {'mode': 'none'}
item.to = ['000']
item.body = 'XXX'
item.perform(formdata) # nothing
assert len(sms_mocking.sms) == 4
def test_sms_with_passerelle(pub):
pub.cfg['sms'] = {'mode': 'passerelle',
'passerelle_url': 'http://passerelle.example.com/send?nostop=1',
'sender': 'Passerelle'}
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
item = SendSMSWorkflowStatusItem()
item.to = ['1234']
item.body = 'my message'
with mock.patch('wcs.wscalls.get_secret_and_orig') as mocked_secret_and_orig:
mocked_secret_and_orig.return_value = ('secret', 'localhost')
with mock.patch('qommon.misc._http_request') as mocked_http_post:
mocked_http_post.return_value = ('response', '200', 'data', 'headers')
item.perform(formdata)
url = mocked_http_post.call_args[0][0]
payload = mocked_http_post.call_args[1]['body']
assert 'http://passerelle.example.com' in url
assert '?nostop=1' in url
assert 'orig=localhost' in url
assert 'signature=' in url
json_payload = json.loads(payload)
assert 'message' in json_payload
assert json_payload['message'] == 'my message'
assert json_payload['to'] == ['1234']
assert json_payload['from'] == 'Passerelle'
def test_display_form(two_pubs):
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
wf = Workflow(name='status')
st1 = wf.add_status('Status1', 'st1')
display_form = FormWorkflowStatusItem()
display_form.id = '_x'
display_form.varname = 'xxx'
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
display_form.formdef.fields.append(StringField(id='1', label='Test',
type='string'))
display_form.formdef.fields.append(DateField(id='2', label='Date',
type='date', varname='date'))
st1.items.append(display_form)
display_form.parent = st1
form = Form(action='#')
display_form.fill_form(form, formdata, None)
assert form.widgets[0].title == 'Test'
assert form.widgets[1].title == 'Date'
two_pubs.get_request().form = {'f1': 'Foobar', 'f2': '2015-05-12', 'submit': 'submit'}
display_form.submit_form(form, formdata, None, None)
assert formdata.get_substitution_variables()['xxx_var_date'] == '2015-05-12'
two_pubs.cfg['language'] = {'language': 'fr'}
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
form = Form(action='#')
display_form.fill_form(form, formdata, None)
two_pubs.get_request().form = {'f1': 'Foobar', 'f2': '12/05/2015', 'submit': 'submit'}
display_form.submit_form(form, formdata, None, None)
assert formdata.get_substitution_variables()['xxx_var_date'] == '12/05/2015'
assert formdata.get_substitution_variables()['xxx_var_date_raw'] == \
time.strptime('2015-05-12', '%Y-%m-%d')
two_pubs.cfg['language'] = {'language': 'en'}
def test_display_form_and_comment(pub):
role = Role(name='bar1')
role.store()
user = pub.user_class()
user.roles = [role.id]
user.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
wf = Workflow(name='status')
st1 = wf.add_status('Status1', 'st1')
display_form = FormWorkflowStatusItem()
display_form.by = [role.id]
display_form.id = '_x'
display_form.varname = 'xxx'
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
display_form.formdef.fields.append(CommentField(id='1', label='Test',
type='comment'))
st1.items.append(display_form)
display_form.parent = st1
commentable = CommentableWorkflowStatusItem()
commentable.by = [role.id]
st1.items.append(commentable)
commentable.parent = st1
wf.store()
formdef.workflow = wf
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
assert formdata.get_status().name == 'Status1'
form = formdata.get_workflow_form(user)
assert 'Test' in str(form.widgets[0].render())
assert '<textarea' in str(form.widgets[1].render())
def test_display_form_migration(pub):
wf = Workflow(name='status')
st1 = wf.add_status('Status1', 'st1')
display_form = FormWorkflowStatusItem()
display_form.id = '_x'
display_form.varname = 'xxx'
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
display_form.formdef.fields = [
ItemField(id='1', label='Test', type='item')
]
st1.items.append(display_form)
display_form.parent = st1
display_form.formdef.fields[0].show_as_radio = True
wf.store()
wf = Workflow.get(wf.id)
assert wf.possible_status[0].items[0].formdef.fields[0].display_mode == 'radio'
def test_choice_button_no_label(pub):
role = Role(name='bar1')
role.store()
user = pub.user_class()
user.roles = [role.id]
user.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
wf = Workflow(name='status')
st1 = wf.add_status('Status1', 'st1')
choice = ChoiceWorkflowStatusItem()
choice.by = [role.id]
choice.id = '_x'
st1.items.append(choice)
choice.parent = st1
choice2 = ChoiceWorkflowStatusItem()
choice2.label = 'TEST'
choice2.by = [role.id]
choice2.id = '_x2'
st1.items.append(choice2)
choice2.parent = st1
wf.store()
formdef.workflow = wf
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
assert formdata.get_status().name == 'Status1'
form = formdata.get_workflow_form(user)
form.render()
assert str(form.render()).count('<button') == 1
assert '>TEST</button>' in str(form.render())
def test_workflow_role_type_migration(pub):
workflow = Workflow(name='role migration')
st1 = workflow.add_status('Status1', 'st1')
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
jump.by = [1, 2]
st1.items.append(jump)
jump.parent = st1
workflow.store()
reloaded_workflow = Workflow.get(workflow.id)
assert reloaded_workflow.possible_status[0].items[0].by == ['1', '2']
def test_workflow_display_message(pub):
pub.substitutions.feed(MockSubstitutionVariables())
workflow = Workflow(name='display message')
st1 = workflow.add_status('Status1', 'st1')
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.id = '1'
display_message = DisplayMessageWorkflowStatusItem()
display_message.parent = st1
display_message.message = 'test'
assert display_message.get_message(formdata) == 'test'
display_message.message = '{{ number }}'
assert display_message.get_message(formdata) == str(formdata.id)
display_message.message = '[number]'
assert display_message.get_message(formdata) == str(formdata.id)
display_message.message = '{{ bar }}'
assert display_message.get_message(formdata) == 'Foobar'
display_message.message = '[bar]'
assert display_message.get_message(formdata) == 'Foobar'
# makes sure the string is correctly escaped for HTML
display_message.message = '{{ foo }}'
assert display_message.get_message(formdata) == '1 &lt; 3'
display_message.message = '[foo]'
assert display_message.get_message(formdata) == '1 &lt; 3'
def test_workflow_display_message_to(pub):
workflow = Workflow(name='display message to')
st1 = workflow.add_status('Status1', 'st1')
role = Role(name='foorole')
role.store()
role2 = Role(name='no-one-role')
role2.store()
user = pub.user_class(name='baruser')
user.roles = []
user.store()
FormDef.wipe()
formdef = FormDef()
formdef.url_name = 'foobar'
formdef._workflow = workflow
formdata = formdef.data_class()()
formdata.status = 'wf-st1'
display_message = DisplayMessageWorkflowStatusItem()
display_message.parent = st1
st1.items.append(display_message)
display_message.message = 'all'
display_message.to = None
assert display_message.get_message(formdata) == 'all'
assert formdata.get_workflow_messages() == ['all']
display_message.message = 'to-role'
display_message.to = [role.id]
assert display_message.get_message(formdata) == ''
assert formdata.get_workflow_messages() == []
pub._request.user = user
display_message.message = 'to-role'
display_message.to = [role.id]
assert display_message.get_message(formdata) == ''
assert formdata.get_workflow_messages() == []
user.roles = [role.id]
assert display_message.get_message(formdata) == 'to-role'
assert formdata.get_workflow_messages() == ['to-role']
user.roles = []
display_message.message = 'to-submitter'
display_message.to = ['_submitter']
assert display_message.get_message(formdata) == ''
assert formdata.get_workflow_messages() == []
formdata.user_id = user.id
assert display_message.get_message(formdata) == 'to-submitter'
assert formdata.get_workflow_messages() == ['to-submitter']
display_message.message = 'to-role-or-submitter'
display_message.to = [role.id, '_submitter']
assert display_message.get_message(formdata) == 'to-role-or-submitter'
assert formdata.get_workflow_messages() == ['to-role-or-submitter']
formdata.user_id = None
assert display_message.get_message(formdata) == ''
assert formdata.get_workflow_messages() == []
user.roles = [role.id]
assert display_message.get_message(formdata) == 'to-role-or-submitter'
assert formdata.get_workflow_messages() == ['to-role-or-submitter']
formdata.user_id = user.id
assert display_message.get_message(formdata) == 'to-role-or-submitter'
assert formdata.get_workflow_messages() == ['to-role-or-submitter']
display_message.to = [role2.id]
assert display_message.get_message(formdata) == ''
assert formdata.get_workflow_messages() == []
display_message.message = 'd1'
display_message2 = DisplayMessageWorkflowStatusItem()
display_message2.parent = st1
st1.items.append(display_message2)
display_message2.message = 'd2'
display_message2.to = [role.id, '_submitter']
assert formdata.get_workflow_messages() == ['d2']
user.roles = [role.id, role2.id]
assert 'd1' in formdata.get_workflow_messages()
assert 'd2' in formdata.get_workflow_messages()
def test_workflow_display_message_line_details(pub):
workflow = Workflow(name='display message to')
st1 = workflow.add_status('Status1', 'st1')
display_message = DisplayMessageWorkflowStatusItem()
display_message.parent = st1
assert display_message.get_line_details() == 'top of page'
display_message.position = 'top'
assert display_message.get_line_details() == 'top of page'
display_message.position = 'bottom'
assert display_message.get_line_details() == 'bottom of page'
display_message.position = 'actions'
assert display_message.get_line_details() == 'with actions'
role = Role(name='foorole')
role.store()
display_message.to = [role.id]
assert display_message.get_line_details() == 'with actions, for foorole'
def test_workflow_roles(pub, emails):
pub.substitutions.feed(MockSubstitutionVariables())
user = pub.user_class(name='foo')
user.email = 'zorg@localhost'
user.store()
Role.wipe()
role1 = Role(name='foo')
role1.emails = ['foo@localhost']
role1.details = 'Hello World'
role1.store()
role2 = Role(name='bar')
role2.emails = ['bar@localhost', 'baz@localhost']
role2.store()
workflow = Workflow(name='wf roles')
st1 = workflow.add_status('Status1', 'st1')
item = SendmailWorkflowStatusItem()
item.to = ['_receiver', '_other']
item.subject = 'Foobar'
item.body = 'Hello'
st1.items.append(item)
item.parent = st1
workflow.roles['_other'] = 'Other Function'
workflow.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_roles = {'_receiver': role1.id, '_other': role2.id}
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
emails.empty()
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('Foobar')
assert set(emails.get('Foobar')['email_rcpt']) == set(
['foo@localhost', 'bar@localhost', 'baz@localhost'])
workflow.roles['_slug-with-dash'] = 'Dashed Function'
workflow.store()
formdef.workflow_roles['_slug-with-dash'] = role1.id
formdef.store()
substvars = formdata.get_substitution_variables()
assert substvars.get('form_role_other_name') == 'bar'
assert substvars.get('form_role_slug_with_dash_name') == 'foo'
assert substvars.get('form_role_slug_with_dash_details') == 'Hello World'
def test_criticality(pub):
FormDef.wipe()
workflow = Workflow(name='criticality')
workflow.criticality_levels = [
WorkflowCriticalityLevel(name='green'),
WorkflowCriticalityLevel(name='yellow'),
WorkflowCriticalityLevel(name='red'),
]
workflow.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.workflow_id = workflow.id
formdef.store()
item = ModifyCriticalityWorkflowStatusItem()
formdata = formdef.data_class()()
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'yellow'
formdata = formdef.data_class()()
item.mode = MODE_INC
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'yellow'
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'red'
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'red'
item.mode = MODE_DEC
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'yellow'
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'green'
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'green'
item.mode = MODE_SET
item.absolute_value = 2
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'red'
item.absolute_value = 0
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'green'
def test_geolocate_address(pub):
formdef = FormDef()
formdef.geolocations = {'base': 'bla'}
formdef.name = 'baz'
formdef.fields = [
StringField(id='1', label='String', type='string', varname='string'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'1': '169 rue du chateau'}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = GeolocateWorkflowStatusItem()
item.method = 'address_string'
item.address_string = '[form_var_string], paris, france'
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 200,
json.dumps([{'lat':'48.8337085','lon':'2.3233693'}]), None)
item.perform(formdata)
assert 'http://nominatim.openstreetmap.org/search' in http_get_page.call_args[0][0]
assert urllib2.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 2
pub.load_site_options()
pub.site_options.set('options', 'nominatim_key', 'KEY')
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 200,
json.dumps([{'lat':'48.8337085', 'lon':'2.3233693'}]), None)
item.perform(formdata)
assert 'http://nominatim.openstreetmap.org/search' in http_get_page.call_args[0][0]
assert urllib2.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
assert 'key=KEY' in http_get_page.call_args[0][0]
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 2
pub.load_site_options()
pub.site_options.set('options', 'geocoding_service_url', 'http://example.net/')
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 200,
json.dumps([{'lat':'48.8337085','lon':'2.3233693'}]), None)
item.perform(formdata)
assert 'http://example.net/?q=' in http_get_page.call_args[0][0]
pub.site_options.set('options', 'geocoding_service_url', 'http://example.net/?param=value')
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 200,
json.dumps([{'lat':'48.8337085','lon':'2.3233693'}]), None)
item.perform(formdata)
assert 'http://example.net/?param=value&' in http_get_page.call_args[0][0]
# check for invalid ezt
item.address_string = '[if-any], paris, france'
formdata.geolocations = None
item.perform(formdata)
assert formdata.geolocations == {}
# check for None
item.address_string = '=None'
formdata.geolocations = None
item.perform(formdata)
assert formdata.geolocations == {}
# check for nominatim server error
formdata.geolocations = None
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 500,
json.dumps([{'lat':'48.8337085','lon':'2.3233693'}]), None)
item.perform(formdata)
assert formdata.geolocations == {}
# check for nominatim returning an empty result set
formdata.geolocations = None
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 200, json.dumps([]), None)
item.perform(formdata)
assert formdata.geolocations == {}
# check for nominatim bad json
formdata.geolocations = None
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 200, 'bad json', None)
item.perform(formdata)
assert formdata.geolocations == {}
# check for nominatim connection error
formdata.geolocations = None
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.side_effect = ConnectionError
item.perform(formdata)
assert formdata.geolocations == {}
def test_geolocate_image(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.geolocations = {'base': 'bla'}
formdef.fields = [
FileField(id='3', label='File', type='file', varname='file'),
]
formdef.store()
upload = PicklableUpload('test.jpeg', 'image/jpeg')
upload.receive([open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg')).read()])
formdata = formdef.data_class()()
formdata.data = {'3': upload}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = GeolocateWorkflowStatusItem()
item.method = 'photo_variable'
item.photo_variable = '=form_var_file_raw'
item.perform(formdata)
assert int(formdata.geolocations['base']['lat']) == -1
assert int(formdata.geolocations['base']['lon']) == 6
# invalid expression
formdata.geolocations = None
item.photo_variable = '=1/0'
item.perform(formdata)
assert formdata.geolocations == {}
# invalid type
formdata.geolocations = None
item.photo_variable = '="bla"'
item.perform(formdata)
assert formdata.geolocations == {}
# invalid photo
upload = PicklableUpload('test.jpeg', 'image/jpeg')
upload.receive([open(os.path.join(os.path.dirname(__file__), 'template.odt')).read()])
formdata.data = {'3': upload}
formdata.geolocations = None
item.perform(formdata)
assert formdata.geolocations == {}
def test_geolocate_map(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.geolocations = {'base': 'bla'}
formdef.fields = [
MapField(id='2', label='Map', type='map', varname='map'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'2': '48.8337085;2.3233693'}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = GeolocateWorkflowStatusItem()
item.method = 'map_variable'
item.map_variable = '=form_var_map'
item.perform(formdata)
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 2
# invalid data
formdata.geolocations = None
item.map_variable = '=form_var'
item.perform(formdata)
assert formdata.geolocations == {}
def test_geolocate_overwrite(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.geolocations = {'base': 'bla'}
formdef.fields = [
MapField(id='2', label='Map', type='map', varname='map'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'2': '48.8337085;2.3233693'}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = GeolocateWorkflowStatusItem()
item.method = 'map_variable'
item.map_variable = '=form_var_map'
item.perform(formdata)
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 2
formdata.data = {'2': '48.8337085;3.3233693'}
item.perform(formdata)
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 3
formdata.data = {'2': '48.8337085;4.3233693'}
item.overwrite = False
item.perform(formdata)
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 3
@pytest.mark.skipif(transform_to_pdf is None, reason='libreoffice not found')
def test_transform_to_pdf():
instream = open(os.path.join(os.path.dirname(__file__), 'template.odt'))
outstream = transform_to_pdf(instream)
assert outstream is not False
assert outstream.read(10).startswith('%PDF-')
def test_export_to_model_image(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
FileField(id='3', label='File', type='file', varname='image'),
]
formdef.store()
upload = PicklableUpload('test.jpeg', 'image/jpeg')
image_data = open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg')).read()
upload.receive([image_data])
formdata = formdef.data_class()()
formdata.data = {'3': upload}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = ExportToModel()
item.convert_to_pdf = False
item.method = 'non-interactive'
template_filename = os.path.join(os.path.dirname(__file__), 'template-with-image.odt')
template = open(template_filename).read()
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
upload.fp = StringIO()
upload.fp.write(template)
upload.fp.seek(0)
item.model_file = UploadedFile(pub.app_dir, None, upload)
item.attach_to_history = True
item.perform(formdata)
assert formdata.evolution[-1].parts[-1].base_filename == 'template.odt'
zfile = zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r')
zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
# check the image has been replaced by the one from the formdata
assert zinfo.file_size == len(image_data)
# check with missing data or wrong kind of data
for field_value in (None, 'wrong kind'):
formdata = formdef.data_class()()
formdata.data = {'3': field_value}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item.perform(formdata)
zfile = zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r')
zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
# check the original image has been left
assert zinfo.file_size == 580
item.filename = 'formulaire-{{form_number}}.odt'
item.perform(formdata)
assert formdata.evolution[-1].parts[-1].base_filename == 'formulaire-%s-%s.odt' % (formdef.id,
formdata.id)
def test_export_to_model_backoffice_field(pub):
wf = Workflow(name='email with attachments')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
FileField(id='bo1', label='bo field 1', type='file', varname='backoffice_file1'),
]
st1 = wf.add_status('Status1')
wf.store()
formdef = FormDef()
formdef.name = 'foo-export-to-bofile'
formdef.fields = [
StringField(id='1', label='String', type='string', varname='string'),
]
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = ExportToModel()
item.method = 'non-interactive'
item.convert_to_pdf = False
template_filename = os.path.join(os.path.dirname(__file__), 'template.odt')
template = open(template_filename).read()
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
upload.fp = StringIO()
upload.fp.write(template)
upload.fp.seek(0)
item.model_file = UploadedFile(pub.app_dir, None, upload)
item.parent = st1
item.backoffice_filefield_id = 'bo1'
item.perform(formdata)
assert 'bo1' in formdata.data
fbo1 = formdata.data['bo1']
assert fbo1.base_filename == 'template.odt'
assert fbo1.content_type == 'application/octet-stream'
zfile = zipfile.ZipFile(fbo1.get_file())
assert 'foo-export-to-bofile' in zfile.read('content.xml')
# no more 'bo1' backoffice field: do nothing
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
# id is not bo1:
wf.backoffice_fields_formdef.fields = [
FileField(id='bo2', label='bo field 2', type='file'),
]
item.perform(formdata)
assert formdata.data == {}
# field is not a field file:
wf.backoffice_fields_formdef.fields = [
StringField(id='bo1', label='bo field 1', type='string'),
]
item.perform(formdata)
assert formdata.data == {}
# no field at all:
wf.backoffice_fields_formdef.fields = []
item.perform(formdata)
assert formdata.data == {}
def test_export_to_model_django_template(pub):
formdef = FormDef()
formdef.name = 'foo-export-to-template-with-django'
formdef.fields = [
StringField(id='1', label='String', type='string', varname='string'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = ExportToModel()
item.method = 'non-interactive'
item.attach_to_history = True
template_filename = os.path.join(os.path.dirname(__file__), 'template-django.odt')
template = open(template_filename).read()
upload = QuixoteUpload('/foo/template-django.odt', content_type='application/octet-stream')
upload.fp = StringIO()
upload.fp.write(template)
upload.fp.seek(0)
item.model_file = UploadedFile(pub.app_dir, None, upload)
item.convert_to_pdf = False
item.perform(formdata)
new_content = zipfile.ZipFile(open(formdata.evolution[0].parts[0].filename)).read('content.xml')
assert '>foo-export-to-template-with-django<' in new_content
formdef.name = 'Name with a \' simple quote'
formdef.store()
item.perform(formdata)
new_content = zipfile.ZipFile(open(formdata.evolution[0].parts[1].filename)).read('content.xml')
assert '>Name with a \' simple quote<' in new_content
formdef.name = 'A <> name'
formdef.store()
item.perform(formdata)
new_content = zipfile.ZipFile(open(formdata.evolution[0].parts[2].filename)).read('content.xml')
assert '>A &lt;&gt; name<' in new_content
def test_global_timeouts(pub):
FormDef.wipe()
Workflow.wipe()
workflow = Workflow(name='global-timeouts')
workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
workflow.criticality_levels = [
WorkflowCriticalityLevel(name='green'),
WorkflowCriticalityLevel(name='yellow'),
WorkflowCriticalityLevel(name='red'),
]
action = workflow.add_global_action('Timeout Test')
item = action.append_item('modify_criticality')
trigger = action.append_trigger('timeout')
trigger.anchor = 'creation'
workflow.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = workflow.id
formdef.store()
formdata1 = formdef.data_class()()
formdata1.just_created()
formdata1.store()
# delay isn't set yet, no crash
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
# delay didn't expire yet, no change
trigger.timeout = '2'
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
formdata1.receipt_time = time.localtime(time.time()-3*86400)
formdata1.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
# make sure it's not triggered a second time
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
# change id so it's triggered again
trigger.id = 'XXX1'
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'red'
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'red'
# reset formdata to initial state
formdata1.store()
trigger.anchor = '1st-arrival'
trigger.anchor_status_first = None
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
formdata1.evolution[-1].time = time.localtime(time.time()-3*86400)
formdata1.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
formdata1.store() # reset
trigger.anchor = 'latest-arrival'
trigger.anchor_status_latest = None
workflow.store()
formdata1.evolution[-1].time = time.localtime()
formdata1.store()
formdata1.jump_status('new')
formdata1.evolution[-1].time = time.localtime(time.time()-7*86400)
formdata1.jump_status('accepted')
formdata1.jump_status('new')
formdata1.evolution[-1].time = time.localtime(time.time()-1*86400)
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
formdata1.evolution[-1].time = time.localtime(time.time()-4*86400)
formdata1.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
formdata1.store()
# limit trigger to formdata with "accepted" status
trigger.anchor_status_latest = 'wf-accepted'
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
formdata1.store()
# limit trigger to formdata with "new" status
trigger.anchor_status_latest = 'wf-new'
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
formdata1.store()
# use python expression as anchor
# timestamp
trigger.anchor = 'python'
trigger.anchor_expression = repr(time.time())
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
trigger.anchor = 'python'
trigger.anchor_expression = repr(time.time() - 10*86400)
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
formdata1.store()
# datetime object
trigger.anchor = 'python'
trigger.anchor_expression = 'datetime.datetime(%s, %s, %s, %s, %s)' % (
datetime.datetime.now() - datetime.timedelta(days=10)).timetuple()[:5]
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
formdata1.store()
# datetime object
trigger.anchor = 'python'
trigger.anchor_expression = 'datetime.date(%s, %s, %s)' % (
datetime.datetime.now() - datetime.timedelta(days=10)).timetuple()[:3]
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
formdata1.store()
# string object
trigger.anchor = 'python'
trigger.anchor_expression = '"%04d-%02d-%02d"' % (
datetime.datetime.now() - datetime.timedelta(days=10)).timetuple()[:3]
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
formdata1.store()
# invalid variable
trigger.anchor = 'python'
trigger.anchor_expression = 'Ellipsis'
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
formdata1.store()
# invalid expression
trigger.anchor = 'python'
trigger.anchor_expression = 'XXX'
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
formdata1.store()
def test_profile(two_pubs):
User = two_pubs.user_class
user = User()
user.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
StringField(id='1', label='Test', type='string', varname='foo'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.user_id = user.id
formdata.data = {'1': 'bar@localhost'}
item = UpdateUserProfileStatusItem()
item.fields = [{'field_id': '__email', 'value': '=form_var_foo'}]
item.perform(formdata)
assert User.get(user.id).email == 'bar@localhost'
formdata.data = {'1': 'Plop'}
item.fields = [{'field_id': '__name', 'value': '=form_var_foo'}]
item.perform(formdata)
assert User.get(user.id).name == 'Plop'
item.fields = [{'field_id': '__name', 'value': 'dj{{form_var_foo}}'}]
item.perform(formdata)
assert User.get(user.id).name == 'djPlop'
item.fields = [{'field_id': '__name', 'value': 'ezt[form_var_foo]'}]
item.perform(formdata)
assert User.get(user.id).name == 'eztPlop'
from wcs.admin.settings import UserFieldsFormDef
formdef = UserFieldsFormDef(two_pubs)
formdef.fields = [
StringField(id='3', label='test', type='string', varname='plop'),
DateField(id='4', label='Date', type='date', varname='bar'),
]
formdef.store()
item.fields = [{'field_id': 'plop', 'value': '=form_var_foo'}]
item.perform(formdata)
assert User.get(user.id).form_data.get('3') == 'Plop'
assert not User.get(user.id).form_data.get('4')
# check transmission to IdP
get_publisher().cfg['sp'] = {'idp-manage-user-attributes': True}
get_publisher().cfg['idp'] = {'xxx': {'metadata_url': 'http://idp.example.net/idp/saml2/metadata'}}
get_publisher().write_cfg()
user = User.get(user.id)
user.name_identifiers = ['xyz']
user.store()
for date_value in (
'20/03/2018',
'=utils.make_date("20/03/2018")',
'=utils.make_date("20/03/2018").timetuple()'):
# check local value
item.fields = [{'field_id': 'bar', 'value': date_value}]
item.perform(formdata)
assert User.get(user.id).form_data.get('3') == 'Plop'
assert User.get(user.id).form_data.get('4').tm_year == 2018
with mock.patch('wcs.wf.profile.http_patch_request') as http_patch_request:
http_patch_request.return_value = (None, 200, '', None)
get_response().process_after_jobs()
assert http_patch_request.call_count == 1
assert http_patch_request.call_args[0][1] == '{"bar": "2018-03-20"}'
for date_value in ('baddate', '', {}, [], None):
# reset date to a known value
user.form_data['4'] = datetime.datetime.now().timetuple()
user.store()
year = User.get(user.id).form_data.get('4').tm_year
# perform action
item.fields = [{'field_id': 'bar', 'value': date_value}]
item.perform(formdata)
if date_value not in (None, ''): # bad value : do nothing
assert User.get(user.id).form_data.get('4').tm_year == year
else: # empty value : empty field
assert User.get(user.id).form_data.get('4') == None
with mock.patch('wcs.wf.profile.http_patch_request') as http_patch_request:
http_patch_request.return_value = (None, 200, '', None)
get_response().process_after_jobs()
assert http_patch_request.call_count == 1
if date_value not in (None, ''): # bad value : do nothing
assert http_patch_request.call_args[0][1] == '{}'
else: # empty value : null field
assert http_patch_request.call_args[0][1] == '{"bar": null}'
def test_set_backoffice_field(http_requests, two_pubs):
Workflow.wipe()
FormDef.wipe()
LoggedError.wipe()
wf = Workflow(name='xxx')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
StringField(id='bo1', label='1st backoffice field',
type='string', varname='backoffice_blah'),
]
st1 = wf.add_status('Status1')
wf.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
StringField(id='00', label='String', type='string', varname='string'),
StringField(id='01', label='Other string', type='string', varname='other'),
]
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'00': 'HELLO'}
formdata.just_created()
formdata.store()
two_pubs.substitutions.feed(formdata)
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data.get('bo1') == None
item.fields = [{'field_id': 'bo1', 'value': '=form_var_string'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'HELLO'
item.fields = [{'field_id': 'bo1', 'value': '{{ form_var_string }} WORLD'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'HELLO WORLD'
item.fields = [{'field_id': 'bo1', 'value': '[form_var_string] GOODBYE'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'HELLO GOODBYE'
item.fields = [{'field_id': 'bo1', 'value': '{{ form.var.string }} LAZY'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'HELLO LAZY'
item.fields = [{'field_id': 'bo1', 'value': '=form.var.string'}] # lazy python
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'HELLO'
item.fields = [{'field_id': 'bo1', 'value': '=vars().get("form_var_string") + " PLOP"'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'HELLO PLOP'
item.fields = [{'field_id': 'bo1',
'value': '=vars().get("form_var_other") or vars().get("form_var_string")'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'HELLO'
item.fields = [{'field_id': 'bo1', 'value': '=vars().get("form_var_bouh", "X") + " PLOP"'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'X PLOP'
assert LoggedError.count() == 0
item.fields = [{'field_id': 'bo1', 'value': '= ~ invalid python ~'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert LoggedError.count() == 1
logged_error = LoggedError.select()[0]
assert logged_error.summary == 'Failed to compute Python expression'
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.expression == ' ~ invalid python ~'
assert logged_error.expression_type == 'python'
assert logged_error.exception_class == 'SyntaxError'
assert logged_error.exception_message == 'invalid syntax (<string>, line 1)'
LoggedError.wipe()
item.fields = [{'field_id': 'bo1', 'value': '{% if bad django %}'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert LoggedError.count() == 1
logged_error = LoggedError.select()[0]
assert logged_error.summary == 'Failed to compute template'
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.expression == '{% if bad django %}'
assert logged_error.expression_type == 'template'
assert logged_error.exception_class == 'TemplateError'
assert logged_error.exception_message.startswith('syntax error in Django template')
def test_set_backoffice_field_file(http_requests, two_pubs):
Workflow.wipe()
FormDef.wipe()
wf = Workflow(name='xxx')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
FileField(id='bo1', label='1st backoffice field',
type='file', varname='backoffice_file'),
]
st1 = wf.add_status('Status1')
wf.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
FileField(id='00', label='File', type='file', varname='file'),
]
formdef.workflow_id = wf.id
formdef.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': '=locals().get("form_var_file_raw")'}]
# the file does not exist
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == None
# store a PiclableUpload
upload = PicklableUpload('test.jpeg', 'image/jpeg')
upload.receive([open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg')).read()])
formdata = formdef.data_class()()
formdata.data = {'00': upload}
formdata.just_created()
formdata.store()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'].base_filename == 'test.jpeg'
assert formdata.data['bo1'].content_type == 'image/jpeg'
assert formdata.data['bo1'].get_content() == open(os.path.join(os.path.dirname(__file__),
'image-with-gps-data.jpeg')).read()
# same test with PicklableUpload wcs.qommon.form
from wcs.qommon.form import PicklableUpload as PicklableUpload2
upload2 = PicklableUpload2('test2.odt', 'application/vnd.oasis.opendocument.text')
upload2.receive([open(os.path.join(os.path.dirname(__file__), 'template.odt')).read()])
formdata = formdef.data_class()()
formdata.data = {'00': upload2}
formdata.just_created()
formdata.store()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'].base_filename == 'test2.odt'
assert formdata.data['bo1'].content_type == 'application/vnd.oasis.opendocument.text'
assert formdata.data['bo1'].get_content() == open(os.path.join(os.path.dirname(__file__),
'template.odt')).read()
# check storing response as attachment
two_pubs.substitutions.feed(formdata)
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml'
item.varname = 'xxx'
item.response_type = 'attachment'
item.record_errors = True
item.perform(formdata)
attachment = formdata.evolution[-1].parts[-1]
assert isinstance(attachment, AttachmentEvolutionPart)
assert attachment.base_filename == 'xxx.xml'
assert attachment.content_type == 'text/xml'
formdata = formdef.data_class().get(formdata.id)
two_pubs.substitutions.feed(formdata)
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': '=attachments.xxx'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'].base_filename == 'xxx.xml'
# check storing a file from an assembled dictionary
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1',
'value': '={"content": "hello world", "filename": "hello.txt"}'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'].base_filename == 'hello.txt'
assert formdata.data['bo1'].get_content() == 'hello world'
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1',
'value': '={"b64_content": "SEVMTE8gV09STEQ=", "filename": "hello.txt"}'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'].base_filename == 'hello.txt'
assert formdata.data['bo1'].get_content() == 'HELLO WORLD'
hello_world = formdata.data['bo1']
# check wrong value, or None (no file)
for value in ('="HELLO"', '', 'BAD', '={}', '=[]', None):
formdata.data['bo1'] = hello_world
formdata.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': value}]
LoggedError.wipe()
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
if value is not None: # wrong value : do nothing
assert formdata.data['bo1'].base_filename == 'hello.txt'
assert formdata.data['bo1'].get_content() == 'HELLO WORLD'
assert LoggedError.count() == 1
logged_error = LoggedError.select()[0]
assert logged_error.summary.startswith('Failed to convert')
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.exception_class == 'ValueError'
else: # empty value : remove field
assert formdata.data['bo1'] is None
assert LoggedError.count() == 0
# check wrong field
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo3', 'value': '=form_var_file_raw'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data.get('bo1') is None
assert formdata.data.get('bo3') is None
def test_set_backoffice_field_item(two_pubs):
Workflow.wipe()
FormDef.wipe()
wf = Workflow(name='xxx')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
st1 = wf.add_status('Status1')
wf.backoffice_fields_formdef.fields = [
ItemField(id='bo1', label='1st backoffice field',
type='item', varname='backoffice_item',
items=['a', 'b', 'c']),
]
wf.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': 'a'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'a'
assert formdata.data['bo1_display'] == 'a'
datasource = {'type': 'formula',
'value': repr([('a', 'aa'), ('b', 'bb'), ('c', 'cc')])}
wf.backoffice_fields_formdef.fields = [
ItemField(id='bo1', label='1st backoffice field',
type='item', varname='backoffice_item',
data_source=datasource),
]
wf.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': 'a'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'a'
assert formdata.data['bo1_display'] == 'aa'
datasource = {'type': 'formula',
'value': repr([{'id': 'a', 'text': 'aa', 'more': 'aaa'},
{'id': 'b', 'text': 'bb', 'more': 'bbb'}])}
wf.backoffice_fields_formdef.fields = [
ItemField(id='bo1', label='1st backoffice field',
type='item', varname='backoffice_item',
data_source=datasource),
]
wf.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': 'a'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'a'
assert formdata.data['bo1_display'] == 'aa'
assert formdata.data['bo1_structured'] == {'id': 'a', 'more': 'aaa', 'text': 'aa'}
# check when assigning using the display value
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': 'aa'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'a'
assert formdata.data['bo1_display'] == 'aa'
assert formdata.data['bo1_structured'] == {'id': 'a', 'more': 'aaa', 'text': 'aa'}
# check with unknown value
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': 'foobar'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'foobar'
assert formdata.data.get('bo1_display') is None
assert formdata.data.get('bo1_structured') is None
def test_set_backoffice_field_items(two_pubs):
Workflow.wipe()
FormDef.wipe()
wf = Workflow(name='xxx')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
st1 = wf.add_status('Status1')
wf.backoffice_fields_formdef.fields = [
ItemsField(id='bo1', label='1st backoffice field',
type='items', varname='backoffice_item',
items=['a', 'b', 'c']),
]
wf.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': "=['a', 'b']"}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == ['a', 'b']
assert formdata.data['bo1_display'] == 'a, b'
datasource = {'type': 'formula',
'value': repr([('a', 'aa'), ('b', 'bb'), ('c', 'cc')])}
wf.backoffice_fields_formdef.fields = [
ItemsField(id='bo1', label='1st backoffice field',
type='items', varname='backoffice_item',
data_source=datasource),
]
wf.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': "=['a', 'b']"}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == ['a', 'b']
assert formdata.data['bo1_display'] == 'aa, bb'
datasource = {'type': 'formula',
'value': repr([{'id': 'a', 'text': 'aa', 'more': 'aaa'},
{'id': 'b', 'text': 'bb', 'more': 'bbb'},
{'id': 'c', 'text': 'cc', 'more': 'ccc'}])}
wf.backoffice_fields_formdef.fields = [
ItemsField(id='bo1', label='1st backoffice field',
type='items', varname='backoffice_item',
data_source=datasource),
]
wf.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': "=['a', 'c']"}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == ['a', 'c']
assert formdata.data['bo1_display'] == 'aa, cc'
assert len(formdata.data['bo1_structured']) == 2
assert {'id': 'a', 'more': 'aaa', 'text': 'aa'} in formdata.data['bo1_structured']
assert {'id': 'c', 'more': 'ccc', 'text': 'cc'} in formdata.data['bo1_structured']
def test_set_backoffice_field_date(two_pubs):
Workflow.wipe()
FormDef.wipe()
LoggedError.wipe()
wf = Workflow(name='xxx')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
st1 = wf.add_status('Status1')
wf.backoffice_fields_formdef.fields = [
DateField(id='bo1', label='1st backoffice field',
type='date', varname='backoffice_date'),
]
wf.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': "=utils.today()"}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date.today()
formdata.data['bo1'] = None
formdata.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': '{% now "j/n/Y" %}'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date.today()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': "23/3/2017"}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date(2017, 3, 23)
# invalid values => do nothing
assert LoggedError.count() == 0
for value in ('plop', {}, []):
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': value}]
LoggedError.wipe()
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date(2017, 3, 23)
assert LoggedError.count() == 1
assert LoggedError.select()[0].summary.startswith('Failed to convert')
# None : empty date
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': None}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] is None
def test_set_backoffice_field_immediate_use(http_requests, two_pubs):
Workflow.wipe()
FormDef.wipe()
wf = Workflow(name='xxx')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
StringField(id='bo1', label='1st backoffice field',
type='string', varname='backoffice_blah'),
StringField(id='bo2', label='2nd backoffice field',
type='string', varname='backoffice_barr'),
]
st1 = wf.add_status('Status1')
wf.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
StringField(id='00', label='String', type='string', varname='string'),
]
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'00': 'HELLO'}
formdata.just_created()
formdata.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [
{'field_id': 'bo1', 'value': '{{form_var_string}}'},
]
two_pubs.substitutions.reset()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data.get('bo1') == 'HELLO'
item.fields = [
{'field_id': 'bo1', 'value': 'WORLD'},
]
two_pubs.substitutions.reset()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data.get('bo1') == 'WORLD'
item.fields = [
{'field_id': 'bo1', 'value': 'X{{form_var_string}}X'},
{'field_id': 'bo2', 'value': "Y{{form_var_backoffice_blah}}Y"},
]
two_pubs.substitutions.reset()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data.get('bo1') == 'XHELLOX'
assert formdata.data.get('bo2') == 'YXHELLOXY'
def test_redirect_to_url(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
StringField(id='1', label='Test', type='string', varname='foo'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'1': 'bar'}
item = RedirectToUrlWorkflowStatusItem()
assert item.render_as_line() == 'Web Redirection (not configured)'
item.url = 'https://www.example.net/?foo=[form_var_foo]'
assert item.render_as_line() == 'Web Redirection (to https://www.example.net/?foo=[form_var_foo])'
pub.substitutions.feed(formdata)
assert item.perform(formdata) == 'https://www.example.net/?foo=bar'
item.url = 'https://www.example.net/?django={{ form_var_foo }}'
assert item.render_as_line() == 'Web Redirection (to https://www.example.net/?django={{ form_var_foo }})'
pub.substitutions.feed(formdata)
assert item.perform(formdata) == 'https://www.example.net/?django=bar'
item.url = '[if-any nada]https://www.example.net/[end]'
pub.substitutions.feed(formdata)
assert item.perform(formdata) == None
item.url = '{% if nada %}https://www.example.net/{% endif %}'
pub.substitutions.feed(formdata)
assert item.perform(formdata) == None
def test_workflow_jump_condition_migration(pub):
workflow = Workflow(name='jump condition migration')
st1 = workflow.add_status('Status1', 'st1')
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
st1.items.append(jump)
jump.parent = st1
workflow.store()
reloaded_workflow = Workflow.get(workflow.id)
assert reloaded_workflow.possible_status[0].items[0].condition is None
jump.condition = 'foobar'
workflow.store()
reloaded_workflow = Workflow.get(workflow.id)
assert reloaded_workflow.possible_status[0].items[0].condition == {
'type': 'python', 'value': 'foobar'}
def test_workflow_action_condition(two_pubs):
pub = two_pubs
pub._set_request(None) # to avoid after jobs
workflow = Workflow(name='jump condition migration')
st1 = workflow.add_status('Status1', 'st1')
workflow.store()
role = Role(name='bar1')
role.store()
user = pub.user_class()
user.roles = [role.id]
user.store()
choice = ChoiceWorkflowStatusItem()
choice.by = [role.id]
choice.id = '_x'
st1.items.append(choice)
choice.parent = st1
workflow.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo'),]
formdef.workflow_id = workflow.id
formdef.store()
formdata1 = formdef.data_class()()
formdata1.data = {'1': 'foo'}
formdata1.just_created()
formdata1.store()
formdata2 = formdef.data_class()()
formdata2.data = {'2': 'bar'}
formdata2.just_created()
formdata2.store()
assert formdata1.get_actions_roles() == set([role.id])
assert formdata2.get_actions_roles() == set([role.id])
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 2
choice.condition = {'type': 'python', 'value': 'form_var_foo == "foo"'}
workflow.store()
with pub.substitutions.temporary_feed(formdata1):
assert FormDef.get(formdef.id).data_class().get(formdata1.id).get_actions_roles() == set([role.id])
with pub.substitutions.temporary_feed(formdata2):
assert FormDef.get(formdef.id).data_class().get(formdata2.id).get_actions_roles() == set()
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 1
# check with a formdef condition
choice.condition = {'type': 'python', 'value': 'form_name == "test"'}
workflow.store()
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 0
choice.condition = {'type': 'python', 'value': 'form_name == "baz"'}
workflow.store()
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 2
# bad condition
LoggedError.wipe()
choice.condition = {'type': 'python', 'value': 'foobar == barfoo'}
workflow.store()
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 0
assert LoggedError.count() == 1
logged_error = LoggedError.select()[0]
assert logged_error.occurences_count > 1 # should be 2... == 12 with pickle, 4 with sql
assert logged_error.summary == 'Failed to evaluate condition'
assert logged_error.exception_class == 'NameError'
assert logged_error.exception_message == "name 'foobar' is not defined"
assert logged_error.expression == 'foobar == barfoo'
assert logged_error.expression_type == 'python'