wcs/tests/test_workflows.py

4155 lines
145 KiB
Python

import json
import datetime
import os
import pytest
import shutil
import time
import zipfile
import mock
from django.utils import six
from django.utils.encoding import force_bytes
from django.utils.six import BytesIO, StringIO
from django.utils.six.moves.urllib import parse as urlparse
from quixote import cleanup, get_response
from wcs.qommon.errors import ConnectionError
from quixote.http_request import Upload as QuixoteUpload
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.form import *
from wcs.formdef import FormDef
from wcs import sessions
from wcs.fields import (StringField, DateField, MapField, FileField, ItemField,
ItemsField, CommentField)
from wcs.logged_errors import LoggedError
from wcs.roles import Role
from wcs.workflows import (Workflow, WorkflowStatusItem,
SendmailWorkflowStatusItem, SendSMSWorkflowStatusItem,
CommentableWorkflowStatusItem, ChoiceWorkflowStatusItem,
DisplayMessageWorkflowStatusItem,
AbortActionException, WorkflowCriticalityLevel,
AttachmentEvolutionPart, WorkflowBackofficeFieldsFormDef)
from wcs.wf.anonymise import AnonymiseWorkflowStatusItem
from wcs.wf.criticality import ModifyCriticalityWorkflowStatusItem, MODE_INC, MODE_DEC, MODE_SET
from wcs.wf.dispatch import DispatchWorkflowStatusItem
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
from wcs.wf.jump import JumpWorkflowStatusItem, _apply_timeouts
from wcs.wf.timeout_jump import TimeoutWorkflowStatusItem
from wcs.wf.profile import UpdateUserProfileStatusItem
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem, JournalEvolutionPart
from wcs.wf.remove import RemoveWorkflowStatusItem
from wcs.wf.roles import AddRoleWorkflowStatusItem, RemoveRoleWorkflowStatusItem
from wcs.wf.wscall import WebserviceCallStatusItem
from wcs.wf.export_to_model import ExportToModel, transform_to_pdf
from wcs.wf.geolocate import GeolocateWorkflowStatusItem
from wcs.wf.backoffice_fields import SetBackofficeFieldsWorkflowStatusItem
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
from wcs.wf.notification import SendNotificationWorkflowStatusItem
from utilities import (create_temporary_pub, MockSubstitutionVariables,
clean_temporary_pub)
def setup_module(module):
cleanup()
def teardown_module(module):
clean_temporary_pub()
def pytest_generate_tests(metafunc):
if 'two_pubs' in metafunc.fixturenames:
metafunc.parametrize('two_pubs', ['pickle', 'sql'], indirect=True)
@pytest.fixture
def pub(request):
pub = create_temporary_pub()
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
req = HTTPRequest(None, {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''})
req.response.filter = {}
req._user = None
pub._set_request(req)
req.session = sessions.BasicSession(id=1)
pub.set_config(req)
return pub
@pytest.fixture
def two_pubs(request):
pub = create_temporary_pub(sql_mode=(request.param == 'sql'))
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
req = HTTPRequest(None, {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''})
req.response.filter = {}
req._user = None
pub._set_request(req)
req.session = sessions.BasicSession(id=1)
pub.set_config(req)
return pub
def test_get_json_export_dict(pub):
workflow = Workflow(name='wf')
st1 = workflow.add_status('Status1', 'st1')
st2 = workflow.add_status('Status2', 'st2')
st2.forced_endpoint = True
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
jump.by = ['_submitter', '_receiver']
jump.timeout = 0.1
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
workflow.roles['_other'] = 'Other Function'
root = workflow.get_json_export_dict()
assert set(root.keys()) >= set(['statuses', 'name', 'functions'])
assert root['name'] == 'wf'
assert len(root['statuses']) == 2
assert set(st['id'] for st in root['statuses']) == set(['st1', 'st2'])
assert all(set(status.keys()) >= set(['id', 'name', 'forced_endpoint']) for status in
root['statuses'])
assert root['statuses'][0]['id'] == 'st1'
assert root['statuses'][0]['name'] == 'Status1'
assert root['statuses'][0]['forced_endpoint'] is False
assert root['statuses'][0]['endpoint'] is False
assert root['statuses'][1]['id'] == 'st2'
assert root['statuses'][1]['name'] == 'Status2'
assert root['statuses'][1]['forced_endpoint'] is True
assert root['statuses'][1]['endpoint'] is True
def test_variable_compute(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo'),]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'1': 'hello'}
formdata.store()
pub.substitutions.feed(formdata)
item = JumpWorkflowStatusItem()
# straight string
assert item.compute('blah') == 'blah'
# django template
assert item.compute('{{ form_var_foo }}') == 'hello'
assert item.compute('{{ form_var_foo }}', render=False) == '{{ form_var_foo }}'
assert item.compute('{% if form_var_foo %}its here{% endif %}') == 'its here'
assert item.compute('{% if form_var_foo %}') == '{% if form_var_foo %}'
with pytest.raises(Exception):
item.compute('{% if form_var_foo %}', raises=True)
# ezt string
assert item.compute('[form_var_foo]') == 'hello'
# ezt string, but not ezt asked
assert item.compute('[form_var_foo]', render=False) == '[form_var_foo]'
# ezt string, with an error
assert item.compute('[end]', raises=False) == '[end]'
with pytest.raises(Exception):
item.compute('[end]', raises=True)
# python expression
assert item.compute('=form_var_foo') == 'hello'
# python expression, with an error
assert item.compute('=1/0', raises=False) == '=1/0'
with pytest.raises(Exception):
item.compute('=1/0', raises=True)
# with context
assert item.compute('{{ form_var_foo }} {{ bar }}', context={'bar': 'world'}) == 'hello world'
assert item.compute('[form_var_foo] [bar]', context={'bar': 'world'}) == 'hello world'
assert item.compute('=form_var_foo + " " + bar', context={'bar': 'world'}) == 'hello world'
# django wins
assert item.compute('{{ form_var_foo }} [bar]', context={'bar': 'world'}) == 'hello [bar]'
# django template, no escaping by default
formdata.data = {'1': '<b>hello</b>'}
formdata.store()
assert item.compute('{{ form_var_foo }}') == '<b>hello</b>' # autoescape off by default
assert item.compute('{{ form_var_foo|safe }}') == '<b>hello</b>' # no escaping (implicit |safe)
assert item.compute('{{ form_var_foo|escape }}') == '&lt;b&gt;hello&lt;/b&gt;' #escaping
def test_variable_compute_dates(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo'),]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'1': '2017-07-17'}
formdata.store()
pub.substitutions.feed(formdata)
item = JumpWorkflowStatusItem()
assert item.compute('=date(form_var_foo)') == datetime.date(2017, 7, 17)
assert item.compute('=date(form_var_foo) + days(1)') == datetime.date(2017, 7, 18)
assert item.compute('=date(2017, 7, 18)') == datetime.date(2017, 7, 18)
def test_jump_nothing(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.store()
formdata = formdef.data_class()()
item = JumpWorkflowStatusItem()
assert item.must_jump(formdata) is True
def test_jump_datetime_condition(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.store()
formdata = formdef.data_class()()
item = JumpWorkflowStatusItem()
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
item.condition = {'type': 'python', 'value': 'datetime.datetime.now() > datetime.datetime(%s, %s, %s)' % \
yesterday.timetuple()[:3]}
assert item.must_jump(formdata) is True
tomorrow = datetime.datetime.now() + datetime.timedelta(days=1)
item.condition = {'type': 'python', 'value': 'datetime.datetime.now() > datetime.datetime(%s, %s, %s)' % \
tomorrow.timetuple()[:3]}
assert item.must_jump(formdata) is False
def test_jump_date_conditions(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = [DateField(id='2', label='Date', type='date', varname='date')]
formdef.store()
# create/store/get, to make sure the date format is acceptable
formdata = formdef.data_class()()
formdata.data = {'2': DateField().convert_value_from_str('2015-01-04')}
formdata.store()
formdata = formdef.data_class().get(formdata.id)
pub.substitutions.feed(formdata)
item = JumpWorkflowStatusItem()
item.condition = {
'type': 'python',
'value': 'utils.make_date(form_var_date) == utils.make_date("2015-01-04")'}
assert item.must_jump(formdata) is True
item = JumpWorkflowStatusItem()
item.condition = {
'type': 'python',
'value': 'utils.time_delta(form_var_date, "2015-01-04").days == 0'}
assert item.must_jump(formdata) is True
item = JumpWorkflowStatusItem()
item.condition = {
'type': 'python',
'value': 'utils.time_delta(utils.today(), "2015-01-04").days > 0'}
assert item.must_jump(formdata) is True
item = JumpWorkflowStatusItem()
item.condition = {
'type': 'python',
'value': 'utils.time_delta(datetime.datetime.now(), "2015-01-04").days > 0'}
assert item.must_jump(formdata) is True
item = JumpWorkflowStatusItem()
item.condition = {
'type': 'python',
'value': 'utils.time_delta(utils.time.localtime(), "2015-01-04").days > 0'}
assert item.must_jump(formdata) is True
def test_jump_count_condition(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.store()
pub.substitutions.feed(formdef)
formdef.data_class().wipe()
formdata = formdef.data_class()()
item = JumpWorkflowStatusItem()
item.condition = {'type': 'python', 'value': 'form_objects.count < 2'}
assert item.must_jump(formdata) is True
for i in range(10):
formdata = formdef.data_class()()
formdata.store()
item.condition = {'type': 'python', 'value': 'form_objects.count < 2'}
assert item.must_jump(formdata) is False
def test_jump_bad_python_condition(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.store()
pub.substitutions.feed(formdef)
formdef.data_class().wipe()
formdata = formdef.data_class()()
item = JumpWorkflowStatusItem()
LoggedError.wipe()
item.condition = {'type': 'python', 'value': 'form_var_foobar == 0'}
assert item.must_jump(formdata) is False
assert LoggedError.count() == 1
logged_error = LoggedError.select()[0]
assert logged_error.summary == 'Failed to evaluate condition'
assert logged_error.exception_class == 'NameError'
assert logged_error.exception_message == "name 'form_var_foobar' is not defined"
assert logged_error.expression == 'form_var_foobar == 0'
assert logged_error.expression_type == 'python'
LoggedError.wipe()
item.condition = {'type': 'python', 'value': '~ invalid ~'}
assert item.must_jump(formdata) is False
assert LoggedError.count() == 1
logged_error = LoggedError.select()[0]
assert logged_error.summary == 'Failed to evaluate condition'
assert logged_error.exception_class == 'SyntaxError'
assert logged_error.exception_message == 'unexpected EOF while parsing (<string>, line 1)'
assert logged_error.expression == '~ invalid ~'
assert logged_error.expression_type == 'python'
def test_jump_django_conditions(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo'),]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'1': 'hello'}
pub.substitutions.feed(formdata)
item = JumpWorkflowStatusItem()
LoggedError.wipe()
item.condition = {'type': 'django', 'value': '1 < 2'}
assert item.must_jump(formdata) is True
item.condition = {'type': 'django', 'value': 'form_var_foo == "hello"'}
assert item.must_jump(formdata) is True
item.condition = {'type': 'django', 'value': 'form_var_foo|first|upper == "H"'}
assert item.must_jump(formdata) is True
item.condition = {'type': 'django', 'value': 'form_var_foo|first|upper == "X"'}
assert item.must_jump(formdata) is False
assert LoggedError.count() == 0
item.condition = {'type': 'django', 'value': '~ invalid ~'}
assert item.must_jump(formdata) is False
assert LoggedError.count() == 1
logged_error = LoggedError.select()[0]
assert logged_error.summary == 'Failed to evaluate condition'
assert logged_error.exception_class == 'TemplateSyntaxError'
assert logged_error.exception_message == "Could not parse the remainder: '~' from '~'"
assert logged_error.expression == '~ invalid ~'
assert logged_error.expression_type == 'django'
def test_check_auth(pub):
user = pub.user_class(name='foo')
user.store()
role = Role(name='bar1')
role.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
formdata = formdef.data_class()()
status_item = WorkflowStatusItem()
assert status_item.check_auth(formdata, user) is True
status_item.by = []
assert status_item.check_auth(formdata, user) is False
status_item.by = ['logged-users']
assert status_item.check_auth(formdata, user) is True
status_item.by = [role.id]
assert status_item.check_auth(formdata, user) is False
status_item.by = [int(role.id)]
assert status_item.check_auth(formdata, user) is False
user.roles = [role.id]
status_item.by = [role.id]
assert status_item.check_auth(formdata, user) is True
status_item.by = [int(role.id)]
assert status_item.check_auth(formdata, user) is True
status_item.by = ['_submitter']
assert status_item.check_auth(formdata, user) is False
formdata.user_id = user.id
assert status_item.check_auth(formdata, user) is True
formdata.user_id = None
status_item.by = ['_receiver']
assert status_item.check_auth(formdata, user) is False
formdata.workflow_roles = {'_receiver': user.id}
assert status_item.check_auth(formdata, user) is True
formdef.workflow_roles = {'_receiver': user.id}
formdata.workflow_roles = None
assert status_item.check_auth(formdata, user) is True
def test_dispatch(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
Role.wipe()
role = Role(name='xxx')
role.store()
item = DispatchWorkflowStatusItem()
formdata = formdef.data_class()()
item.perform(formdata)
assert not formdata.workflow_roles
formdata = formdef.data_class()()
item.role_key = '_receiver'
item.role_id = role.id
item.perform(formdata)
assert formdata.workflow_roles == {'_receiver': role.id}
def test_dispatch_auto(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
StringField(id='1', label='Test', type='string', varname='foo'),
]
formdef.store()
item = DispatchWorkflowStatusItem()
item.role_key = '_receiver'
item.dispatch_type = 'automatic'
formdata = formdef.data_class()()
pub.substitutions.reset()
pub.substitutions.feed(formdata)
item.perform(formdata)
assert not formdata.workflow_roles
Role.wipe()
role1 = Role('xxx1')
role1.store()
role2 = Role('xxx2')
role2.store()
for variable in ('form_var_foo', '{{form_var_foo}}'):
formdata.data = {}
formdata.workflow_roles = {}
item.variable = variable
item.rules = [
{'role_id': role1.id, 'value': 'foo'},
{'role_id': role2.id, 'value': 'bar'},
]
pub.substitutions.reset()
pub.substitutions.feed(formdata)
item.perform(formdata)
assert not formdata.workflow_roles
# no match
formdata.data = {'1': 'XXX'}
pub.substitutions.reset()
pub.substitutions.feed(formdata)
item.perform(formdata)
assert not formdata.workflow_roles
# match
formdata.data = {'1': 'foo'}
pub.substitutions.reset()
pub.substitutions.feed(formdata)
item.perform(formdata)
assert formdata.workflow_roles == {'_receiver': role1.id}
# other match
formdata.data = {'1': 'bar'}
pub.substitutions.reset()
pub.substitutions.feed(formdata)
item.perform(formdata)
assert formdata.workflow_roles == {'_receiver': role2.id}
def test_dispatch_computed(pub, caplog):
pub.cfg['debug'] = {'logger': True}
pub.write_cfg()
pub.get_app_logger(force=True) # force new logger
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
Role.wipe()
role = Role(name='xxx')
role.slug = 'yyy'
role.store()
item = DispatchWorkflowStatusItem()
formdata = formdef.data_class()()
item.perform(formdata)
assert not formdata.workflow_roles
formdata = formdef.data_class()()
item.role_key = '_receiver'
item.role_id = '="yyy"' # slug
item.perform(formdata)
assert formdata.workflow_roles == {'_receiver': role.id}
formdata = formdef.data_class()()
item.role_key = '_receiver'
item.role_id = '="xxx"' # name
item.perform(formdata)
assert formdata.workflow_roles == {'_receiver': role.id}
# unknown role
formdata = formdef.data_class()()
item.role_key = '_receiver'
item.role_id = '="foobar"'
item.perform(formdata)
assert not formdata.workflow_roles
assert caplog.records[-1].message == 'error in dispatch, missing role (="foobar")'
def test_roles(pub):
user = pub.user_class()
user.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
formdata = formdef.data_class()()
formdata.user_id = user.id
item = AddRoleWorkflowStatusItem()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
item.role_id = '1'
item.perform(formdata)
assert pub.user_class.get(user.id).roles == ['1']
user.roles = None
user.store()
item = RemoveRoleWorkflowStatusItem()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
item.role_id = '1'
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
user.roles = ['1']
user.store()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
user.roles = ['2', '1']
user.store()
item.perform(formdata)
assert pub.user_class.get(user.id).roles == ['2']
def test_add_remove_computed_roles(pub):
user = pub.user_class()
user.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
formdata = formdef.data_class()()
formdata.user_id = user.id
role = Role(name='plop')
role.store()
role2 = Role(name='xxx')
role2.store()
item = AddRoleWorkflowStatusItem()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
item.role_id = role.name
item.perform(formdata)
assert pub.user_class.get(user.id).roles == [role.id]
user.roles = None
user.store()
item = RemoveRoleWorkflowStatusItem()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
item.role_id = role.name
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
user.roles = [role.id]
user.store()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
user.roles = [role2.id, role.id]
user.store()
item.perform(formdata)
assert pub.user_class.get(user.id).roles == [role2.id]
def test_roles_idp(pub):
pub.cfg['sp'] = {'idp-manage-user-attributes': True}
pub.cfg['idp'] = {'xxx': {'metadata_url': 'http://idp.example.net/idp/saml2/metadata'}}
pub.write_cfg()
user = pub.user_class()
user.name_identifiers = ['xxx']
user.store()
role = Role(name='bar1')
role.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
formdata = formdef.data_class()()
formdata.user_id = user.id
item = AddRoleWorkflowStatusItem()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
with mock.patch('wcs.wf.roles.http_post_request') as http_post_request:
http_post_request.return_value = (None, 201, '', None)
get_response().process_after_jobs()
assert http_post_request.call_count == 0
item.role_id = role.id
item.perform(formdata)
assert pub.user_class.get(user.id).roles == [role.id]
with mock.patch('wcs.wf.roles.http_post_request') as http_post_request:
http_post_request.return_value = (None, 201, '', None)
get_response().process_after_jobs()
assert http_post_request.call_count == 1
assert http_post_request.call_args[0][0].startswith(
'http://idp.example.net/api/roles/bar1/members/xxx/')
assert 'signature=' in http_post_request.call_args[0][0]
user.roles = None
user.store()
item2 = RemoveRoleWorkflowStatusItem()
item2.perform(formdata)
assert not pub.user_class.get(user.id).roles
with mock.patch('wcs.wf.roles.http_delete_request') as http_delete_request:
http_delete_request.return_value = (None, 200, '', None)
get_response().process_after_jobs()
assert http_delete_request.call_count == 0
item2.role_id = role.id
user.roles = [role.id]
user.store()
item2.perform(formdata)
assert not pub.user_class.get(user.id).roles
with mock.patch('wcs.wf.roles.http_delete_request') as http_delete_request:
http_delete_request.return_value = (None, 200, '', None)
get_response().process_after_jobs()
assert http_delete_request.call_count == 1
assert http_delete_request.call_args[0][0].startswith(
'http://idp.example.net/api/roles/bar1/members/xxx/')
assert 'signature=' in http_delete_request.call_args[0][0]
# out of http request/response cycle
pub._set_request(None)
with mock.patch('wcs.wf.roles.http_post_request') as http_post_request:
http_post_request.return_value = (None, 201, '', None)
item.perform(formdata)
assert pub.user_class.get(user.id).roles == [role.id]
with mock.patch('wcs.wf.roles.http_delete_request') as http_delete_request:
http_delete_request.return_value = (None, 200, '', None)
item2.perform(formdata)
assert pub.user_class.get(user.id).roles == []
def test_anonymise(two_pubs):
# build a backoffice field
Workflow.wipe()
wf = Workflow(name='wf with backoffice field')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
StringField(id='bo1', label='bo field 1', type='string'),
ItemField(id='bo2', label='list', type='item', items=['bofoo', 'bobar']),
]
wf.add_status('Status1')
wf.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
StringField(id='1', label='field 1', type='string'),
ItemField(id='2', label='list', type='item', items=['abc', 'def']),
]
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.user_id = '1'
formdata.data = {'1': 'foo',
'2': 'abc', '2_display': 'abc',
'bo1': 'bar',
'bo2': 'foo', 'bo2_display': 'foo'}
formdata.workflow_data = {'e': 'mc2'}
formdata.submission_context = {'foo': 'bar'}
formdata.store()
item = AnonymiseWorkflowStatusItem()
item.perform(formdata)
assert formdef.data_class().get(formdata.id).user_id is None
assert formdef.data_class().get(formdata.id).anonymised
assert formdef.data_class().get(formdata.id).submission_context is None
assert formdef.data_class().get(formdata.id).data == {'1': None,
'2': 'abc', '2_display': 'abc',
'bo1': None,
'bo2': 'foo', 'bo2_display': 'foo'}
assert formdef.data_class().get(formdata.id).workflow_data is None
assert formdef.data_class().get(formdata.id).evolution[0].who is None
def test_remove(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
formdata = formdef.data_class()()
formdata.store()
item = RemoveWorkflowStatusItem()
assert formdef.data_class().has_key(formdata.id)
assert item.perform(formdata) == 'http://example.net'
assert not formdef.data_class().has_key(formdata.id)
formdata = formdef.data_class()()
formdata.store()
item = RemoveWorkflowStatusItem()
req = pub.get_request()
req.response.filter['in_backoffice'] = True
assert formdef.data_class().has_key(formdata.id)
assert item.perform(formdata) == '..'
assert not formdef.data_class().has_key(formdata.id)
req.response.filter = {}
assert req.session.message
def test_register_comment(pub):
pub.substitutions.feed(MockSubstitutionVariables())
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
item = RegisterCommenterWorkflowStatusItem()
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == ''
item.comment = 'Hello world'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<p>Hello world</p>'
item.comment = '<div>Hello world</div>'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<div>Hello world</div>'
item.comment = '{{ test }}'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == ''
item.comment = '[test]'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<p>[test]</p>'
item.comment = '{{ bar }}'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<div>Foobar</div>'
item.comment = '[bar]'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<p>Foobar</p>'
item.comment = '<p>{{ foo }}</p>'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<p>1 &lt; 3</p>'
item.comment = '<p>{{ foo|safe }}</p>'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<p>1 < 3</p>'
item.comment = '{{ foo }}'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<div>1 &lt; 3</div>'
item.comment = '{{ foo|safe }}'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<div>1 < 3</div>'
item.comment = '[foo]'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<p>1 &lt; 3</p>'
item.comment = '<div>{{ foo }}</div>'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<div>1 &lt; 3</div>'
item.comment = '<div>[foo]</div>'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<div>1 &lt; 3</div>'
def test_register_comment_django_escaping(pub, emails):
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo'),]
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.data = {'1': '<p>hello</p>'}
formdata.store()
pub.substitutions.feed(formdata)
item = RegisterCommenterWorkflowStatusItem()
item.comment = '<div>{{form_var_foo}}</div>'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<div>&lt;p&gt;hello&lt;/p&gt;</div>'
# |safe
item = RegisterCommenterWorkflowStatusItem()
item.comment = '<div>{{form_var_foo|safe}}</div>'
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == '<div><p>hello</p></div>'
def test_register_comment_attachment(pub):
pub.substitutions.feed(MockSubstitutionVariables())
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
item = RegisterCommenterWorkflowStatusItem()
item.perform(formdata)
formdata.evolution[-1]._display_parts = None
assert formdata.evolution[-1].display_parts()[-1] == ''
if os.path.exists(os.path.join(get_publisher().app_dir, 'attachments')):
shutil.rmtree(os.path.join(get_publisher().app_dir, 'attachments'))
formdata.evolution[-1].parts = [AttachmentEvolutionPart('hello.txt',
fp=BytesIO(b'hello world'), varname='testfile')]
formdata.store()
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments'))) == 1
for subdir in os.listdir(os.path.join(get_publisher().app_dir, 'attachments')):
assert len(subdir) == 4
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments', subdir))) == 1
item.comment = '{{ attachments.testfile.url }}'
pub.substitutions.feed(formdata)
item.perform(formdata)
url1 = formdata.evolution[-1].parts[-1].content
pub.substitutions.feed(formdata)
item.comment = '{{ form_attachments.testfile.url }}'
item.perform(formdata)
url2 = formdata.evolution[-1].parts[-1].content
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments'))) == 1
for subdir in os.listdir(os.path.join(get_publisher().app_dir, 'attachments')):
assert len(subdir) == 4
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments', subdir))) == 1
assert url1 == url2
# test with a condition
item.comment = '{% if form_attachments.testfile %}file is there{% endif %}'
item.perform(formdata)
assert formdata.evolution[-1].parts[-1].content == '<div>file is there</div>'
item.comment = '{% if form_attachments.nope %}file is there{% endif %}'
item.perform(formdata)
assert formdata.evolution[-1].parts[-1].content == ''
# test with an action condition
item.condition = {'type': 'django', 'value': 'form_attachments.testfile'}
assert item.check_condition(formdata) is True
item.condition = {'type': 'django', 'value': 'form_attachments.missing'}
assert item.check_condition(formdata) is False
pub.substitutions.feed(formdata)
item.comment = '[attachments.testfile.url]'
item.perform(formdata)
url3 = formdata.evolution[-1].parts[-1].content
pub.substitutions.feed(formdata)
item.comment = '[form_attachments.testfile.url]'
item.perform(formdata)
url4 = formdata.evolution[-1].parts[-1].content
assert url3 == url4
def test_register_comment_with_attachment_file(pub):
wf = Workflow(name='comment with attachments')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
FileField(id='bo1', label='bo field 1', type='file', varname='backoffice_file1'),
]
st1 = wf.add_status('Status1')
wf.store()
upload = PicklableUpload('test.jpeg', 'image/jpeg')
jpg = open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg'), 'rb').read()
upload.receive([jpg])
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
FileField(id='1', label='File', type='file', varname='frontoffice_file'),
]
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'1': upload}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
setbo = SetBackofficeFieldsWorkflowStatusItem()
setbo.parent = st1
setbo.fields = [{'field_id': 'bo1', 'value': '=form_var_frontoffice_file_raw'}]
setbo.perform(formdata)
if os.path.exists(os.path.join(get_publisher().app_dir, 'attachments')):
shutil.rmtree(os.path.join(get_publisher().app_dir, 'attachments'))
comment_text = 'File is attached to the form history'
item = RegisterCommenterWorkflowStatusItem()
item.attachments = ['form_var_backoffice_file1_raw']
item.comment = comment_text
item.perform(formdata)
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments'))) == 1
for subdir in os.listdir(os.path.join(get_publisher().app_dir, 'attachments')):
assert len(subdir) == 4
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments', subdir))) == 1
assert len(formdata.evolution[-1].parts) == 2
assert isinstance(formdata.evolution[-1].parts[0], AttachmentEvolutionPart)
assert formdata.evolution[-1].parts[0].orig_filename == upload.orig_filename
assert isinstance(formdata.evolution[-1].parts[1], JournalEvolutionPart)
assert len(formdata.evolution[-1].parts[1].content) > 0
comment_view = str(formdata.evolution[-1].parts[1].view())
assert comment_view == '<p>%s</p>' % comment_text
if os.path.exists(os.path.join(get_publisher().app_dir, 'attachments')):
shutil.rmtree(os.path.join(get_publisher().app_dir, 'attachments'))
formdata.evolution[-1].parts = []
formdata.store()
ws_response_varname = 'ws_response_afile'
wf_data = {
'%s_filename' % ws_response_varname: 'hello.txt',
'%s_content_type' % ws_response_varname: 'text/plain',
'%s_b64_content' % ws_response_varname: base64.encodestring(b'hello world'),
}
formdata.update_workflow_data(wf_data)
formdata.store()
assert hasattr(formdata, 'workflow_data')
assert isinstance(formdata.workflow_data, dict)
item = RegisterCommenterWorkflowStatusItem()
item.attachments = ["utils.dict_from_prefix('%s_', locals())" % ws_response_varname]
item.comment = comment_text
item.perform(formdata)
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments'))) == 1
for subdir in os.listdir(os.path.join(get_publisher().app_dir, 'attachments')):
assert len(subdir) == 4
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments', subdir))) == 1
assert len(formdata.evolution[-1].parts) == 2
assert isinstance(formdata.evolution[-1].parts[0], AttachmentEvolutionPart)
assert formdata.evolution[-1].parts[0].orig_filename == 'hello.txt'
assert isinstance(formdata.evolution[-1].parts[1], JournalEvolutionPart)
assert len(formdata.evolution[-1].parts[1].content) > 0
comment_view = str(formdata.evolution[-1].parts[1].view())
assert comment_view == '<p>%s</p>' % comment_text
def test_email(pub, emails):
pub.substitutions.feed(MockSubstitutionVariables())
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
user = pub.user_class(name='foo')
user.email = 'zorg@localhost'
user.store()
Role.wipe()
role1 = Role(name='foo')
role1.emails = ['foo@localhost']
role1.store()
role2 = Role(name='bar')
role2.emails = ['bar@localhost', 'baz@localhost']
role2.store()
# send using an uncompleted element
item = SendmailWorkflowStatusItem()
item.perform(formdata) # nothing
get_response().process_after_jobs()
assert emails.count() == 0
item.to = [role1.id]
item.perform(formdata) # no subject nor body
get_response().process_after_jobs()
assert emails.count() == 0
item.subject = 'foobar'
item.perform(formdata) # no body
get_response().process_after_jobs()
assert emails.count() == 0
# send for real
item.body = 'baz'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')
assert emails.get('foobar')['email_rcpt'] == ['foo@localhost']
assert 'baz' in emails.get('foobar')['payload']
# template for subject or body (Django)
emails.empty()
item.subject = '{{ bar }}'
item.body = '{{ foo }}'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('Foobar')
assert '1 < 3' in emails.get('Foobar')['payload']
# template for subject or body (ezt)
emails.empty()
item.subject = '[bar]'
item.body = '[foo]'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('Foobar')
assert '1 < 3' in emails.get('Foobar')['payload']
# two recipients
emails.empty()
item.subject = 'foobar'
item.to = [role1.id, role2.id]
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['to'] == 'Undisclosed recipients:;'
assert emails.get('foobar')['email_rcpt'] == ['foo@localhost',
'bar@localhost', 'baz@localhost']
# submitter as recipient, no known email address
emails.empty()
item.to = ['_submitter']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 0
# submitter as recipient, known email address
emails.empty()
formdata.user_id = user.id
formdata.store()
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['email_rcpt'] == ['zorg@localhost']
# computed recipient
emails.empty()
item.to = ['=email']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['email_rcpt'] == ['sub@localhost']
# computed list of recipients
emails.empty()
item.to = ['=["foo@localhost", "bar@localhost"]']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['email_rcpt'] == ['foo@localhost', 'bar@localhost']
# multiple recipients in a single computed string
emails.empty()
item.to = ['="foo@localhost, bar@localhost"']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['email_rcpt'] == ['foo@localhost', 'bar@localhost']
# string as recipient
emails.empty()
item.to = 'xyz@localhost'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['email_rcpt'] == ['xyz@localhost']
# string as recipient (but correctly set in a list)
emails.empty()
item.to = ['xyz@localhost']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['email_rcpt'] == ['xyz@localhost']
# multiple recipients in a static string
emails.empty()
item.to = ['foo@localhost, bar@localhost']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['email_rcpt'] == ['foo@localhost', 'bar@localhost']
# invalid recipient
emails.empty()
item.to = ['=foobar']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 0
# empty recipient
emails.empty()
item.to = ['=None']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 0
# custom from email
emails.empty()
item.to = [role1.id]
item.custom_from = 'foobar@localhost'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar').get('from') == 'foobar@localhost'
# custom from email (computed)
emails.empty()
item.to = [role1.id]
item.custom_from = '="foobar@localhost"'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar').get('from') == 'foobar@localhost'
def test_email_django_escaping(pub, emails):
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo'),]
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = SendmailWorkflowStatusItem()
item.to = ['foo@localhost']
item.subject = 'Foobar'
# explicit safe strings
emails.empty()
formdata.data = {'1': '1 < 3'}
item.body = '{{ form_var_foo|safe }}'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('Foobar')['payload'].strip() == '1 < 3'
# automatic no-escaping (because text/plain)
emails.empty()
formdata.data = {'1': '1 < 3'}
item.body = '{{ form_var_foo }}'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('Foobar')['payload'].strip() == '1 < 3'
# automatic escaping (because mail body is HTML)
emails.empty()
formdata.data = {'1': '1 < 3'}
item.body = '<p>{{ form_var_foo }}</p>'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('Foobar')
assert '<p>1 &lt; 3</p>' in emails.get('Foobar')['payload'].strip()
# no automatic escaping for subject (even if mail body is HTML)
emails.empty()
formdata.data = {'1': '1 < 3'}
item.subject = '{{ form_var_foo }}'
item.body = '<p>{{ form_var_foo }}</p>'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('1 < 3')
def test_email_attachments(pub, emails):
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
FileField(id='3', label='File', type='file', varname='file'),
]
formdef.store()
upload = PicklableUpload('test.jpeg', 'image/jpeg')
jpg = open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg'), 'rb').read()
upload.receive([jpg])
formdata = formdef.data_class()()
formdata.data = {'3': upload}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
sendmail = SendmailWorkflowStatusItem()
sendmail.subject = 'foobar'
sendmail.body = '<p>force html</p>'
sendmail.to = ['to@example.net']
sendmail.attachments = ['form_var_file_raw']
sendmail.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.emails['foobar']['msg'].is_multipart()
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html'
assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg'
# build a backoffice field
Workflow.wipe()
FormDef.wipe()
wf = Workflow(name='email with attachments')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
FileField(id='bo1-1x', label='bo field 1', type='file', varname='backoffice_file1'),
FileField(id='bo2', label='bo field 2', type='file', varname='backoffice_file2'),
]
st1 = wf.add_status('Status1')
wf.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
FileField(id='1', label='File', type='file', varname='frontoffice_file'),
]
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'1': upload}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
# store file in backoffice field form_fbo1_1x / form_var_backoffice_file_raw
setbo = SetBackofficeFieldsWorkflowStatusItem()
setbo.parent = st1
setbo.fields = [{'field_id': 'bo1-1x', 'value': '=form_var_frontoffice_file_raw'}]
setbo.perform(formdata)
# check compatibility with actions defined before #33366 was fixed
emails.empty()
sendmail.attachments = ['form_fbo1-1x']
sendmail.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.emails['foobar']['msg'].is_multipart()
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html'
assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg'
# check with correct varname-less field
emails.empty()
sendmail.attachments = ['form_fbo1_1x']
sendmail.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.emails['foobar']['msg'].is_multipart()
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html'
assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg'
# check with variable
emails.empty()
sendmail.attachments = ['form_var_backoffice_file1_raw']
sendmail.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.emails['foobar']['msg'].is_multipart()
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html'
assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg'
emails.empty()
sendmail.attachments = ['form_var_backoffice_file1_raw', 'form_var_backoffice_file2_raw']
sendmail.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.emails['foobar']['msg'].is_multipart()
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html'
assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg'
# backoffice_file2 is unset, no more parts :
assert len(emails.emails['foobar']['msg'].get_payload()) == 2
# set backoffice_file2 and retry
setbo.fields = [{'field_id': 'bo2',
'value': '={"content": "blah", "filename": "hello.txt", '
'"content_type": "text/plain"}'}]
setbo.perform(formdata)
emails.empty()
sendmail.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.emails['foobar']['msg'].is_multipart()
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html'
assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg'
assert emails.emails['foobar']['msg'].get_payload()[2].get_content_type() == 'text/plain'
if six.PY2:
assert emails.emails['foobar']['msg'].get_payload()[2].get_payload() == 'blah'
else:
assert base64.decodestring(force_bytes(emails.emails['foobar']['msg'].get_payload()[2].get_payload())) == b'blah'
assert len(emails.emails['foobar']['msg'].get_payload()) == 3
emails.empty()
sendmail.attachments = [
'utils.attachment("Hello world")',
'utils.attachment(\'{"hello": "world"}\', content_type=\'application/json\')',
]
sendmail.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.emails['foobar']['msg'].is_multipart()
assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed'
assert emails.emails['foobar']['msg'].get_payload(0).get_content_type() == 'text/html'
assert emails.emails['foobar']['msg'].get_payload(1).get_content_type() == 'application/octet-stream'
assert emails.emails['foobar']['msg'].get_payload(2).get_content_type() == 'application/json'
payload1 = emails.emails['foobar']['msg'].get_payload(1)
payload2 = emails.emails['foobar']['msg'].get_payload(2)
assert payload1.get_payload(decode=True) == b"Hello world"
assert json.loads(force_text(payload2.get_payload(decode=True))) == {'hello': 'world'}
def test_webservice_call(http_requests, pub):
pub.substitutions.feed(MockSubstitutionVariables())
wf = Workflow(name='wf1')
st1 = wf.add_status('Status1', 'st1')
sterr = wf.add_status('StatusErr', 'sterr')
wf.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.perform(formdata)
assert http_requests.get_last('url') == 'http://remote.example.net'
assert http_requests.get_last('method') == 'GET'
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.post = True
item.perform(formdata)
assert http_requests.get_last('url') == 'http://remote.example.net'
assert http_requests.get_last('method') == 'POST'
payload = json.loads(http_requests.get_last('body'))
assert payload['url'] == 'http://example.net/baz/%s/' % formdata.id
assert payload['display_id'] == formdata.get_display_id()
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.post_data = {'str': 'abcd', 'one': '=1', 'django': '{{ form_number }}',
'evalme': '=form_number', 'error':'=1=3'}
pub.substitutions.feed(formdata)
item.perform(formdata)
assert http_requests.get_last('url') == 'http://remote.example.net'
assert http_requests.get_last('method') == 'POST'
payload = json.loads(http_requests.get_last('body'))
assert payload == {'one': 1, 'str': 'abcd',
'evalme': formdata.get_display_id(),
'django': formdata.get_display_id()}
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.post = True
item.post_data = {'str': 'abcd', 'one': '=1', 'decimal': '=Decimal(2)',
'evalme': '=form_number', 'error':'=1=3'}
pub.substitutions.feed(formdata)
item.perform(formdata)
assert http_requests.get_last('url') == 'http://remote.example.net'
assert http_requests.get_last('method') == 'POST'
payload = json.loads(http_requests.get_last('body'))
assert payload['extra'] == {'one': 1, 'str': 'abcd',
'decimal': '2', 'evalme': formdata.get_display_id()}
assert payload['url'] == 'http://example.net/baz/%s/' % formdata.id
assert payload['display_id'] == formdata.get_display_id()
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json'
item.varname = 'xxx'
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_response'] == {'foo': 'bar'}
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.request_signature_key = 'xxx'
item.perform(formdata)
assert 'signature=' in http_requests.get_last('url')
assert http_requests.get_last('method') == 'GET'
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.request_signature_key = '{{ doesntexist }}'
item.perform(formdata)
assert not 'signature=' in http_requests.get_last('url')
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.request_signature_key = '{{ empty }}'
item.perform(formdata)
assert not 'signature=' in http_requests.get_last('url')
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.request_signature_key = '[empty]'
item.perform(formdata)
assert not 'signature=' in http_requests.get_last('url')
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.request_signature_key = '{{ bar }}'
item.perform(formdata)
assert 'signature=' in http_requests.get_last('url')
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.request_signature_key = '[bar]'
item.perform(formdata)
assert 'signature=' in http_requests.get_last('url')
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/204'
item.varname = 'xxx'
item.perform(formdata)
assert formdata.workflow_data.get('xxx_status') == 204
assert formdata.workflow_data.get('xxx_time')
assert 'xxx_response' not in formdata.workflow_data
assert 'xxx_error_response' not in formdata.workflow_data
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/404'
item.varname = 'xxx'
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.workflow_data.get('xxx_status') == 404
assert formdata.workflow_data.get('xxx_time')
assert 'xxx_error_response' not in formdata.workflow_data
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/404-json'
item.varname = 'xxx'
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.workflow_data.get('xxx_status') == 404
assert formdata.workflow_data.get('xxx_error_response') == {'err': 1}
assert formdata.workflow_data.get('xxx_time')
assert 'xxx_response' not in formdata.workflow_data
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/404'
item.action_on_4xx = ':pass'
item.perform(formdata)
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/500'
with pytest.raises(AbortActionException):
item.perform(formdata)
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/500'
item.action_on_5xx = ':pass'
item.perform(formdata)
item = WebserviceCallStatusItem()
item.parent = st1
assert item.get_jump_label(st1.id) == 'Webservice'
assert item.get_jump_label('sterr') == 'Error calling webservice'
item.label = 'Plop'
assert item.get_jump_label(st1.id) == 'Webservice "Plop"'
assert item.get_jump_label('sterr') == 'Error calling webservice "Plop"'
item.url = 'http://remote.example.net/500'
item.action_on_5xx = 'sterr' # jump to status
formdata.status = 'wf-st1'
formdata.store()
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.status == 'wf-sterr'
item.action_on_5xx = 'stdeleted' # removed status
formdata.status = 'wf-st1'
formdata.store()
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.status == 'wf-st1' # unknown status acts like :stop
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml'
item.perform(formdata)
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml'
item.varname = 'xxx'
item.action_on_bad_data = ':stop'
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.workflow_data.get('xxx_status') == 200
assert 'xxx_response' not in formdata.workflow_data
assert 'xxx_error_response' not in formdata.workflow_data
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/404'
item.record_errors = True
item.action_on_4xx = ':stop'
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.evolution[-1].parts[-1].summary == '404 whatever'
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml'
item.varname = 'xxx'
item.action_on_bad_data = ':stop'
item.record_errors = True
with pytest.raises(AbortActionException):
item.perform(formdata)
if six.PY2:
assert formdata.evolution[-1].parts[-1].summary == 'ValueError: No JSON object could be decoded\n'
else:
assert formdata.evolution[-1].parts[-1].summary == 'json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)\n'
assert formdata.workflow_data.get('xxx_status') == 200
assert formdata.workflow_data.get('xxx_time')
assert 'xxx_error_response' not in formdata.workflow_data
formdata.workflow_data = None
# check storing response as attachment
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml'
item.varname = 'xxx'
item.response_type = 'attachment'
item.record_errors = True
item.perform(formdata)
assert formdata.workflow_data.get('xxx_status') == 200
assert formdata.workflow_data.get('xxx_content_type') == 'text/xml'
attachment = formdata.evolution[-1].parts[-1]
assert isinstance(attachment, AttachmentEvolutionPart)
assert attachment.base_filename == 'xxx.xml'
assert attachment.content_type == 'text/xml'
attachment.fp.seek(0)
assert attachment.fp.read(5) == b'<?xml'
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/400-json'
item.record_errors = True
item.action_on_4xx = ':stop'
with pytest.raises(AbortActionException):
item.perform(formdata)
rendered = formdata.evolution[-1].parts[-1].view()
assert not rendered # empty if not in backoffice
req = HTTPRequest(None, {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': '/backoffice/'})
pub._set_request(req)
rendered = formdata.evolution[-1].parts[-1].view()
assert 'Error during webservice call' in str(rendered)
assert 'Error Code: 1' in str(rendered)
assert 'Error Description: :(' in str(rendered)
item.label = 'do that'
with pytest.raises(AbortActionException):
item.perform(formdata)
rendered = formdata.evolution[-1].parts[-1].view()
assert 'Error during webservice call &quot;do that&quot;' in str(rendered)
item = WebserviceCallStatusItem()
item.method = 'GET'
item.url = 'http://remote.example.net?in_url=1'
item.qs_data = {
'str': 'abcd',
'one': '=1',
'evalme': '=form_number',
'django': '{{ form_number }}',
'ezt': '[form_number]',
'error': '=1=3',
'in_url': '2',
}
pub.substitutions.feed(formdata)
item.perform(formdata)
assert http_requests.get_last('method') == 'GET'
qs = urlparse.parse_qs(http_requests.get_last('url').split('?')[1])
assert set(qs.keys()) == set(['in_url', 'str', 'one', 'evalme', 'django', 'ezt'])
assert qs['in_url'] == ['1', '2']
assert qs['one'] == ['1']
assert qs['evalme'] == [formdata.get_display_id()]
assert qs['django'] == [formdata.get_display_id()]
assert qs['ezt'] == [formdata.get_display_id()]
assert qs['str'] == ['abcd']
item = WebserviceCallStatusItem()
item.method = 'DELETE'
item.post = False
item.url = 'http://remote.example.net/json'
pub.substitutions.feed(formdata)
item.perform(formdata)
assert http_requests.get_last('method') == 'DELETE'
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.method = 'PUT'
item.post = False
item.post_data = {'str': 'abcd', 'one': '=1',
'evalme': '=form_number', 'error':'=1=3'}
pub.substitutions.feed(formdata)
item.perform(formdata)
assert http_requests.get_last('url') == 'http://remote.example.net'
assert http_requests.get_last('method') == 'PUT'
payload = json.loads(http_requests.get_last('body'))
assert payload == {'one': 1, 'str': 'abcd', 'evalme': formdata.get_display_id()}
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net'
item.method = 'PATCH'
item.post = False
item.post_data = {'str': 'abcd', 'one': '=1',
'evalme': '=form_number', 'error':'=1=3'}
pub.substitutions.feed(formdata)
item.perform(formdata)
assert http_requests.get_last('url') == 'http://remote.example.net'
assert http_requests.get_last('method') == 'PATCH'
payload = json.loads(http_requests.get_last('body'))
assert payload == {'one': 1, 'str': 'abcd', 'evalme': formdata.get_display_id()}
def test_webservice_waitpoint(pub):
item = WebserviceCallStatusItem()
assert item.waitpoint
item.action_on_app_error = ':pass'
item.action_on_4xx = ':pass'
item.action_on_5xx = ':pass'
item.action_on_bad_data = ':pass'
item.action_on_network_errors = ':pass'
assert not item.waitpoint
item.action_on_network_errors = ':stop'
assert item.waitpoint
def test_webservice_call_error_handling(http_requests, pub):
pub.substitutions.feed(MockSubstitutionVariables())
FormDef.wipe()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-err1'
item.action_on_app_error = ':stop'
item.action_on_4xx = ':pass'
item.action_on_5xx = ':pass'
item.action_on_network_errors = ':pass'
with pytest.raises(AbortActionException):
item.perform(formdata)
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-errheader1'
item.action_on_app_error = ':stop'
item.action_on_4xx = ':pass'
item.action_on_5xx = ':pass'
item.action_on_network_errors = ':pass'
with pytest.raises(AbortActionException):
item.perform(formdata)
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-errheaderstr'
item.action_on_app_error = ':stop'
item.action_on_4xx = ':pass'
item.action_on_5xx = ':pass'
item.action_on_network_errors = ':pass'
with pytest.raises(AbortActionException):
item.perform(formdata)
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-err0'
item.varname = 'xxx'
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 0
assert formdata.workflow_data['xxx_response'] == {'data': 'foo', 'err': 0}
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-err0'
item.varname = 'xxx'
item.action_on_app_error = ':stop'
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 0
assert formdata.workflow_data['xxx_response'] == {'data': 'foo', 'err': 0}
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-err1'
item.varname = 'xxx'
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 1
assert 'xxx_response' not in formdata.workflow_data
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-errstr'
item.varname = 'xxx'
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 'bug'
assert 'xxx_response' not in formdata.workflow_data
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-err1'
item.varname = 'xxx'
item.action_on_app_error = ':stop'
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 1
assert formdata.workflow_data['xxx_error_response'] == {'data': '', 'err': 1}
assert 'xxx_response' not in formdata.workflow_data
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-errheader0'
item.varname = 'xxx'
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 0
assert formdata.workflow_data['xxx_app_error_header'] == '0'
assert formdata.workflow_data['xxx_response'] == {'foo': 'bar'}
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-errheader1'
item.varname = 'xxx'
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 1
assert formdata.workflow_data['xxx_app_error_header'] == '1'
assert formdata.workflow_data['xxx_error_response'] == {'foo': 'bar'}
assert 'xxx_response' not in formdata.workflow_data
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-errheaderstr'
item.varname = 'xxx'
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 'bug'
assert formdata.workflow_data['xxx_app_error_header'] == 'bug'
assert formdata.workflow_data['xxx_error_response'] == {'foo': 'bar'}
assert 'xxx_response' not in formdata.workflow_data
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-errheader1'
item.varname = 'xxx'
item.action_on_app_error = ':stop'
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.workflow_data['xxx_status'] == 200
assert formdata.workflow_data['xxx_app_error_code'] == 1
assert formdata.workflow_data['xxx_app_error_header'] == '1'
assert formdata.workflow_data['xxx_error_response'] == {'foo': 'bar'}
assert 'xxx_response' not in formdata.workflow_data
assert formdata.workflow_data.get('xxx_time')
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml-errheader'
item.varname = 'xxx'
item.response_type = 'attachment'
item.record_errors = True
item.perform(formdata)
assert formdata.workflow_data.get('xxx_status') == 200
assert formdata.workflow_data.get('xxx_app_error_code') == 1
assert formdata.workflow_data.get('xxx_app_error_header') == '1'
assert 'xxx_response' not in formdata.workflow_data
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml-errheader'
item.varname = 'xxx'
item.response_type = 'attachment'
item.record_errors = True
item.action_on_app_error = ':stop'
with pytest.raises(AbortActionException):
item.perform(formdata)
assert formdata.workflow_data.get('xxx_status') == 200
assert formdata.workflow_data.get('xxx_app_error_code') == 1
assert formdata.workflow_data.get('xxx_app_error_header') == '1'
assert 'xxx_response' not in formdata.workflow_data
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml-errheader'
item.varname = 'xxx'
item.response_type = 'json' # wait for json but receive xml
item.record_errors = True
item.perform(formdata)
assert formdata.workflow_data.get('xxx_status') == 200
assert formdata.workflow_data.get('xxx_app_error_code') == 1
assert formdata.workflow_data.get('xxx_app_error_header') == '1'
assert 'xxx_response' not in formdata.workflow_data
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-err1'
item.action_on_app_error = ':stop'
item.response_type = 'attachment' # err value is not an error
item.perform(formdata) # so, everything is "ok" here
formdata.workflow_data = None
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/json-errheaderstr'
item.action_on_app_error = ':stop'
item.response_type = 'attachment'
with pytest.raises(AbortActionException):
item.perform(formdata)
formdata.workflow_data = None
# xml instead of json is not a app_error
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml'
item.varname = 'xxx'
item.action_on_app_error = ':stop'
item.action_on_4xx = ':pass'
item.action_on_5xx = ':pass'
item.action_on_network_errors = ':pass'
item.action_on_bad_data = ':pass'
item.perform(formdata)
formdata.workflow_data = None
# connection error
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/connection-error'
item.record_errors = True
item.action_on_network_errors = ':pass'
item.perform(formdata)
assert not formdata.workflow_data
# connection error, with varname
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/connection-error'
item.varname = 'plop'
item.record_errors = True
item.action_on_network_errors = ':pass'
item.perform(formdata)
assert 'ConnectionError: error\n' in formdata.evolution[-1].parts[-1].summary
assert formdata.workflow_data['plop_connection_error'] == 'error'
def test_webservice_call_store_in_backoffice_filefield(http_requests, pub):
wf = Workflow(name='wscall to backoffice file field')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
FileField(id='bo1', label='bo field 1', type='file', varname='backoffice_file1'),
]
st1 = wf.add_status('Status1')
wf.store()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.data = {}
formdata.store()
# check storing response in backoffice file field
item = WebserviceCallStatusItem()
item.parent = st1
item.backoffice_filefield_id = 'bo1'
item.url = 'http://remote.example.net/xml'
item.response_type = 'attachment'
item.record_errors = True
item.perform(formdata)
assert 'bo1' in formdata.data
fbo1 = formdata.data['bo1']
assert fbo1.base_filename == 'file-bo1.xml'
assert fbo1.content_type == 'text/xml'
assert fbo1.get_content().startswith(b'<?xml')
# nothing else is stored
assert formdata.workflow_data is None
assert not formdata.evolution[-1].parts
# store in backoffice file field + varname
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
item.varname = 'xxx'
item.perform(formdata)
# backoffice file field
assert 'bo1' in formdata.data
fbo1 = formdata.data['bo1']
assert fbo1.base_filename == 'xxx.xml'
assert fbo1.content_type == 'text/xml'
assert fbo1.get_content().startswith(b'<?xml')
# varname => workflow_data and AttachmentEvolutionPart
assert formdata.workflow_data.get('xxx_status') == 200
assert formdata.workflow_data.get('xxx_content_type') == 'text/xml'
attachment = formdata.evolution[-1].parts[-1]
assert isinstance(attachment, AttachmentEvolutionPart)
assert attachment.base_filename == 'xxx.xml'
assert attachment.content_type == 'text/xml'
attachment.fp.seek(0)
assert attachment.fp.read(5) == b'<?xml'
# no more 'bo1' backoffice field: do nothing
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
wf.backoffice_fields_formdef.fields = [
FileField(id='bo2', label='bo field 2', type='file'), # id != 'bo1'
]
item.perform(formdata)
assert formdata.data == {}
# backoffice field is not a field file:
wf.backoffice_fields_formdef.fields = [
StringField(id='bo1', label='bo field 1', type='string'),
]
item.perform(formdata)
assert formdata.data == {}
# no field at all:
wf.backoffice_fields_formdef.fields = []
item.perform(formdata)
assert formdata.data == {}
def test_webservice_target_status(pub):
wf = Workflow(name='boo')
status1 = wf.add_status('Status1', 'st1')
status2 = wf.add_status('Status2', 'st2')
wf.store()
item = WebserviceCallStatusItem()
item.parent = status1
assert item.get_target_status() == [status1.id]
item.action_on_app_error = status1.id
item.action_on_4xx = status2.id
item.action_on_5xx = status2.id
targets = item.get_target_status()
assert len(item.get_target_status()) == 4
assert targets.count(status1) == 2
assert targets.count(status2) == 2
item.action_on_bad_data = 'st3' # doesn't exist
targets = item.get_target_status()
assert len(item.get_target_status()) == 4
assert targets.count(status1) == 2
assert targets.count(status2) == 2
def test_timeout(two_pubs):
workflow = Workflow(name='timeout')
st1 = workflow.add_status('Status1', 'st1')
st2 = workflow.add_status('Status2', 'st2')
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
jump.by = ['_submitter', '_receiver']
jump.timeout = 0.1
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
workflow.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = workflow.id
assert formdef.get_workflow().id == workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
formdata_id = formdata.id
time.sleep(0.3)
_apply_timeouts(two_pubs)
assert formdef.data_class().get(formdata_id).status == 'wf-st2'
# check there's no crash on workflow without jumps
formdef = FormDef()
formdef.name = 'xxx'
formdef.store()
_apply_timeouts(two_pubs)
def test_legacy_timeout(pub):
workflow = Workflow(name='timeout')
st1 = workflow.add_status('Status1', 'st1')
st2 = workflow.add_status('Status2', 'st2')
jump = TimeoutWorkflowStatusItem()
jump.id = '_jump'
jump.timeout = 0.1
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
workflow.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = workflow.id
assert formdef.get_workflow().id == workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
formdata_id = formdata.id
time.sleep(0.3)
_apply_timeouts(pub)
assert formdef.data_class().get(formdata_id).status == 'wf-st2'
def test_timeout_then_remove(two_pubs):
workflow = Workflow(name='timeout-then-remove')
st1 = workflow.add_status('Status1', 'st1')
st2 = workflow.add_status('Status2', 'st2')
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
jump.by = ['_submitter', '_receiver']
jump.timeout = 0.1
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
remove = RemoveWorkflowStatusItem()
st2.items.append(remove)
remove.parent = st2
workflow.store()
formdef = FormDef()
formdef.name = 'baz%s' % id(two_pubs)
formdef.fields = []
formdef.workflow_id = workflow.id
assert formdef.get_workflow().id == workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
formdata_id = formdata.id
assert str(formdata_id) in [str(x) for x in formdef.data_class().keys()]
time.sleep(0.2)
_apply_timeouts(two_pubs)
assert not str(formdata_id) in [str(x) for x in formdef.data_class().keys()]
def test_timeout_with_mark(two_pubs):
workflow = Workflow(name='timeout')
st1 = workflow.add_status('Status1', 'st1')
st2 = workflow.add_status('Status2', 'st2')
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
jump.by = ['_submitter', '_receiver']
jump.timeout = 0.1
jump.status = 'st2'
jump.set_marker_on_status = True
st1.items.append(jump)
jump.parent = st1
workflow.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = workflow.id
assert formdef.get_workflow().id == workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
formdata_id = formdata.id
time.sleep(0.3)
_apply_timeouts(two_pubs)
formdata = formdef.data_class().get(formdata_id)
assert formdata.workflow_data.get('_markers_stack') == [{'status_id': 'st1'}]
def test_sms(pub, sms_mocking):
pub.cfg['sms'] = {'mode': 'xxx'}
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
item = SendSMSWorkflowStatusItem()
item.to = ['000']
item.body = 'XXX'
assert len(sms_mocking.sms) == 0
item.perform(formdata)
assert len(sms_mocking.sms) == 1
assert sms_mocking.sms[0]['destinations'] == ['000']
assert sms_mocking.sms[0]['text'] == 'XXX'
# check None as recipient is not passed to the SMS backend
item.to = [None]
item.perform(formdata) # nothing
assert len(sms_mocking.sms) == 1
item.to = ['000', None]
item.perform(formdata)
assert len(sms_mocking.sms) == 2
assert sms_mocking.sms[1]['destinations'] == ['000']
assert sms_mocking.sms[1]['text'] == 'XXX'
pub.substitutions.feed(MockSubstitutionVariables())
item.body = 'dj{{ bar }}'
item.perform(formdata)
assert len(sms_mocking.sms) == 3
assert sms_mocking.sms[2]['destinations'] == ['000']
assert sms_mocking.sms[2]['text'] == 'djFoobar'
item.body = 'ezt[bar]'
item.perform(formdata)
assert len(sms_mocking.sms) == 4
assert sms_mocking.sms[3]['destinations'] == ['000']
assert sms_mocking.sms[3]['text'] == 'eztFoobar'
# disable SMS system
pub.cfg['sms'] = {'mode': 'none'}
item.to = ['000']
item.body = 'XXX'
item.perform(formdata) # nothing
assert len(sms_mocking.sms) == 4
def test_sms_with_passerelle(pub):
pub.cfg['sms'] = {'mode': 'passerelle',
'passerelle_url': 'http://passerelle.example.com/send?nostop=1',
'sender': 'Passerelle'}
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
item = SendSMSWorkflowStatusItem()
item.to = ['1234']
item.body = 'my message'
with mock.patch('wcs.wscalls.get_secret_and_orig') as mocked_secret_and_orig:
mocked_secret_and_orig.return_value = ('secret', 'localhost')
with mock.patch('wcs.qommon.misc._http_request') as mocked_http_post:
mocked_http_post.return_value = ('response', '200', 'data', 'headers')
item.perform(formdata)
url = mocked_http_post.call_args[0][0]
payload = mocked_http_post.call_args[1]['body']
assert 'http://passerelle.example.com' in url
assert '?nostop=1' in url
assert 'orig=localhost' in url
assert 'signature=' in url
json_payload = json.loads(payload)
assert 'message' in json_payload
assert json_payload['message'] == 'my message'
assert json_payload['to'] == ['1234']
assert json_payload['from'] == 'Passerelle'
def test_display_form(two_pubs):
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
wf = Workflow(name='status')
st1 = wf.add_status('Status1', 'st1')
display_form = FormWorkflowStatusItem()
display_form.id = '_x'
display_form.varname = 'xxx'
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
display_form.formdef.fields.append(StringField(id='1', label='Test',
type='string'))
display_form.formdef.fields.append(DateField(id='2', label='Date',
type='date', varname='date'))
st1.items.append(display_form)
display_form.parent = st1
form = Form(action='#')
display_form.fill_form(form, formdata, None)
assert form.widgets[0].title == 'Test'
assert form.widgets[1].title == 'Date'
two_pubs.get_request().form = {'f1': 'Foobar', 'f2': '2015-05-12', 'submit': 'submit'}
display_form.submit_form(form, formdata, None, None)
assert formdata.get_substitution_variables()['xxx_var_date'] == '2015-05-12'
two_pubs.cfg['language'] = {'language': 'fr'}
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
form = Form(action='#')
display_form.fill_form(form, formdata, None)
two_pubs.get_request().form = {'f1': 'Foobar', 'f2': '12/05/2015', 'submit': 'submit'}
display_form.submit_form(form, formdata, None, None)
assert formdata.get_substitution_variables()['xxx_var_date'] == '12/05/2015'
assert formdata.get_substitution_variables()['xxx_var_date_raw'] == \
time.strptime('2015-05-12', '%Y-%m-%d')
two_pubs.cfg['language'] = {'language': 'en'}
def test_display_form_and_comment(pub):
role = Role(name='bar1')
role.store()
user = pub.user_class()
user.roles = [role.id]
user.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
wf = Workflow(name='status')
st1 = wf.add_status('Status1', 'st1')
display_form = FormWorkflowStatusItem()
display_form.by = [role.id]
display_form.id = '_x'
display_form.varname = 'xxx'
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
display_form.formdef.fields.append(CommentField(id='1', label='Test',
type='comment'))
st1.items.append(display_form)
display_form.parent = st1
commentable = CommentableWorkflowStatusItem()
commentable.by = [role.id]
st1.items.append(commentable)
commentable.parent = st1
wf.store()
formdef.workflow = wf
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
assert formdata.get_status().name == 'Status1'
form = formdata.get_workflow_form(user)
assert 'Test' in str(form.widgets[0].render())
assert '<textarea' in str(form.widgets[1].render())
def test_display_form_migration(pub):
wf = Workflow(name='status')
st1 = wf.add_status('Status1', 'st1')
display_form = FormWorkflowStatusItem()
display_form.id = '_x'
display_form.varname = 'xxx'
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
display_form.formdef.fields = [
ItemField(id='1', label='Test', type='item')
]
st1.items.append(display_form)
display_form.parent = st1
display_form.formdef.fields[0].show_as_radio = True
wf.store()
wf = Workflow.get(wf.id)
assert wf.possible_status[0].items[0].formdef.fields[0].display_mode == 'radio'
def test_choice_button_no_label(pub):
role = Role(name='bar1')
role.store()
user = pub.user_class()
user.roles = [role.id]
user.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
wf = Workflow(name='status')
st1 = wf.add_status('Status1', 'st1')
choice = ChoiceWorkflowStatusItem()
choice.by = [role.id]
choice.id = '_x'
st1.items.append(choice)
choice.parent = st1
choice2 = ChoiceWorkflowStatusItem()
choice2.label = 'TEST'
choice2.by = [role.id]
choice2.id = '_x2'
st1.items.append(choice2)
choice2.parent = st1
wf.store()
formdef.workflow = wf
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
assert formdata.get_status().name == 'Status1'
form = formdata.get_workflow_form(user)
form.render()
assert str(form.render()).count('<button') == 1
assert '>TEST</button>' in str(form.render())
def test_workflow_role_type_migration(pub):
workflow = Workflow(name='role migration')
st1 = workflow.add_status('Status1', 'st1')
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
jump.by = [1, 2]
st1.items.append(jump)
jump.parent = st1
workflow.store()
reloaded_workflow = Workflow.get(workflow.id)
assert reloaded_workflow.possible_status[0].items[0].by == ['1', '2']
def test_workflow_display_message(pub):
pub.substitutions.feed(MockSubstitutionVariables())
workflow = Workflow(name='display message')
st1 = workflow.add_status('Status1', 'st1')
FormDef.wipe()
formdef = FormDef()
formdef.name = 'foobar'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.id = '1'
display_message = DisplayMessageWorkflowStatusItem()
display_message.parent = st1
display_message.message = 'test'
assert display_message.get_message(formdata) == 'test'
display_message.message = '{{ number }}'
assert display_message.get_message(formdata) == str(formdata.id)
display_message.message = '[number]'
assert display_message.get_message(formdata) == str(formdata.id)
display_message.message = '{{ bar }}'
assert display_message.get_message(formdata) == 'Foobar'
display_message.message = '[bar]'
assert display_message.get_message(formdata) == 'Foobar'
# makes sure the string is correctly escaped for HTML
display_message.message = '{{ foo }}'
assert display_message.get_message(formdata) == '1 &lt; 3'
display_message.message = '[foo]'
assert display_message.get_message(formdata) == '1 &lt; 3'
def test_workflow_display_message_to(pub):
workflow = Workflow(name='display message to')
st1 = workflow.add_status('Status1', 'st1')
role = Role(name='foorole')
role.store()
role2 = Role(name='no-one-role')
role2.store()
user = pub.user_class(name='baruser')
user.roles = []
user.store()
FormDef.wipe()
formdef = FormDef()
formdef.url_name = 'foobar'
formdef._workflow = workflow
formdata = formdef.data_class()()
formdata.status = 'wf-st1'
display_message = DisplayMessageWorkflowStatusItem()
display_message.parent = st1
st1.items.append(display_message)
display_message.message = 'all'
display_message.to = None
assert display_message.get_message(formdata) == 'all'
assert formdata.get_workflow_messages() == ['all']
display_message.message = 'to-role'
display_message.to = [role.id]
assert display_message.get_message(formdata) == ''
assert formdata.get_workflow_messages() == []
pub._request._user = user
display_message.message = 'to-role'
display_message.to = [role.id]
assert display_message.get_message(formdata) == ''
assert formdata.get_workflow_messages() == []
user.roles = [role.id]
assert display_message.get_message(formdata) == 'to-role'
assert formdata.get_workflow_messages() == ['to-role']
user.roles = []
display_message.message = 'to-submitter'
display_message.to = ['_submitter']
assert display_message.get_message(formdata) == ''
assert formdata.get_workflow_messages() == []
formdata.user_id = user.id
assert display_message.get_message(formdata) == 'to-submitter'
assert formdata.get_workflow_messages() == ['to-submitter']
display_message.message = 'to-role-or-submitter'
display_message.to = [role.id, '_submitter']
assert display_message.get_message(formdata) == 'to-role-or-submitter'
assert formdata.get_workflow_messages() == ['to-role-or-submitter']
formdata.user_id = None
assert display_message.get_message(formdata) == ''
assert formdata.get_workflow_messages() == []
user.roles = [role.id]
assert display_message.get_message(formdata) == 'to-role-or-submitter'
assert formdata.get_workflow_messages() == ['to-role-or-submitter']
formdata.user_id = user.id
assert display_message.get_message(formdata) == 'to-role-or-submitter'
assert formdata.get_workflow_messages() == ['to-role-or-submitter']
display_message.to = [role2.id]
assert display_message.get_message(formdata) == ''
assert formdata.get_workflow_messages() == []
display_message.message = 'd1'
display_message2 = DisplayMessageWorkflowStatusItem()
display_message2.parent = st1
st1.items.append(display_message2)
display_message2.message = 'd2'
display_message2.to = [role.id, '_submitter']
assert formdata.get_workflow_messages() == ['d2']
user.roles = [role.id, role2.id]
assert 'd1' in formdata.get_workflow_messages()
assert 'd2' in formdata.get_workflow_messages()
def test_workflow_display_message_line_details(pub):
workflow = Workflow(name='display message to')
st1 = workflow.add_status('Status1', 'st1')
display_message = DisplayMessageWorkflowStatusItem()
display_message.parent = st1
assert display_message.get_line_details() == 'top of page'
display_message.position = 'top'
assert display_message.get_line_details() == 'top of page'
display_message.position = 'bottom'
assert display_message.get_line_details() == 'bottom of page'
display_message.position = 'actions'
assert display_message.get_line_details() == 'with actions'
role = Role(name='foorole')
role.store()
display_message.to = [role.id]
assert display_message.get_line_details() == 'with actions, for foorole'
def test_workflow_roles(pub, emails):
pub.substitutions.feed(MockSubstitutionVariables())
user = pub.user_class(name='foo')
user.email = 'zorg@localhost'
user.store()
Role.wipe()
role1 = Role(name='foo')
role1.emails = ['foo@localhost']
role1.details = 'Hello World'
role1.store()
role2 = Role(name='bar')
role2.emails = ['bar@localhost', 'baz@localhost']
role2.store()
workflow = Workflow(name='wf roles')
st1 = workflow.add_status('Status1', 'st1')
item = SendmailWorkflowStatusItem()
item.to = ['_receiver', '_other']
item.subject = 'Foobar'
item.body = 'Hello'
st1.items.append(item)
item.parent = st1
workflow.roles['_other'] = 'Other Function'
workflow.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_roles = {'_receiver': role1.id, '_other': role2.id}
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
emails.empty()
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('Foobar')
assert set(emails.get('Foobar')['email_rcpt']) == set(
['foo@localhost', 'bar@localhost', 'baz@localhost'])
workflow.roles['_slug-with-dash'] = 'Dashed Function'
workflow.store()
formdef.workflow_roles['_slug-with-dash'] = role1.id
formdef.store()
substvars = formdata.get_substitution_variables()
assert substvars.get('form_role_other_name') == 'bar'
assert substvars.get('form_role_slug_with_dash_name') == 'foo'
assert substvars.get('form_role_slug_with_dash_details') == 'Hello World'
def test_criticality(pub):
FormDef.wipe()
workflow = Workflow(name='criticality')
workflow.criticality_levels = [
WorkflowCriticalityLevel(name='green'),
WorkflowCriticalityLevel(name='yellow'),
WorkflowCriticalityLevel(name='red'),
]
workflow.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.workflow_id = workflow.id
formdef.store()
item = ModifyCriticalityWorkflowStatusItem()
formdata = formdef.data_class()()
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'yellow'
formdata = formdef.data_class()()
item.mode = MODE_INC
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'yellow'
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'red'
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'red'
item.mode = MODE_DEC
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'yellow'
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'green'
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'green'
item.mode = MODE_SET
item.absolute_value = 2
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'red'
item.absolute_value = 0
item.perform(formdata)
assert formdata.get_criticality_level_object().name == 'green'
def test_geolocate_address(pub):
formdef = FormDef()
formdef.geolocations = {'base': 'bla'}
formdef.name = 'baz'
formdef.fields = [
StringField(id='1', label='String', type='string', varname='string'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'1': '169 rue du chateau'}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = GeolocateWorkflowStatusItem()
item.method = 'address_string'
item.address_string = '[form_var_string], paris, france'
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 200,
json.dumps([{'lat':'48.8337085','lon':'2.3233693'}]), None)
item.perform(formdata)
assert 'https://nominatim.entrouvert.org/search' in http_get_page.call_args[0][0]
assert urlparse.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 2
pub.load_site_options()
pub.site_options.set('options', 'nominatim_key', 'KEY')
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 200,
json.dumps([{'lat':'48.8337085', 'lon':'2.3233693'}]), None)
item.perform(formdata)
assert 'https://nominatim.entrouvert.org/search' in http_get_page.call_args[0][0]
assert urlparse.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
assert 'key=KEY' in http_get_page.call_args[0][0]
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 2
pub.load_site_options()
pub.site_options.set('options', 'geocoding_service_url', 'http://example.net/')
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 200,
json.dumps([{'lat':'48.8337085','lon':'2.3233693'}]), None)
item.perform(formdata)
assert 'http://example.net/?q=' in http_get_page.call_args[0][0]
pub.site_options.set('options', 'geocoding_service_url', 'http://example.net/?param=value')
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 200,
json.dumps([{'lat':'48.8337085','lon':'2.3233693'}]), None)
item.perform(formdata)
assert 'http://example.net/?param=value&' in http_get_page.call_args[0][0]
# check for invalid ezt
item.address_string = '[if-any], paris, france'
formdata.geolocations = None
item.perform(formdata)
assert formdata.geolocations == {}
# check for None
item.address_string = '=None'
formdata.geolocations = None
item.perform(formdata)
assert formdata.geolocations == {}
# check for nominatim server error
formdata.geolocations = None
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 500,
json.dumps([{'lat':'48.8337085','lon':'2.3233693'}]), None)
item.perform(formdata)
assert formdata.geolocations == {}
# check for nominatim returning an empty result set
formdata.geolocations = None
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 200, json.dumps([]), None)
item.perform(formdata)
assert formdata.geolocations == {}
# check for nominatim bad json
formdata.geolocations = None
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.return_value = (None, 200, 'bad json', None)
item.perform(formdata)
assert formdata.geolocations == {}
# check for nominatim connection error
formdata.geolocations = None
with mock.patch('wcs.wf.geolocate.http_get_page') as http_get_page:
http_get_page.side_effect = ConnectionError
item.perform(formdata)
assert formdata.geolocations == {}
def test_geolocate_image(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.geolocations = {'base': 'bla'}
formdef.fields = [
FileField(id='3', label='File', type='file', varname='file'),
]
formdef.store()
upload = PicklableUpload('test.jpeg', 'image/jpeg')
upload.receive([open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg'), 'rb').read()])
formdata = formdef.data_class()()
formdata.data = {'3': upload}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = GeolocateWorkflowStatusItem()
item.method = 'photo_variable'
item.photo_variable = '=form_var_file_raw'
item.perform(formdata)
assert int(formdata.geolocations['base']['lat']) == -1
assert int(formdata.geolocations['base']['lon']) == 6
# invalid expression
formdata.geolocations = None
item.photo_variable = '=1/0'
item.perform(formdata)
assert formdata.geolocations == {}
# invalid type
formdata.geolocations = None
item.photo_variable = '="bla"'
item.perform(formdata)
assert formdata.geolocations == {}
# invalid photo
upload = PicklableUpload('test.jpeg', 'image/jpeg')
upload.receive([open(os.path.join(os.path.dirname(__file__), 'template.odt'), 'rb').read()])
formdata.data = {'3': upload}
formdata.geolocations = None
item.perform(formdata)
assert formdata.geolocations == {}
def test_geolocate_map(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.geolocations = {'base': 'bla'}
formdef.fields = [
MapField(id='2', label='Map', type='map', varname='map'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'2': '48.8337085;2.3233693'}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = GeolocateWorkflowStatusItem()
item.method = 'map_variable'
item.map_variable = '=form_var_map'
item.perform(formdata)
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 2
# invalid data
formdata.geolocations = None
item.map_variable = '=form_var'
item.perform(formdata)
assert formdata.geolocations == {}
def test_geolocate_overwrite(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.geolocations = {'base': 'bla'}
formdef.fields = [
MapField(id='2', label='Map', type='map', varname='map'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'2': '48.8337085;2.3233693'}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = GeolocateWorkflowStatusItem()
item.method = 'map_variable'
item.map_variable = '=form_var_map'
item.perform(formdata)
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 2
formdata.data = {'2': '48.8337085;3.3233693'}
item.perform(formdata)
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 3
formdata.data = {'2': '48.8337085;4.3233693'}
item.overwrite = False
item.perform(formdata)
assert int(formdata.geolocations['base']['lat']) == 48
assert int(formdata.geolocations['base']['lon']) == 3
@pytest.mark.skipif(transform_to_pdf is None, reason='libreoffice not found')
def test_transform_to_pdf():
instream = open(os.path.join(os.path.dirname(__file__), 'template.odt'), 'rb')
outstream = transform_to_pdf(instream)
assert outstream is not False
assert outstream.read(10).startswith(b'%PDF-')
def test_export_to_model_image(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
FileField(id='3', label='File', type='file', varname='image'),
]
formdef.store()
upload = PicklableUpload('test.jpeg', 'image/jpeg')
image_data = open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg'), 'rb').read()
upload.receive([image_data])
formdata = formdef.data_class()()
formdata.data = {'3': upload}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = ExportToModel()
item.convert_to_pdf = False
item.method = 'non-interactive'
template_filename = os.path.join(os.path.dirname(__file__), 'template-with-image.odt')
template = open(template_filename, 'rb').read()
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
upload.fp = BytesIO()
upload.fp.write(template)
upload.fp.seek(0)
item.model_file = UploadedFile(pub.app_dir, None, upload)
item.attach_to_history = True
item.perform(formdata)
assert formdata.evolution[-1].parts[-1].base_filename == 'template.odt'
zfile = zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r')
zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
# check the image has been replaced by the one from the formdata
assert zinfo.file_size == len(image_data)
# check with missing data or wrong kind of data
for field_value in (None, 'wrong kind'):
formdata = formdef.data_class()()
formdata.data = {'3': field_value}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item.perform(formdata)
zfile = zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r')
zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
# check the original image has been left
assert zinfo.file_size == 580
item.filename = 'formulaire-{{form_number}}.odt'
item.perform(formdata)
assert formdata.evolution[-1].parts[-1].base_filename == 'formulaire-%s-%s.odt' % (formdef.id,
formdata.id)
def test_export_to_model_backoffice_field(pub):
wf = Workflow(name='email with attachments')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
FileField(id='bo1', label='bo field 1', type='file', varname='backoffice_file1'),
]
st1 = wf.add_status('Status1')
wf.store()
formdef = FormDef()
formdef.name = 'foo-export-to-bofile'
formdef.fields = [
StringField(id='1', label='String', type='string', varname='string'),
]
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = ExportToModel()
item.method = 'non-interactive'
item.convert_to_pdf = False
template_filename = os.path.join(os.path.dirname(__file__), 'template.odt')
template = open(template_filename, 'rb').read()
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
upload.fp = BytesIO()
upload.fp.write(template)
upload.fp.seek(0)
item.model_file = UploadedFile(pub.app_dir, None, upload)
item.parent = st1
item.backoffice_filefield_id = 'bo1'
item.perform(formdata)
assert 'bo1' in formdata.data
fbo1 = formdata.data['bo1']
assert fbo1.base_filename == 'template.odt'
assert fbo1.content_type == 'application/octet-stream'
zfile = zipfile.ZipFile(fbo1.get_file())
assert b'foo-export-to-bofile' in zfile.read('content.xml')
# no more 'bo1' backoffice field: do nothing
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
# id is not bo1:
wf.backoffice_fields_formdef.fields = [
FileField(id='bo2', label='bo field 2', type='file'),
]
item.perform(formdata)
assert formdata.data == {}
# field is not a field file:
wf.backoffice_fields_formdef.fields = [
StringField(id='bo1', label='bo field 1', type='string'),
]
item.perform(formdata)
assert formdata.data == {}
# no field at all:
wf.backoffice_fields_formdef.fields = []
item.perform(formdata)
assert formdata.data == {}
def test_export_to_model_django_template(pub):
formdef = FormDef()
formdef.name = 'foo-export-to-template-with-django'
formdef.fields = [
StringField(id='1', label='String', type='string', varname='string'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
pub.substitutions.feed(formdata)
item = ExportToModel()
item.method = 'non-interactive'
item.attach_to_history = True
template_filename = os.path.join(os.path.dirname(__file__), 'template-django.odt')
template = open(template_filename, 'rb').read()
upload = QuixoteUpload('/foo/template-django.odt', content_type='application/octet-stream')
upload.fp = BytesIO()
upload.fp.write(template)
upload.fp.seek(0)
item.model_file = UploadedFile(pub.app_dir, None, upload)
item.convert_to_pdf = False
item.perform(formdata)
new_content = zipfile.ZipFile(open(formdata.evolution[0].parts[0].filename, 'rb')).read('content.xml')
assert b'>foo-export-to-template-with-django<' in new_content
formdef.name = 'Name with a \' simple quote'
formdef.store()
item.perform(formdata)
new_content = zipfile.ZipFile(open(formdata.evolution[0].parts[1].filename, 'rb')).read('content.xml')
assert b'>Name with a \' simple quote<' in new_content
formdef.name = 'A <> name'
formdef.store()
item.perform(formdata)
new_content = zipfile.ZipFile(open(formdata.evolution[0].parts[2].filename, 'rb')).read('content.xml')
assert b'>A &lt;&gt; name<' in new_content
def test_global_timeouts(two_pubs):
pub = two_pubs
FormDef.wipe()
Workflow.wipe()
workflow = Workflow(name='global-timeouts')
workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
workflow.criticality_levels = [
WorkflowCriticalityLevel(name='green'),
WorkflowCriticalityLevel(name='yellow'),
WorkflowCriticalityLevel(name='red'),
]
action = workflow.add_global_action('Timeout Test')
item = action.append_item('modify_criticality')
trigger = action.append_trigger('timeout')
trigger.anchor = 'creation'
workflow.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = workflow.id
formdef.store()
formdef.data_class().wipe()
formdata1 = formdef.data_class()()
formdata1.just_created()
formdata1.store()
# delay isn't set yet, no crash
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
# delay didn't expire yet, no change
trigger.timeout = '2'
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
formdata1.receipt_time = time.localtime(time.time()-3*86400)
formdata1.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
# make sure it's not triggered a second time
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
# change id so it's triggered again
trigger.id = 'XXX1'
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'red'
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'red'
# reset formdata to initial state
formdata1.store()
trigger.anchor = '1st-arrival'
trigger.anchor_status_first = None
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
formdata1.evolution[-1].time = time.localtime(time.time()-3*86400)
formdata1.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
formdata1.store() # reset
# bad (obsolete) status: do nothing
trigger.anchor_status_first = 'wf-foobar'
workflow.store()
formdata1.evolution[-1].time = time.localtime(time.time()-3*86400)
formdata1.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
formdata1.store()
trigger.anchor = 'latest-arrival'
trigger.anchor_status_latest = None
workflow.store()
formdata1.evolution[-1].time = time.localtime()
formdata1.store()
formdata1.jump_status('new')
formdata1.evolution[-1].time = time.localtime(time.time()-7*86400)
formdata1.jump_status('accepted')
formdata1.jump_status('new')
formdata1.evolution[-1].time = time.localtime(time.time()-1*86400)
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
formdata1.evolution[-1].time = time.localtime(time.time()-4*86400)
formdata1.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
formdata1.store()
# limit trigger to formdata with "accepted" status
trigger.anchor_status_latest = 'wf-accepted'
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
formdata1.store()
# limit trigger to formdata with "new" status
trigger.anchor_status_latest = 'wf-new'
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
formdata1.store()
# bad (obsolete) status: do nothing
trigger.anchor_status_latest = 'wf-foobar'
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
formdata1.store()
# check trigger is not run on finalized formdata
formdata1.jump_status('finished')
formdata1.evolution[-1].time = time.localtime(time.time()-4*86400)
formdata1.store()
trigger.anchor = 'creation'
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
formdata1.store()
# check trigger is run on finalized formdata when anchor status is an
# endpoint
formdata1.jump_status('finished')
formdata1.evolution[-1].last_jump_datetime = None
formdata1.evolution[-1].time = time.localtime(time.time()-4*86400)
formdata1.store()
trigger.anchor = 'latest-arrival'
trigger.anchor_status_latest = 'wf-finished'
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
formdata1.store()
# check "finalized" anchor
trigger.anchor = 'finalized'
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
formdata1.store()
# use python expression as anchor
# timestamp
formdata1.jump_status('new')
formdata1.evolution[-1].time = time.localtime(time.time()-4*86400)
formdata1.evolution[-1].last_jump_datetime = None
formdata1.store()
trigger.anchor = 'python'
trigger.anchor_expression = repr(time.time())
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
trigger.anchor = 'python'
trigger.anchor_expression = repr(time.time() - 10*86400)
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
formdata1.store()
# datetime object
trigger.anchor = 'python'
trigger.anchor_expression = 'datetime.datetime(%s, %s, %s, %s, %s)' % (
datetime.datetime.now() - datetime.timedelta(days=10)).timetuple()[:5]
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
formdata1.store()
# datetime object
trigger.anchor = 'python'
trigger.anchor_expression = 'datetime.date(%s, %s, %s)' % (
datetime.datetime.now() - datetime.timedelta(days=10)).timetuple()[:3]
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
formdata1.store()
# string object
trigger.anchor = 'python'
trigger.anchor_expression = '"%04d-%02d-%02d"' % (
datetime.datetime.now() - datetime.timedelta(days=10)).timetuple()[:3]
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
formdata1.store()
# invalid variable
trigger.anchor = 'python'
trigger.anchor_expression = 'Ellipsis'
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
formdata1.store()
# invalid expression
trigger.anchor = 'python'
trigger.anchor_expression = 'XXX'
workflow.store()
pub.apply_global_action_timeouts()
assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
formdata1.store()
def test_profile(two_pubs):
User = two_pubs.user_class
user = User()
user.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
StringField(id='1', label='Test', type='string', varname='foo'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.user_id = user.id
formdata.data = {'1': 'bar@localhost'}
item = UpdateUserProfileStatusItem()
item.fields = [{'field_id': '__email', 'value': '=form_var_foo'}]
item.perform(formdata)
assert User.get(user.id).email == 'bar@localhost'
formdata.data = {'1': 'Plop'}
item.fields = [{'field_id': '__name', 'value': '=form_var_foo'}]
item.perform(formdata)
assert User.get(user.id).name == 'Plop'
item.fields = [{'field_id': '__name', 'value': 'dj{{form_var_foo}}'}]
item.perform(formdata)
assert User.get(user.id).name == 'djPlop'
item.fields = [{'field_id': '__name', 'value': 'ezt[form_var_foo]'}]
item.perform(formdata)
assert User.get(user.id).name == 'eztPlop'
from wcs.admin.settings import UserFieldsFormDef
formdef = UserFieldsFormDef(two_pubs)
formdef.fields = [
StringField(id='3', label='test', type='string', varname='plop'),
DateField(id='4', label='Date', type='date', varname='bar'),
]
formdef.store()
item.fields = [{'field_id': 'plop', 'value': '=form_var_foo'}]
item.perform(formdata)
assert User.get(user.id).form_data.get('3') == 'Plop'
assert not User.get(user.id).form_data.get('4')
# check transmission to IdP
get_publisher().cfg['sp'] = {'idp-manage-user-attributes': True}
get_publisher().cfg['idp'] = {'xxx': {'metadata_url': 'http://idp.example.net/idp/saml2/metadata'}}
get_publisher().write_cfg()
user = User.get(user.id)
user.name_identifiers = ['xyz']
user.store()
for date_value in (
'20/03/2018',
'=utils.make_date("20/03/2018")',
'=utils.make_date("20/03/2018").timetuple()'):
# check local value
item.fields = [{'field_id': 'bar', 'value': date_value}]
item.perform(formdata)
assert User.get(user.id).form_data.get('3') == 'Plop'
assert User.get(user.id).form_data.get('4').tm_year == 2018
with mock.patch('wcs.wf.profile.http_patch_request') as http_patch_request:
http_patch_request.return_value = (None, 200, '', None)
get_response().process_after_jobs()
assert http_patch_request.call_count == 1
assert http_patch_request.call_args[0][1] == '{"bar": "2018-03-20"}'
for date_value in ('baddate', '', {}, [], None):
# reset date to a known value
user.form_data['4'] = datetime.datetime.now().timetuple()
user.store()
year = User.get(user.id).form_data.get('4').tm_year
# perform action
item.fields = [{'field_id': 'bar', 'value': date_value}]
item.perform(formdata)
if date_value not in (None, ''): # bad value : do nothing
assert User.get(user.id).form_data.get('4').tm_year == year
else: # empty value : empty field
assert User.get(user.id).form_data.get('4') == None
with mock.patch('wcs.wf.profile.http_patch_request') as http_patch_request:
http_patch_request.return_value = (None, 200, '', None)
get_response().process_after_jobs()
assert http_patch_request.call_count == 1
if date_value not in (None, ''): # bad value : do nothing
assert http_patch_request.call_args[0][1] == '{}'
else: # empty value : null field
assert http_patch_request.call_args[0][1] == '{"bar": null}'
def test_set_backoffice_field(http_requests, two_pubs):
Workflow.wipe()
FormDef.wipe()
LoggedError.wipe()
wf = Workflow(name='xxx')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
StringField(id='bo1', label='1st backoffice field',
type='string', varname='backoffice_blah'),
]
st1 = wf.add_status('Status1')
wf.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
StringField(id='00', label='String', type='string', varname='string'),
StringField(id='01', label='Other string', type='string', varname='other'),
]
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'00': 'HELLO'}
formdata.just_created()
formdata.store()
two_pubs.substitutions.feed(formdata)
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data.get('bo1') == None
item.fields = [{'field_id': 'bo1', 'value': '=form_var_string'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'HELLO'
item.fields = [{'field_id': 'bo1', 'value': '{{ form_var_string }} WORLD'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'HELLO WORLD'
item.fields = [{'field_id': 'bo1', 'value': '[form_var_string] GOODBYE'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'HELLO GOODBYE'
item.fields = [{'field_id': 'bo1', 'value': '{{ form.var.string }} LAZY'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'HELLO LAZY'
item.fields = [{'field_id': 'bo1', 'value': '=form.var.string'}] # lazy python
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'HELLO'
item.fields = [{'field_id': 'bo1', 'value': '=vars().get("form_var_string") + " PLOP"'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'HELLO PLOP'
item.fields = [{'field_id': 'bo1',
'value': '=vars().get("form_var_other") or vars().get("form_var_string")'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'HELLO'
item.fields = [{'field_id': 'bo1', 'value': '=vars().get("form_var_bouh", "X") + " PLOP"'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'X PLOP'
assert LoggedError.count() == 0
item.fields = [{'field_id': 'bo1', 'value': '= ~ invalid python ~'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert LoggedError.count() == 1
logged_error = LoggedError.select()[0]
assert logged_error.summary == 'Failed to compute Python expression'
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.expression == ' ~ invalid python ~'
assert logged_error.expression_type == 'python'
assert logged_error.exception_class == 'SyntaxError'
assert logged_error.exception_message == 'invalid syntax (<string>, line 1)'
LoggedError.wipe()
item.fields = [{'field_id': 'bo1', 'value': '{% if bad django %}'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert LoggedError.count() == 1
logged_error = LoggedError.select()[0]
assert logged_error.summary == 'Failed to compute template'
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.expression == '{% if bad django %}'
assert logged_error.expression_type == 'template'
assert logged_error.exception_class == 'TemplateError'
assert logged_error.exception_message.startswith('syntax error in Django template')
def test_set_backoffice_field_file(http_requests, two_pubs):
Workflow.wipe()
FormDef.wipe()
wf = Workflow(name='xxx')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
FileField(id='bo1', label='1st backoffice field',
type='file', varname='backoffice_file'),
]
st1 = wf.add_status('Status1')
wf.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
FileField(id='00', label='File', type='file', varname='file'),
]
formdef.workflow_id = wf.id
formdef.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': '=locals().get("form_var_file_raw")'}]
# the file does not exist
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == None
# store a PiclableUpload
upload = PicklableUpload('test.jpeg', 'image/jpeg')
upload.receive([open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg'), 'rb').read()])
formdata = formdef.data_class()()
formdata.data = {'00': upload}
formdata.just_created()
formdata.store()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'].base_filename == 'test.jpeg'
assert formdata.data['bo1'].content_type == 'image/jpeg'
assert formdata.data['bo1'].get_content() == open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg'), 'rb').read()
# same test with PicklableUpload wcs.qommon.form
from wcs.qommon.form import PicklableUpload as PicklableUpload2
upload2 = PicklableUpload2('test2.odt', 'application/vnd.oasis.opendocument.text')
upload2.receive([open(os.path.join(os.path.dirname(__file__), 'template.odt'), 'rb').read()])
formdata = formdef.data_class()()
formdata.data = {'00': upload2}
formdata.just_created()
formdata.store()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'].base_filename == 'test2.odt'
assert formdata.data['bo1'].content_type == 'application/vnd.oasis.opendocument.text'
assert formdata.data['bo1'].get_content() == open(os.path.join(os.path.dirname(__file__), 'template.odt'), 'rb').read()
# check storing response as attachment
two_pubs.substitutions.feed(formdata)
item = WebserviceCallStatusItem()
item.url = 'http://remote.example.net/xml'
item.varname = 'xxx'
item.response_type = 'attachment'
item.record_errors = True
item.perform(formdata)
attachment = formdata.evolution[-1].parts[-1]
assert isinstance(attachment, AttachmentEvolutionPart)
assert attachment.base_filename == 'xxx.xml'
assert attachment.content_type == 'text/xml'
formdata = formdef.data_class().get(formdata.id)
two_pubs.substitutions.feed(formdata)
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': '=attachments.xxx'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'].base_filename == 'xxx.xml'
# check storing a file from an assembled dictionary
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1',
'value': '={"content": "hello world", "filename": "hello.txt"}'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'].base_filename == 'hello.txt'
assert formdata.data['bo1'].get_content() == b'hello world'
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1',
'value': '={"b64_content": "SEVMTE8gV09STEQ=", "filename": "hello.txt"}'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'].base_filename == 'hello.txt'
assert formdata.data['bo1'].get_content() == b'HELLO WORLD'
hello_world = formdata.data['bo1']
# check wrong value, or None (no file)
for value in ('="HELLO"', '', 'BAD', '={}', '=[]', None):
formdata.data['bo1'] = hello_world
formdata.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': value}]
LoggedError.wipe()
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
if value is not None: # wrong value : do nothing
assert formdata.data['bo1'].base_filename == 'hello.txt'
assert formdata.data['bo1'].get_content() == b'HELLO WORLD'
assert LoggedError.count() == 1
logged_error = LoggedError.select()[0]
assert logged_error.summary.startswith('Failed to convert')
assert logged_error.formdata_id == str(formdata.id)
assert logged_error.exception_class == 'ValueError'
else: # empty value : remove field
assert formdata.data['bo1'] is None
assert LoggedError.count() == 0
# check wrong field
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo3', 'value': '=form_var_file_raw'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data.get('bo1') is None
assert formdata.data.get('bo3') is None
def test_set_backoffice_field_item(two_pubs):
Workflow.wipe()
FormDef.wipe()
wf = Workflow(name='xxx')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
st1 = wf.add_status('Status1')
wf.backoffice_fields_formdef.fields = [
ItemField(id='bo1', label='1st backoffice field',
type='item', varname='backoffice_item',
items=['a', 'b', 'c']),
]
wf.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': 'a'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'a'
assert formdata.data['bo1_display'] == 'a'
datasource = {'type': 'formula',
'value': repr([('a', 'aa'), ('b', 'bb'), ('c', 'cc')])}
wf.backoffice_fields_formdef.fields = [
ItemField(id='bo1', label='1st backoffice field',
type='item', varname='backoffice_item',
data_source=datasource),
]
wf.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': 'a'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'a'
assert formdata.data['bo1_display'] == 'aa'
datasource = {'type': 'formula',
'value': repr([{'id': 'a', 'text': 'aa', 'more': 'aaa'},
{'id': 'b', 'text': 'bb', 'more': 'bbb'}])}
wf.backoffice_fields_formdef.fields = [
ItemField(id='bo1', label='1st backoffice field',
type='item', varname='backoffice_item',
data_source=datasource),
]
wf.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': 'a'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'a'
assert formdata.data['bo1_display'] == 'aa'
assert formdata.data['bo1_structured'] == {'id': 'a', 'more': 'aaa', 'text': 'aa'}
# check when assigning using the display value
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': 'aa'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'a'
assert formdata.data['bo1_display'] == 'aa'
assert formdata.data['bo1_structured'] == {'id': 'a', 'more': 'aaa', 'text': 'aa'}
# check with unknown value
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': 'foobar'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == 'foobar'
assert formdata.data.get('bo1_display') is None
assert formdata.data.get('bo1_structured') is None
def test_set_backoffice_field_items(two_pubs):
Workflow.wipe()
FormDef.wipe()
wf = Workflow(name='xxx')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
st1 = wf.add_status('Status1')
wf.backoffice_fields_formdef.fields = [
ItemsField(id='bo1', label='1st backoffice field',
type='items', varname='backoffice_item',
items=['a', 'b', 'c']),
]
wf.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': "=['a', 'b']"}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == ['a', 'b']
assert formdata.data['bo1_display'] == 'a, b'
datasource = {'type': 'formula',
'value': repr([('a', 'aa'), ('b', 'bb'), ('c', 'cc')])}
wf.backoffice_fields_formdef.fields = [
ItemsField(id='bo1', label='1st backoffice field',
type='items', varname='backoffice_item',
data_source=datasource),
]
wf.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': "=['a', 'b']"}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == ['a', 'b']
assert formdata.data['bo1_display'] == 'aa, bb'
datasource = {'type': 'formula',
'value': repr([{'id': 'a', 'text': 'aa', 'more': 'aaa'},
{'id': 'b', 'text': 'bb', 'more': 'bbb'},
{'id': 'c', 'text': 'cc', 'more': 'ccc'}])}
wf.backoffice_fields_formdef.fields = [
ItemsField(id='bo1', label='1st backoffice field',
type='items', varname='backoffice_item',
data_source=datasource),
]
wf.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': "=['a', 'c']"}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] == ['a', 'c']
assert formdata.data['bo1_display'] == 'aa, cc'
assert len(formdata.data['bo1_structured']) == 2
assert {'id': 'a', 'more': 'aaa', 'text': 'aa'} in formdata.data['bo1_structured']
assert {'id': 'c', 'more': 'ccc', 'text': 'cc'} in formdata.data['bo1_structured']
def test_set_backoffice_field_date(two_pubs):
Workflow.wipe()
FormDef.wipe()
LoggedError.wipe()
wf = Workflow(name='xxx')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
st1 = wf.add_status('Status1')
wf.backoffice_fields_formdef.fields = [
DateField(id='bo1', label='1st backoffice field',
type='date', varname='backoffice_date'),
]
wf.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {}
formdata.just_created()
formdata.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': "=utils.today()"}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date.today()
formdata.data['bo1'] = None
formdata.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': '{% now "j/n/Y" %}'}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date.today()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': "23/3/2017"}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date(2017, 3, 23)
# invalid values => do nothing
assert LoggedError.count() == 0
for value in ('plop', {}, []):
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': value}]
LoggedError.wipe()
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date(2017, 3, 23)
assert LoggedError.count() == 1
assert LoggedError.select()[0].summary.startswith('Failed to convert')
# None : empty date
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [{'field_id': 'bo1', 'value': None}]
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data['bo1'] is None
def test_set_backoffice_field_immediate_use(http_requests, two_pubs):
Workflow.wipe()
FormDef.wipe()
wf = Workflow(name='xxx')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
StringField(id='bo1', label='1st backoffice field',
type='string', varname='backoffice_blah'),
StringField(id='bo2', label='2nd backoffice field',
type='string', varname='backoffice_barr'),
]
st1 = wf.add_status('Status1')
wf.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
StringField(id='00', label='String', type='string', varname='string'),
]
formdef.workflow_id = wf.id
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'00': 'HELLO'}
formdata.just_created()
formdata.store()
item = SetBackofficeFieldsWorkflowStatusItem()
item.parent = st1
item.fields = [
{'field_id': 'bo1', 'value': '{{form_var_string}}'},
]
two_pubs.substitutions.reset()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data.get('bo1') == 'HELLO'
item.fields = [
{'field_id': 'bo1', 'value': 'WORLD'},
]
two_pubs.substitutions.reset()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data.get('bo1') == 'WORLD'
item.fields = [
{'field_id': 'bo1', 'value': 'X{{form_var_string}}X'},
{'field_id': 'bo2', 'value': "Y{{form_var_backoffice_blah}}Y"},
]
two_pubs.substitutions.reset()
two_pubs.substitutions.feed(formdata)
item.perform(formdata)
formdata = formdef.data_class().get(formdata.id)
assert formdata.data.get('bo1') == 'XHELLOX'
assert formdata.data.get('bo2') == 'YXHELLOXY'
def test_redirect_to_url(pub):
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [
StringField(id='1', label='Test', type='string', varname='foo'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.data = {'1': 'bar'}
item = RedirectToUrlWorkflowStatusItem()
assert item.render_as_line() == 'Web Redirection (not configured)'
item.url = 'https://www.example.net/?foo=[form_var_foo]'
assert item.render_as_line() == 'Web Redirection (to https://www.example.net/?foo=[form_var_foo])'
pub.substitutions.feed(formdata)
assert item.perform(formdata) == 'https://www.example.net/?foo=bar'
item.url = 'https://www.example.net/?django={{ form_var_foo }}'
assert item.render_as_line() == 'Web Redirection (to https://www.example.net/?django={{ form_var_foo }})'
pub.substitutions.feed(formdata)
assert item.perform(formdata) == 'https://www.example.net/?django=bar'
item.url = '[if-any nada]https://www.example.net/[end]'
pub.substitutions.feed(formdata)
assert item.perform(formdata) == None
item.url = '{% if nada %}https://www.example.net/{% endif %}'
pub.substitutions.feed(formdata)
assert item.perform(formdata) == None
def test_workflow_jump_condition_migration(pub):
workflow = Workflow(name='jump condition migration')
st1 = workflow.add_status('Status1', 'st1')
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
st1.items.append(jump)
jump.parent = st1
workflow.store()
reloaded_workflow = Workflow.get(workflow.id)
assert reloaded_workflow.possible_status[0].items[0].condition is None
jump.condition = 'foobar'
workflow.store()
reloaded_workflow = Workflow.get(workflow.id)
assert reloaded_workflow.possible_status[0].items[0].condition == {
'type': 'python', 'value': 'foobar'}
def test_workflow_action_condition(two_pubs):
pub = two_pubs
pub._set_request(None) # to avoid after jobs
workflow = Workflow(name='jump condition migration')
st1 = workflow.add_status('Status1', 'st1')
workflow.store()
role = Role(name='bar1')
role.store()
user = pub.user_class()
user.roles = [role.id]
user.store()
choice = ChoiceWorkflowStatusItem()
choice.by = [role.id]
choice.id = '_x'
st1.items.append(choice)
choice.parent = st1
workflow.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo'),]
formdef.workflow_id = workflow.id
formdef.store()
formdata1 = formdef.data_class()()
formdata1.data = {'1': 'foo'}
formdata1.just_created()
formdata1.store()
formdata2 = formdef.data_class()()
formdata2.data = {'2': 'bar'}
formdata2.just_created()
formdata2.store()
assert formdata1.get_actions_roles() == set([role.id])
assert formdata2.get_actions_roles() == set([role.id])
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 2
choice.condition = {'type': 'python', 'value': 'form_var_foo == "foo"'}
workflow.store()
with pub.substitutions.temporary_feed(formdata1):
assert FormDef.get(formdef.id).data_class().get(formdata1.id).get_actions_roles() == set([role.id])
with pub.substitutions.temporary_feed(formdata2):
assert FormDef.get(formdef.id).data_class().get(formdata2.id).get_actions_roles() == set()
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 1
# check with a formdef condition
choice.condition = {'type': 'python', 'value': 'form_name == "test"'}
workflow.store()
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 0
choice.condition = {'type': 'python', 'value': 'form_name == "baz"'}
workflow.store()
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 2
# bad condition
LoggedError.wipe()
choice.condition = {'type': 'python', 'value': 'foobar == barfoo'}
workflow.store()
assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 0
assert LoggedError.count() == 1
logged_error = LoggedError.select()[0]
assert logged_error.occurences_count > 1 # should be 2... == 12 with pickle, 4 with sql
assert logged_error.summary == 'Failed to evaluate condition'
assert logged_error.exception_class == 'NameError'
assert logged_error.exception_message == "name 'foobar' is not defined"
assert logged_error.expression == 'foobar == barfoo'
assert logged_error.expression_type == 'python'
def test_notifications(pub, http_requests):
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
assert not SendNotificationWorkflowStatusItem.is_available()
if not pub.site_options.has_section('variables'):
pub.site_options.add_section('variables')
pub.site_options.set('variables', 'portal_url', 'https://portal/')
assert SendNotificationWorkflowStatusItem.is_available()
item = SendNotificationWorkflowStatusItem()
assert item.to == ['_submitter']
item.title = 'xxx'
item.body = 'XXX'
# no user
http_requests.empty()
item.perform(formdata)
assert http_requests.count() == 0
# user
http_requests.empty()
user = pub.user_class()
user.name_identifiers = ['xxx']
user.store()
formdata.user_id = user.id
formdata.store()
item.perform(formdata)
assert http_requests.count() == 1
assert http_requests.get_last('url') == 'https://portal/api/notification/add/?NameID=xxx'
assert json.loads(http_requests.get_last('body')) == {
'body': 'XXX',
'url': formdata.get_url(),
'id': 'formdata:%s' % formdata.get_display_id(),
'origin': '',
'summary': 'xxx'
}
# roles (not exposed in current UI)
http_requests.empty()
role = Role(name='blah')
role.store()
user1 = pub.user_class()
user1.roles = [role.id]
user1.name_identifiers = ['xxy1']
user1.store()
user2 = pub.user_class()
user2.roles = [role.id]
user2.name_identifiers = ['xxy2']
user2.store()
formdef.workflow_roles = {'_receiver': role.id}
item.to = ['_receiver']
item.perform(formdata)
assert http_requests.count() == 2
assert set(x['url'] for x in http_requests.requests) == set([
'https://portal/api/notification/add/?NameID=xxy1',
'https://portal/api/notification/add/?NameID=xxy2'])
def test_workflow_field_migration(pub):
Workflow.wipe()
wf = Workflow(name='wf with backoffice field')
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
wf.backoffice_fields_formdef.fields = [
StringField(id='bo1', label='bo field 1', type='string', in_listing=True),
]
wf.add_status('Status1')
wf.store()
wf = Workflow.get(wf.id)
assert wf.backoffice_fields_formdef.fields[0].display_locations == ['validation', 'summary', 'listings']