4239 lines
146 KiB
Python
4239 lines
146 KiB
Python
import json
|
|
import datetime
|
|
import os
|
|
import pytest
|
|
import shutil
|
|
import time
|
|
import zipfile
|
|
|
|
import mock
|
|
|
|
from django.utils import six
|
|
from django.utils.encoding import force_bytes
|
|
from django.utils.six import BytesIO, StringIO
|
|
from django.utils.six.moves.urllib import parse as urlparse
|
|
|
|
from quixote import cleanup, get_response
|
|
from wcs.qommon.errors import ConnectionError
|
|
from quixote.http_request import Upload as QuixoteUpload
|
|
from wcs.qommon.http_request import HTTPRequest
|
|
from wcs.qommon.form import *
|
|
|
|
from wcs.formdef import FormDef
|
|
from wcs import sessions
|
|
from wcs.fields import (StringField, DateField, MapField, FileField, ItemField,
|
|
ItemsField, CommentField)
|
|
from wcs.logged_errors import LoggedError
|
|
from wcs.roles import Role
|
|
from wcs.workflows import (Workflow, WorkflowStatusItem,
|
|
SendmailWorkflowStatusItem, SendSMSWorkflowStatusItem,
|
|
CommentableWorkflowStatusItem, ChoiceWorkflowStatusItem,
|
|
DisplayMessageWorkflowStatusItem,
|
|
AbortActionException, WorkflowCriticalityLevel,
|
|
AttachmentEvolutionPart, WorkflowBackofficeFieldsFormDef)
|
|
from wcs.wf.anonymise import AnonymiseWorkflowStatusItem
|
|
from wcs.wf.criticality import ModifyCriticalityWorkflowStatusItem, MODE_INC, MODE_DEC, MODE_SET
|
|
from wcs.wf.dispatch import DispatchWorkflowStatusItem
|
|
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
|
|
from wcs.wf.jump import JumpWorkflowStatusItem, _apply_timeouts
|
|
from wcs.wf.timeout_jump import TimeoutWorkflowStatusItem
|
|
from wcs.wf.profile import UpdateUserProfileStatusItem
|
|
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem, JournalEvolutionPart
|
|
from wcs.wf.remove import RemoveWorkflowStatusItem
|
|
from wcs.wf.roles import AddRoleWorkflowStatusItem, RemoveRoleWorkflowStatusItem
|
|
from wcs.wf.wscall import WebserviceCallStatusItem
|
|
from wcs.wf.export_to_model import ExportToModel, transform_to_pdf
|
|
from wcs.wf.geolocate import GeolocateWorkflowStatusItem
|
|
from wcs.wf.backoffice_fields import SetBackofficeFieldsWorkflowStatusItem
|
|
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
|
|
from wcs.wf.notification import SendNotificationWorkflowStatusItem
|
|
|
|
from utilities import (create_temporary_pub, MockSubstitutionVariables,
|
|
clean_temporary_pub)
|
|
|
|
|
|
def setup_module(module):
    """Call quixote's ``cleanup()`` before the tests of this module run."""
    cleanup()
|
|
|
|
|
|
def teardown_module(module):
    """Dispose of the temporary publisher after the tests of this module ran."""
    clean_temporary_pub()
|
|
|
|
|
|
def pytest_generate_tests(metafunc):
    """Parametrize tests requesting ``two_pubs`` over both storage modes.

    Tests that do not use the ``two_pubs`` fixture are left untouched; the
    others are generated once for ``'pickle'`` and once for ``'sql'``, the
    value being handed to the fixture itself (``indirect=True``).
    """
    if 'two_pubs' not in metafunc.fixturenames:
        return
    metafunc.parametrize('two_pubs', ['pickle', 'sql'], indirect=True)
|
|
|
|
|
|
@pytest.fixture
def pub(request):
    """Provide a temporary publisher configured in English with a request.

    A request for ``example.net`` carrying no user and a basic session
    (id 1) is attached to the publisher before it is returned.
    """
    publisher = create_temporary_pub()
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()

    environ = {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''}
    http_request = HTTPRequest(None, environ)
    http_request.response.filter = {}
    http_request._user = None
    publisher._set_request(http_request)
    # the session is attached after the request is registered, as before
    http_request.session = sessions.BasicSession(id=1)
    publisher.set_config(http_request)
    return publisher
|
|
|
|
|
|
@pytest.fixture
def two_pubs(request):
    """Provide a temporary publisher for the requested storage mode.

    ``request.param`` is supplied through indirect parametrization; SQL mode
    is enabled when it equals ``'sql'``. The publisher is configured in
    English and gets a request for ``example.net`` with no user and a basic
    session (id 1).
    """
    use_sql = (request.param == 'sql')
    publisher = create_temporary_pub(sql_mode=use_sql)
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()

    environ = {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''}
    http_request = HTTPRequest(None, environ)
    http_request.response.filter = {}
    http_request._user = None
    publisher._set_request(http_request)
    # the session is attached after the request is registered, as before
    http_request.session = sessions.BasicSession(id=1)
    publisher.set_config(http_request)
    return publisher
|
|
|
|
|
|
def test_get_json_export_dict(pub):
    """Check the structure returned by ``Workflow.get_json_export_dict()``."""
    workflow = Workflow(name='wf')
    first_status = workflow.add_status('Status1', 'st1')
    second_status = workflow.add_status('Status2', 'st2')
    second_status.forced_endpoint = True

    # a jump leaves the first status, the second one has no action at all
    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = ['_submitter', '_receiver']
    jump.timeout = 0.1
    jump.status = 'st2'
    first_status.items.append(jump)
    jump.parent = first_status

    workflow.roles['_other'] = 'Other Function'
    exported = workflow.get_json_export_dict()
    assert set(exported.keys()) >= {'statuses', 'name', 'functions'}

    assert exported['name'] == 'wf'
    statuses = exported['statuses']
    assert len(statuses) == 2
    assert {st['id'] for st in statuses} == {'st1', 'st2'}
    required_keys = {'id', 'name', 'forced_endpoint'}
    assert all(set(status.keys()) >= required_keys for status in statuses)

    # (id, name, value expected for both forced_endpoint and endpoint)
    expected_statuses = [
        ('st1', 'Status1', False),
        ('st2', 'Status2', True),
    ]
    for status, (status_id, status_name, is_endpoint) in zip(statuses, expected_statuses):
        assert status['id'] == status_id
        assert status['name'] == status_name
        assert status['forced_endpoint'] is is_endpoint
        assert status['endpoint'] is is_endpoint
|
|
|
|
|
|
def test_variable_compute(pub):
    """Check ``compute()`` on plain strings, Django, ezt and Python expressions.

    Covers rendering, the ``render=False`` pass-through, error handling with
    and without ``raises=True``, extra ``context`` variables and the escaping
    behaviour of Django templates.
    """
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo')]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'1': 'hello'}
    formdata.store()
    pub.substitutions.feed(formdata)

    item = JumpWorkflowStatusItem()

    # straight string
    assert item.compute('blah') == 'blah'

    # django template
    assert item.compute('{{ form_var_foo }}') == 'hello'
    assert item.compute('{{ form_var_foo }}', render=False) == '{{ form_var_foo }}'
    assert item.compute('{% if form_var_foo %}its here{% endif %}') == 'its here'
    # a broken template comes back untouched unless raises=True is given
    assert item.compute('{% if form_var_foo %}') == '{% if form_var_foo %}'
    with pytest.raises(Exception):
        item.compute('{% if form_var_foo %}', raises=True)

    # ezt string
    assert item.compute('[form_var_foo]') == 'hello'
    # ezt string, but not ezt asked
    assert item.compute('[form_var_foo]', render=False) == '[form_var_foo]'
    # ezt string, with an error
    assert item.compute('[end]', raises=False) == '[end]'
    with pytest.raises(Exception):
        item.compute('[end]', raises=True)

    # python expression
    assert item.compute('=form_var_foo') == 'hello'
    # python expression, with an error
    assert item.compute('=1/0', raises=False) == '=1/0'
    with pytest.raises(Exception):
        item.compute('=1/0', raises=True)

    # with context
    extra = {'bar': 'world'}
    assert item.compute('{{ form_var_foo }} {{ bar }}', context=extra) == 'hello world'
    assert item.compute('[form_var_foo] [bar]', context=extra) == 'hello world'
    assert item.compute('=form_var_foo + " " + bar', context=extra) == 'hello world'

    # django wins
    assert item.compute('{{ form_var_foo }} [bar]', context=extra) == 'hello [bar]'

    # django template, no escaping by default
    formdata.data = {'1': '<b>hello</b>'}
    formdata.store()
    assert item.compute('{{ form_var_foo }}') == '<b>hello</b>'  # autoescape off by default
    assert item.compute('{{ form_var_foo|safe }}') == '<b>hello</b>'  # no escaping (implicit |safe)
    # an explicit |escape must yield HTML entities; the expected value had
    # lost them and was indistinguishable from the unescaped cases above
    assert item.compute('{{ form_var_foo|escape }}') == '&lt;b&gt;hello&lt;/b&gt;'  # escaping
|
|
|
|
|
|
def test_variable_compute_dates(pub):
    """Check the date helpers available to Python expressions in ``compute()``."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo')]
    formdef.store()

    # the date is stored as a plain string in a string field
    formdata = formdef.data_class()()
    formdata.data = {'1': '2017-07-17'}
    formdata.store()
    pub.substitutions.feed(formdata)

    item = JumpWorkflowStatusItem()

    expected_by_expression = [
        ('=date(form_var_foo)', datetime.date(2017, 7, 17)),
        ('=date(form_var_foo) + days(1)', datetime.date(2017, 7, 18)),
        ('=date(2017, 7, 18)', datetime.date(2017, 7, 18)),
    ]
    for expression, expected in expected_by_expression:
        assert item.compute(expression) == expected
|
|
|
|
|
|
def test_jump_nothing(pub):
    """A jump without any condition must always be taken."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()

    formdata = formdef.data_class()()
    unconditional_jump = JumpWorkflowStatusItem()
    assert unconditional_jump.must_jump(formdata) is True
|
|
|
|
|
|
def test_jump_datetime_condition(pub):
    """Check Python conditions comparing the current time to a fixed date."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()
    formdata = formdef.data_class()()
    item = JumpWorkflowStatusItem()

    # filled with the (year, month, day) of the reference day
    expression = 'datetime.datetime.now() > datetime.datetime(%s, %s, %s)'
    one_day = datetime.timedelta(days=1)

    yesterday = datetime.datetime.now() - one_day
    item.condition = {'type': 'python', 'value': expression % yesterday.timetuple()[:3]}
    assert item.must_jump(formdata) is True

    tomorrow = datetime.datetime.now() + one_day
    item.condition = {'type': 'python', 'value': expression % tomorrow.timetuple()[:3]}
    assert item.must_jump(formdata) is False
|
|
|
|
|
|
def test_jump_date_conditions(pub):
    """Check Python conditions relying on the ``utils`` date helpers."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = [DateField(id='2', label='Date', type='date', varname='date')]
    formdef.store()

    # create/store/get, to make sure the date format is acceptable
    formdata = formdef.data_class()()
    formdata.data = {'2': DateField().convert_value_from_str('2015-01-04')}
    formdata.store()
    formdata = formdef.data_class().get(formdata.id)

    pub.substitutions.feed(formdata)

    # every expression is expected to evaluate to a true value
    expressions = (
        'utils.make_date(form_var_date) == utils.make_date("2015-01-04")',
        'utils.time_delta(form_var_date, "2015-01-04").days == 0',
        'utils.time_delta(utils.today(), "2015-01-04").days > 0',
        'utils.time_delta(datetime.datetime.now(), "2015-01-04").days > 0',
        'utils.time_delta(utils.time.localtime(), "2015-01-04").days > 0',
    )
    for expression in expressions:
        # a fresh action is used for each condition, as in the original test
        item = JumpWorkflowStatusItem()
        item.condition = {'type': 'python', 'value': expression}
        assert item.must_jump(formdata) is True
|
|
|
|
|
|
def test_jump_count_condition(pub):
    """Check a Python condition based on ``form_objects.count``."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()
    pub.substitutions.feed(formdef)

    data_class = formdef.data_class()
    data_class.wipe()
    formdata = data_class()

    few_forms = {'type': 'python', 'value': 'form_objects.count < 2'}
    item = JumpWorkflowStatusItem()
    item.condition = few_forms
    # nothing is stored yet
    assert item.must_jump(formdata) is True

    for _ in range(10):
        formdata = formdef.data_class()()
        formdata.store()

    # ten forms are now stored, the same condition must fail
    item.condition = {'type': 'python', 'value': 'form_objects.count < 2'}
    assert item.must_jump(formdata) is False
|
|
|
|
|
|
def test_jump_bad_python_condition(pub):
    """Failing Python conditions must not jump and must log a single error."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()
    pub.substitutions.feed(formdef)
    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    item = JumpWorkflowStatusItem()

    # (expression, expected exception class name, expected exception message)
    failing_conditions = [
        ('form_var_foobar == 0', 'NameError',
         "name 'form_var_foobar' is not defined"),
        ('~ invalid ~', 'SyntaxError',
         'unexpected EOF while parsing (<string>, line 1)'),
    ]
    for expression, exception_class, exception_message in failing_conditions:
        LoggedError.wipe()
        item.condition = {'type': 'python', 'value': expression}
        assert item.must_jump(formdata) is False
        assert LoggedError.count() == 1
        logged_error = LoggedError.select()[0]
        assert logged_error.summary == 'Failed to evaluate condition'
        assert logged_error.exception_class == exception_class
        assert logged_error.exception_message == exception_message
        assert logged_error.expression == expression
        assert logged_error.expression_type == 'python'
|
|
|
|
|
|
def test_jump_django_conditions(pub):
    """Check Django conditions on jumps, including an unparsable one."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo')]
    formdef.store()
    formdata = formdef.data_class()()
    formdata.data = {'1': 'hello'}
    pub.substitutions.feed(formdata)
    item = JumpWorkflowStatusItem()

    LoggedError.wipe()
    valid_conditions = [
        ('1 < 2', True),
        ('form_var_foo == "hello"', True),
        ('form_var_foo|first|upper == "H"', True),
        ('form_var_foo|first|upper == "X"', False),
    ]
    for expression, expected in valid_conditions:
        item.condition = {'type': 'django', 'value': expression}
        assert item.must_jump(formdata) is expected

    # valid conditions never log anything, whatever their result
    assert LoggedError.count() == 0

    broken = '~ invalid ~'
    item.condition = {'type': 'django', 'value': broken}
    assert item.must_jump(formdata) is False
    assert LoggedError.count() == 1
    logged_error = LoggedError.select()[0]
    assert logged_error.summary == 'Failed to evaluate condition'
    assert logged_error.exception_class == 'TemplateSyntaxError'
    assert logged_error.exception_message == "Could not parse the remainder: '~' from '~'"
    assert logged_error.expression == broken
    assert logged_error.expression_type == 'django'
|
|
|
|
|
|
def test_check_auth(pub):
    """Check ``check_auth()`` against the different kinds of ``by`` entries."""
    user = pub.user_class(name='foo')
    user.store()

    role = Role(name='bar1')
    role.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    formdata = formdef.data_class()()

    action = WorkflowStatusItem()
    # the action is allowed as long as "by" was never set
    assert action.check_auth(formdata, user) is True

    action.by = []
    assert action.check_auth(formdata, user) is False

    action.by = ['logged-users']
    assert action.check_auth(formdata, user) is True

    # the role is referenced both with its own id and with an integer id
    role_references = (role.id, int(role.id))
    for role_reference in role_references:
        action.by = [role_reference]
        assert action.check_auth(formdata, user) is False

    user.roles = [role.id]
    for role_reference in role_references:
        action.by = [role_reference]
        assert action.check_auth(formdata, user) is True

    action.by = ['_submitter']
    assert action.check_auth(formdata, user) is False
    formdata.user_id = user.id
    assert action.check_auth(formdata, user) is True
    formdata.user_id = None

    action.by = ['_receiver']
    assert action.check_auth(formdata, user) is False
    # function set on the form data...
    formdata.workflow_roles = {'_receiver': user.id}
    assert action.check_auth(formdata, user) is True
    # ...then on the form definition only
    formdef.workflow_roles = {'_receiver': user.id}
    formdata.workflow_roles = None
    assert action.check_auth(formdata, user) is True
|
|
|
|
|
|
def test_dispatch(pub):
    """Check the dispatch action with a fixed role."""
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    Role.wipe()
    role = Role(name='xxx')
    role.store()

    dispatch = DispatchWorkflowStatusItem()

    # unconfigured action: no function gets assigned
    unassigned = formdef.data_class()()
    dispatch.perform(unassigned)
    assert not unassigned.workflow_roles

    assigned = formdef.data_class()()
    dispatch.role_key = '_receiver'
    dispatch.role_id = role.id
    dispatch.perform(assigned)
    assert assigned.workflow_roles == {'_receiver': role.id}
|
|
|
|
|
|
def test_dispatch_auto(pub):
    """Check the automatic dispatch, driven by a variable and a set of rules.

    The variable is given both as a bare name and as a Django template.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='1', label='Test', type='string', varname='foo'),
    ]
    formdef.store()

    item = DispatchWorkflowStatusItem()
    item.role_key = '_receiver'
    item.dispatch_type = 'automatic'

    formdata = formdef.data_class()()

    def refresh_and_perform():
        # feed the current form data again, then run the action
        pub.substitutions.reset()
        pub.substitutions.feed(formdata)
        item.perform(formdata)

    # neither variable nor rules yet
    refresh_and_perform()
    assert not formdata.workflow_roles

    Role.wipe()
    role1 = Role('xxx1')
    role1.store()
    role2 = Role('xxx2')
    role2.store()

    for variable in ('form_var_foo', '{{form_var_foo}}'):
        formdata.data = {}
        formdata.workflow_roles = {}
        item.variable = variable
        item.rules = [
            {'role_id': role1.id, 'value': 'foo'},
            {'role_id': role2.id, 'value': 'bar'},
        ]

        refresh_and_perform()
        assert not formdata.workflow_roles

        # no match
        formdata.data = {'1': 'XXX'}
        refresh_and_perform()
        assert not formdata.workflow_roles

        # match
        formdata.data = {'1': 'foo'}
        refresh_and_perform()
        assert formdata.workflow_roles == {'_receiver': role1.id}

        # other match
        formdata.data = {'1': 'bar'}
        refresh_and_perform()
        assert formdata.workflow_roles == {'_receiver': role2.id}
|
|
|
|
|
|
def test_dispatch_computed(pub, caplog):
    """Check the dispatch action when the role is given as an expression."""
    pub.cfg['debug'] = {'logger': True}
    pub.write_cfg()
    pub.get_app_logger(force=True)  # force new logger

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    Role.wipe()
    role = Role(name='xxx')
    role.slug = 'yyy'
    role.store()

    item = DispatchWorkflowStatusItem()

    formdata = formdef.data_class()()
    item.perform(formdata)
    assert not formdata.workflow_roles

    matching_expressions = (
        '="yyy"',  # slug
        '="xxx"',  # name
    )
    for role_expression in matching_expressions:
        formdata = formdef.data_class()()
        item.role_key = '_receiver'
        item.role_id = role_expression
        item.perform(formdata)
        assert formdata.workflow_roles == {'_receiver': role.id}

    # unknown role
    formdata = formdef.data_class()()
    item.role_key = '_receiver'
    item.role_id = '="foobar"'
    item.perform(formdata)
    assert not formdata.workflow_roles
    last_record = caplog.records[-1]
    assert last_record.message == 'error in dispatch, missing role (="foobar")'
|
|
|
|
|
|
def test_roles(pub):
    """Check the add-role and remove-role actions with literal role ids."""
    user = pub.user_class()
    user.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    formdata = formdef.data_class()()
    formdata.user_id = user.id

    def stored_roles():
        # always read the user back from storage
        return pub.user_class.get(user.id).roles

    add_role = AddRoleWorkflowStatusItem()

    # no role configured on the action
    add_role.perform(formdata)
    assert not stored_roles()

    add_role.role_id = '1'
    add_role.perform(formdata)
    assert stored_roles() == ['1']

    user.roles = None
    user.store()
    remove_role = RemoveRoleWorkflowStatusItem()

    # no role configured on the action
    remove_role.perform(formdata)
    assert not stored_roles()

    # the user has no role to remove
    remove_role.role_id = '1'
    remove_role.perform(formdata)
    assert not stored_roles()

    user.roles = ['1']
    user.store()
    remove_role.perform(formdata)
    assert not stored_roles()

    # other roles are kept
    user.roles = ['2', '1']
    user.store()
    remove_role.perform(formdata)
    assert stored_roles() == ['2']
|
|
|
|
|
|
def test_add_remove_computed_roles(pub):
    """Check the add-role and remove-role actions given a role name."""
    user = pub.user_class()
    user.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    formdata = formdef.data_class()()
    formdata.user_id = user.id

    role = Role(name='plop')
    role.store()
    other_role = Role(name='xxx')
    other_role.store()

    def stored_roles():
        # always read the user back from storage
        return pub.user_class.get(user.id).roles

    add_role = AddRoleWorkflowStatusItem()

    # no role configured on the action
    add_role.perform(formdata)
    assert not stored_roles()

    # the action is given the role name, the user ends up with the role id
    add_role.role_id = role.name
    add_role.perform(formdata)
    assert stored_roles() == [role.id]

    user.roles = None
    user.store()
    remove_role = RemoveRoleWorkflowStatusItem()

    # no role configured on the action
    remove_role.perform(formdata)
    assert not stored_roles()

    # the user has no role to remove
    remove_role.role_id = role.name
    remove_role.perform(formdata)
    assert not stored_roles()

    user.roles = [role.id]
    user.store()
    remove_role.perform(formdata)
    assert not stored_roles()

    # other roles are kept
    user.roles = [other_role.id, role.id]
    user.store()
    remove_role.perform(formdata)
    assert stored_roles() == [other_role.id]
|
|
|
|
|
|
def test_roles_idp(pub):
    """Check role changes are forwarded to the identity provider.

    The HTTP helpers of ``wcs.wf.roles`` are patched; the test checks how
    often they are called, and with which URL, once the after jobs of the
    response are processed.
    """
    pub.cfg['sp'] = {'idp-manage-user-attributes': True}
    pub.cfg['idp'] = {'xxx': {'metadata_url': 'http://idp.example.net/idp/saml2/metadata'}}
    pub.write_cfg()
    user = pub.user_class()
    user.name_identifiers = ['xxx']
    user.store()

    role = Role(name='bar1')
    role.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    formdata = formdef.data_class()()
    formdata.user_id = user.id

    def stored_roles():
        # always read the user back from storage
        return pub.user_class.get(user.id).roles

    membership_url = 'http://idp.example.net/api/roles/bar1/members/xxx/'

    add_role = AddRoleWorkflowStatusItem()

    # no role configured: nothing changes and nothing is sent
    add_role.perform(formdata)
    assert not stored_roles()
    with mock.patch('wcs.wf.roles.http_post_request') as http_post_request:
        http_post_request.return_value = (None, 201, '', None)
        get_response().process_after_jobs()
        assert http_post_request.call_count == 0

    add_role.role_id = role.id
    add_role.perform(formdata)
    assert stored_roles() == [role.id]
    with mock.patch('wcs.wf.roles.http_post_request') as http_post_request:
        http_post_request.return_value = (None, 201, '', None)
        get_response().process_after_jobs()
        assert http_post_request.call_count == 1
        called_url = http_post_request.call_args[0][0]
        assert called_url.startswith(membership_url)
        assert 'signature=' in http_post_request.call_args[0][0]

    user.roles = None
    user.store()

    remove_role = RemoveRoleWorkflowStatusItem()

    # no role configured: nothing changes and nothing is sent
    remove_role.perform(formdata)
    assert not stored_roles()
    with mock.patch('wcs.wf.roles.http_delete_request') as http_delete_request:
        http_delete_request.return_value = (None, 200, '', None)
        get_response().process_after_jobs()
        assert http_delete_request.call_count == 0

    remove_role.role_id = role.id
    user.roles = [role.id]
    user.store()
    remove_role.perform(formdata)
    assert not stored_roles()
    with mock.patch('wcs.wf.roles.http_delete_request') as http_delete_request:
        http_delete_request.return_value = (None, 200, '', None)
        get_response().process_after_jobs()
        assert http_delete_request.call_count == 1
        called_url = http_delete_request.call_args[0][0]
        assert called_url.startswith(membership_url)
        assert 'signature=' in http_delete_request.call_args[0][0]

    # out of http request/response cycle
    pub._set_request(None)
    with mock.patch('wcs.wf.roles.http_post_request') as http_post_request:
        http_post_request.return_value = (None, 201, '', None)
        add_role.perform(formdata)
        assert stored_roles() == [role.id]

    with mock.patch('wcs.wf.roles.http_delete_request') as http_delete_request:
        http_delete_request.return_value = (None, 200, '', None)
        remove_role.perform(formdata)
        assert stored_roles() == []
|
|
|
|
|
|
def test_anonymise(two_pubs):
    """Check what the anonymise action clears and what it keeps."""
    # build a backoffice field
    Workflow.wipe()
    workflow = Workflow(name='wf with backoffice field')
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    workflow.backoffice_fields_formdef.fields = [
        StringField(id='bo1', label='bo field 1', type='string'),
        ItemField(id='bo2', label='list', type='item', items=['bofoo', 'bobar']),
    ]
    workflow.add_status('Status1')
    workflow.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='1', label='field 1', type='string'),
        ItemField(id='2', label='list', type='item', items=['abc', 'def']),
    ]
    formdef.workflow_id = workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.user_id = '1'
    formdata.data = {
        '1': 'foo',
        '2': 'abc', '2_display': 'abc',
        'bo1': 'bar',
        'bo2': 'foo', 'bo2_display': 'foo',
    }
    formdata.workflow_data = {'e': 'mc2'}
    formdata.submission_context = {'foo': 'bar'}
    formdata.store()

    AnonymiseWorkflowStatusItem().perform(formdata)

    def reloaded():
        # every check reads the form data back from storage
        return formdef.data_class().get(formdata.id)

    assert reloaded().user_id is None
    assert reloaded().anonymised
    assert reloaded().submission_context is None
    # string fields are emptied, item fields keep their value
    assert reloaded().data == {
        '1': None,
        '2': 'abc', '2_display': 'abc',
        'bo1': None,
        'bo2': 'foo', 'bo2_display': 'foo',
    }
    assert reloaded().workflow_data is None
    assert reloaded().evolution[0].who is None
|
|
|
|
|
|
def test_remove(pub):
    """Check the remove action deletes the form and returns a redirect URL."""
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()
    data_class = formdef.data_class

    # default request: the action returns the site URL
    formdata = data_class()()
    formdata.store()

    item = RemoveWorkflowStatusItem()
    assert data_class().has_key(formdata.id)
    assert item.perform(formdata) == 'http://example.net'
    assert not data_class().has_key(formdata.id)

    # request flagged as backoffice: the action returns a relative URL
    formdata = data_class()()
    formdata.store()

    item = RemoveWorkflowStatusItem()
    request = pub.get_request()
    request.response.filter['in_backoffice'] = True
    assert data_class().has_key(formdata.id)
    assert item.perform(formdata) == '..'
    assert not data_class().has_key(formdata.id)
    request.response.filter = {}
    assert request.session.message
|
|
|
|
|
|
def test_register_comment(pub):
    """Check how the register comment action renders its comment.

    Plain text, raw HTML, Django templates and ezt templates are tried in
    turn; after each run the last display part of the last evolution is
    compared to the expected markup.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    item = RegisterCommenterWorkflowStatusItem()

    def last_display_part():
        # _display_parts is reset before reading (presumably a render cache,
        # to be confirmed) so the latest comment is taken into account
        formdata.evolution[-1]._display_parts = None
        return formdata.evolution[-1].display_parts()[-1]

    # no comment configured
    item.perform(formdata)
    assert last_display_part() == ''

    # Django variables are HTML-escaped unless |safe is applied; the expected
    # values of the non-|safe cases had lost their entities, which made them
    # identical to the |safe ones.
    # NOTE(review): the two ezt "[foo]" expectations are kept as found; they
    # may have lost a "&lt;" the same way -- confirm against the ezt renderer.
    cases = [
        ('Hello world', '<p>Hello world</p>'),
        ('<div>Hello world</div>', '<div>Hello world</div>'),
        ('{{ test }}', ''),
        ('[test]', '<p>[test]</p>'),
        ('{{ bar }}', '<div>Foobar</div>'),
        ('[bar]', '<p>Foobar</p>'),
        ('<p>{{ foo }}</p>', '<p>1 &lt; 3</p>'),
        ('<p>{{ foo|safe }}</p>', '<p>1 < 3</p>'),
        ('{{ foo }}', '<div>1 &lt; 3</div>'),
        ('{{ foo|safe }}', '<div>1 < 3</div>'),
        ('[foo]', '<p>1 < 3</p>'),
        ('<div>{{ foo }}</div>', '<div>1 &lt; 3</div>'),
        ('<div>[foo]</div>', '<div>1 < 3</div>'),
    ]
    for comment, expected in cases:
        item.comment = comment
        item.perform(formdata)
        assert last_display_part() == expected
|
|
|
|
|
|
def test_register_comment_django_escaping(pub, emails):
    """Check form values are HTML-escaped in Django comments unless ``|safe``."""
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo')]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.data = {'1': '<p>hello</p>'}
    formdata.store()
    pub.substitutions.feed(formdata)

    # plain variable: the markup typed by the user must come out escaped (the
    # expected value had lost its entities and matched the |safe case below)
    item = RegisterCommenterWorkflowStatusItem()
    item.comment = '<div>{{form_var_foo}}</div>'
    item.perform(formdata)
    formdata.evolution[-1]._display_parts = None
    assert formdata.evolution[-1].display_parts()[-1] == '<div>&lt;p&gt;hello&lt;/p&gt;</div>'

    # |safe
    item = RegisterCommenterWorkflowStatusItem()
    item.comment = '<div>{{form_var_foo|safe}}</div>'
    item.perform(formdata)
    formdata.evolution[-1]._display_parts = None
    assert formdata.evolution[-1].display_parts()[-1] == '<div><p>hello</p></div>'
|
|
|
|
|
|
def test_register_comment_attachment(pub):
    """Check comments can refer to an attachment stored on the form.

    The attachment is reached through both ``attachments`` and
    ``form_attachments``, from Django templates, ezt templates and action
    conditions.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    item = RegisterCommenterWorkflowStatusItem()
    item.perform(formdata)
    formdata.evolution[-1]._display_parts = None
    assert formdata.evolution[-1].display_parts()[-1] == ''

    def attachments_dir():
        return os.path.join(get_publisher().app_dir, 'attachments')

    def check_single_attachment_on_disk():
        # a single 4-character subdirectory holding a single file
        assert len(os.listdir(attachments_dir())) == 1
        for subdir in os.listdir(attachments_dir()):
            assert len(subdir) == 4
            assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments', subdir))) == 1

    def last_comment_after(comment):
        # run the action with the given comment, return the new part content
        item.comment = comment
        item.perform(formdata)
        return formdata.evolution[-1].parts[-1].content

    # start from an empty attachments directory
    if os.path.exists(attachments_dir()):
        shutil.rmtree(attachments_dir())

    attachment = AttachmentEvolutionPart(
        'hello.txt', fp=BytesIO(b'hello world'), varname='testfile')
    formdata.evolution[-1].parts = [attachment]
    formdata.store()
    check_single_attachment_on_disk()

    item.comment = '{{ attachments.testfile.url }}'

    pub.substitutions.feed(formdata)
    item.perform(formdata)
    url1 = formdata.evolution[-1].parts[-1].content

    pub.substitutions.feed(formdata)
    url2 = last_comment_after('{{ form_attachments.testfile.url }}')

    # registering comments did not add any file
    check_single_attachment_on_disk()
    assert url1 == url2

    # test with a condition
    content = last_comment_after('{% if form_attachments.testfile %}file is there{% endif %}')
    assert content == '<div>file is there</div>'
    content = last_comment_after('{% if form_attachments.nope %}file is there{% endif %}')
    assert content == ''

    # test with an action condition
    item.condition = {'type': 'django', 'value': 'form_attachments.testfile'}
    assert item.check_condition(formdata) is True

    item.condition = {'type': 'django', 'value': 'form_attachments.missing'}
    assert item.check_condition(formdata) is False

    # same checks with ezt templates
    pub.substitutions.feed(formdata)
    url3 = last_comment_after('[attachments.testfile.url]')
    pub.substitutions.feed(formdata)
    url4 = last_comment_after('[form_attachments.testfile.url]')
    assert url3 == url4
|
|
|
|
|
|
def test_register_comment_with_attachment_file(pub):
    """Check that the comment action stores its attachments in the history.

    Two ways of giving the attachment are exercised: the raw value of a
    backoffice file field, then a Python expression collecting
    ``<prefix>_filename`` / ``_content_type`` / ``_b64_content`` entries
    from the workflow data.
    """
    def attachments_dir():
        return os.path.join(get_publisher().app_dir, 'attachments')

    def reset_attachments_dir():
        # start from an empty state so the counts checked afterwards are exact
        if os.path.exists(attachments_dir()):
            shutil.rmtree(attachments_dir())

    def assert_single_attachment_stored():
        # exactly one subdirectory, with a 4-character name, holding one file
        subdirs = os.listdir(attachments_dir())
        assert len(subdirs) == 1
        for subdir in subdirs:
            assert len(subdir) == 4
            assert len(os.listdir(os.path.join(attachments_dir(), subdir))) == 1

    def assert_attachment_and_comment(expected_filename):
        # the evolution gets the attachment first, then the comment itself
        parts = formdata.evolution[-1].parts
        assert len(parts) == 2
        assert isinstance(parts[0], AttachmentEvolutionPart)
        assert parts[0].orig_filename == expected_filename

        assert isinstance(parts[1], JournalEvolutionPart)
        assert len(parts[1].content) > 0
        comment_view = str(parts[1].view())
        assert comment_view == '<p>%s</p>' % comment_text

    wf = Workflow(name='comment with attachments')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef.fields = [
        FileField(id='bo1', label='bo field 1', type='file', varname='backoffice_file1'),
    ]
    st1 = wf.add_status('Status1')
    wf.store()

    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    # context manager so the image file is closed as soon as it has been read
    with open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg'), 'rb') as fd:
        upload.receive([fd.read()])

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        FileField(id='1', label='File', type='file', varname='frontoffice_file'),
    ]
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'1': upload}
    formdata.just_created()
    formdata.store()

    pub.substitutions.feed(formdata)

    # copy the uploaded file into the backoffice field
    setbo = SetBackofficeFieldsWorkflowStatusItem()
    setbo.parent = st1
    setbo.fields = [{'field_id': 'bo1', 'value': '=form_var_frontoffice_file_raw'}]
    setbo.perform(formdata)

    reset_attachments_dir()

    comment_text = 'File is attached to the form history'

    # attachment given as the raw value of the backoffice file field
    item = RegisterCommenterWorkflowStatusItem()
    item.attachments = ['form_var_backoffice_file1_raw']
    item.comment = comment_text
    item.perform(formdata)

    assert_single_attachment_stored()
    assert_attachment_and_comment(upload.orig_filename)

    reset_attachments_dir()

    formdata.evolution[-1].parts = []
    formdata.store()

    # attachment built from workflow data entries sharing a common prefix
    ws_response_varname = 'ws_response_afile'
    wf_data = {
        '%s_filename' % ws_response_varname: 'hello.txt',
        '%s_content_type' % ws_response_varname: 'text/plain',
        # b64encode is available on every Python version, unlike
        # encodestring which was removed in Python 3.9
        '%s_b64_content' % ws_response_varname: base64.b64encode(b'hello world'),
    }
    formdata.update_workflow_data(wf_data)
    formdata.store()
    assert hasattr(formdata, 'workflow_data')
    assert isinstance(formdata.workflow_data, dict)

    item = RegisterCommenterWorkflowStatusItem()
    item.attachments = ["utils.dict_from_prefix('%s_', locals())" % ws_response_varname]
    item.comment = comment_text
    item.perform(formdata)

    assert_single_attachment_stored()
    assert_attachment_and_comment('hello.txt')
|
|
|
|
|
|
def test_email(pub, emails):
    """Exercise the send-mail workflow action.

    Covers incomplete configurations (nothing sent), templated subject and
    body, the different ways of giving recipients, and custom senders.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    form_definition = FormDef()
    form_definition.name = 'baz'
    form_definition.fields = []
    form_definition.store()

    formdata = form_definition.data_class()()
    formdata.just_created()
    formdata.store()

    submitter = pub.user_class(name='foo')
    submitter.email = 'zorg@localhost'
    submitter.store()

    Role.wipe()
    role_foo = Role(name='foo')
    role_foo.emails = ['foo@localhost']
    role_foo.store()

    role_bar = Role(name='bar')
    role_bar.emails = ['bar@localhost', 'baz@localhost']
    role_bar.store()

    mail_action = SendmailWorkflowStatusItem()

    def deliver():
        # run the action then the after-jobs of the current response
        mail_action.perform(formdata)
        get_response().process_after_jobs()

    def deliver_to(recipients):
        # empty the mailbox, switch recipients, then send
        emails.empty()
        mail_action.to = recipients
        deliver()

    # send using an uncompleted element
    deliver()  # nothing
    assert emails.count() == 0

    mail_action.to = [role_foo.id]
    deliver()  # no subject nor body
    assert emails.count() == 0

    mail_action.subject = 'foobar'
    deliver()  # no body
    assert emails.count() == 0

    # send for real
    mail_action.body = 'baz'
    deliver()
    assert emails.count() == 1
    sent = emails.get('foobar')
    assert sent
    assert sent['email_rcpt'] == ['foo@localhost']
    assert 'baz' in sent['payload']

    # template for subject or body (Django)
    emails.empty()
    mail_action.subject = '{{ bar }}'
    mail_action.body = '{{ foo }}'
    deliver()
    assert emails.count() == 1
    sent = emails.get('Foobar')
    assert sent
    assert '1 < 3' in sent['payload']

    # template for subject or body (ezt)
    emails.empty()
    mail_action.subject = '[bar]'
    mail_action.body = '[foo]'
    deliver()
    assert emails.count() == 1
    sent = emails.get('Foobar')
    assert sent
    assert '1 < 3' in sent['payload']

    # two recipients
    mail_action.subject = 'foobar'
    deliver_to([role_foo.id, role_bar.id])
    assert emails.count() == 1
    sent = emails.get('foobar')
    assert sent['to'] == 'Undisclosed recipients:;'
    assert sent['email_rcpt'] == [
        'foo@localhost', 'bar@localhost', 'baz@localhost']

    # submitter as recipient, no known email address
    deliver_to(['_submitter'])
    assert emails.count() == 0

    # submitter as recipient, known email address
    emails.empty()
    formdata.user_id = submitter.id
    formdata.store()
    deliver()
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['zorg@localhost']

    # computed recipient
    deliver_to(['=email'])
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['sub@localhost']

    # computed list of recipients
    deliver_to(['=["foo@localhost", "bar@localhost"]'])
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['foo@localhost', 'bar@localhost']

    # multiple recipients in a single computed string
    deliver_to(['="foo@localhost, bar@localhost"'])
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['foo@localhost', 'bar@localhost']

    # string as recipient
    deliver_to('xyz@localhost')
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['xyz@localhost']

    # string as recipient (but correctly set in a list)
    deliver_to(['xyz@localhost'])
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['xyz@localhost']

    # multiple recipients in a static string
    deliver_to(['foo@localhost, bar@localhost'])
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['foo@localhost', 'bar@localhost']

    # invalid recipient
    deliver_to(['=foobar'])
    assert emails.count() == 0

    # empty recipient
    deliver_to(['=None'])
    assert emails.count() == 0

    # custom from email
    mail_action.custom_from = 'foobar@localhost'
    deliver_to([role_foo.id])
    assert emails.count() == 1
    assert emails.get('foobar').get('from') == 'foobar@localhost'

    # custom from email (computed)
    mail_action.custom_from = '="foobar@localhost"'
    deliver_to([role_foo.id])
    assert emails.count() == 1
    assert emails.get('foobar').get('from') == 'foobar@localhost'
|
|
|
|
|
|
def test_email_django_escaping(pub, emails):
    """Check when Django templates escape HTML in mails sent by the action.

    The same field value (``1 < 3``) is rendered in a plain text body, in
    an HTML body and in the subject.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo')]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    item = SendmailWorkflowStatusItem()
    item.to = ['foo@localhost']
    item.subject = 'Foobar'

    def send():
        # start from an empty mailbox, then run the action and the
        # after-jobs of the current response
        emails.empty()
        item.perform(formdata)
        get_response().process_after_jobs()
        assert emails.count() == 1

    # explicit safe strings
    formdata.data = {'1': '1 < 3'}
    item.body = '{{ form_var_foo|safe }}'
    send()
    assert emails.get('Foobar')['payload'].strip() == '1 < 3'

    # automatic no-escaping (because text/plain)
    formdata.data = {'1': '1 < 3'}
    item.body = '{{ form_var_foo }}'
    send()
    assert emails.get('Foobar')['payload'].strip() == '1 < 3'

    # automatic escaping (because mail body is HTML)
    formdata.data = {'1': '1 < 3'}
    item.body = '<p>{{ form_var_foo }}</p>'
    send()
    assert emails.get('Foobar')
    # the "<" coming from the field value must be found as an HTML entity;
    # looking for a raw "<" would contradict the escaping checked here
    assert '<p>1 &lt; 3</p>' in emails.get('Foobar')['payload'].strip()

    # no automatic escaping for subject (even if mail body is HTML)
    formdata.data = {'1': '1 < 3'}
    item.subject = '{{ form_var_foo }}'
    item.body = '<p>{{ form_var_foo }}</p>'
    send()
    assert emails.get('1 < 3')
|
|
|
|
|
|
def test_email_attachments(pub, emails):
    """Check the files attached to mails sent by the send-mail action.

    Attachments are given in turn as a form file field, as backoffice
    file fields (by identifier and by variable name) and as
    ``utils.attachment()`` expressions.
    """
    def send_and_get_message():
        # ``formdata`` is looked up at call time, the helper thus follows
        # the rebinding done further down
        sendmail.perform(formdata)
        get_response().process_after_jobs()
        assert emails.count() == 1
        msg = emails.emails['foobar']['msg']
        assert msg.is_multipart()
        assert msg.get_content_subtype() == 'mixed'
        return msg

    def assert_html_then_jpeg(msg):
        # HTML body first, then the uploaded image
        assert msg.get_payload()[0].get_content_type() == 'text/html'
        assert msg.get_payload()[1].get_content_type() == 'image/jpeg'

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        FileField(id='3', label='File', type='file', varname='file'),
    ]
    formdef.store()

    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    # context manager so the image file is closed as soon as it has been read
    with open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg'), 'rb') as fd:
        upload.receive([fd.read()])
    formdata = formdef.data_class()()
    formdata.data = {'3': upload}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    sendmail = SendmailWorkflowStatusItem()
    sendmail.subject = 'foobar'
    sendmail.body = '<p>force html</p>'
    sendmail.to = ['to@example.net']
    sendmail.attachments = ['form_var_file_raw']
    assert_html_then_jpeg(send_and_get_message())

    # build a backoffice field
    Workflow.wipe()
    FormDef.wipe()
    wf = Workflow(name='email with attachments')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef.fields = [
        FileField(id='bo1-1x', label='bo field 1', type='file', varname='backoffice_file1'),
        FileField(id='bo2', label='bo field 2', type='file', varname='backoffice_file2'),
    ]
    st1 = wf.add_status('Status1')
    wf.store()
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        FileField(id='1', label='File', type='file', varname='frontoffice_file'),
    ]
    formdef.workflow_id = wf.id
    formdef.store()
    formdata = formdef.data_class()()
    formdata.data = {'1': upload}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)
    # store file in backoffice field form_fbo1_1x / form_var_backoffice_file1_raw
    setbo = SetBackofficeFieldsWorkflowStatusItem()
    setbo.parent = st1
    setbo.fields = [{'field_id': 'bo1-1x', 'value': '=form_var_frontoffice_file_raw'}]
    setbo.perform(formdata)

    # check compatibility with actions defined before #33366 was fixed
    emails.empty()
    sendmail.attachments = ['form_fbo1-1x']
    assert_html_then_jpeg(send_and_get_message())

    # check with correct varname-less field
    emails.empty()
    sendmail.attachments = ['form_fbo1_1x']
    assert_html_then_jpeg(send_and_get_message())

    # check with variable
    emails.empty()
    sendmail.attachments = ['form_var_backoffice_file1_raw']
    assert_html_then_jpeg(send_and_get_message())

    emails.empty()
    sendmail.attachments = ['form_var_backoffice_file1_raw', 'form_var_backoffice_file2_raw']
    msg = send_and_get_message()
    assert_html_then_jpeg(msg)
    # backoffice_file2 is unset, no more parts :
    assert len(msg.get_payload()) == 2

    # set backoffice_file2 and retry
    setbo.fields = [{'field_id': 'bo2',
                     'value': '={"content": "blah", "filename": "hello.txt", '
                              '"content_type": "text/plain"}'}]
    setbo.perform(formdata)
    emails.empty()
    msg = send_and_get_message()
    assert_html_then_jpeg(msg)
    text_part = msg.get_payload()[2]
    assert text_part.get_content_type() == 'text/plain'
    if six.PY2:
        assert text_part.get_payload() == 'blah'
    else:
        # b64decode is available on every Python version, unlike
        # decodestring which was removed in Python 3.9
        assert base64.b64decode(force_bytes(text_part.get_payload())) == b'blah'
    assert len(msg.get_payload()) == 3

    # attachments created from expressions
    emails.empty()
    sendmail.attachments = [
        'utils.attachment("Hello world")',
        'utils.attachment(\'{"hello": "world"}\', content_type=\'application/json\')',
    ]
    msg = send_and_get_message()
    assert msg.get_payload(0).get_content_type() == 'text/html'
    assert msg.get_payload(1).get_content_type() == 'application/octet-stream'
    assert msg.get_payload(2).get_content_type() == 'application/json'
    payload1 = msg.get_payload(1)
    payload2 = msg.get_payload(2)
    assert payload1.get_payload(decode=True) == b"Hello world"
    assert json.loads(force_text(payload2.get_payload(decode=True))) == {'hello': 'world'}
|
|
|
|
|
|
def test_webservice_call(http_requests, pub):
    """Exercise the webservice call workflow action.

    Covers the HTTP method and payload sent, request signature, storage
    of the response in workflow data, handling of 4xx/5xx/bad data
    responses, error recording, attachment responses and query string
    parameters.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    wf = Workflow(name='wf1')
    st1 = wf.add_status('Status1', 'st1')
    wf.add_status('StatusErr', 'sterr')
    wf.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    # bare URL
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'GET'

    # post flag, the body describes the formdata
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = True
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'POST'
    payload = json.loads(http_requests.get_last('body'))
    assert payload['url'] == 'http://example.net/baz/%s/' % formdata.id
    assert payload['display_id'] == formdata.get_display_id()

    # post data only; the 'error' entry (invalid expression) is not sent
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post_data = {'str': 'abcd', 'one': '=1', 'django': '{{ form_number }}',
                      'evalme': '=form_number', 'error': '=1=3'}
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'POST'
    payload = json.loads(http_requests.get_last('body'))
    assert payload == {'one': 1, 'str': 'abcd',
                       'evalme': formdata.get_display_id(),
                       'django': formdata.get_display_id()}

    # post flag and post data, the data goes under the 'extra' key
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = True
    item.post_data = {'str': 'abcd', 'one': '=1', 'decimal': '=Decimal(2)',
                      'evalme': '=form_number', 'error': '=1=3'}
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'POST'
    payload = json.loads(http_requests.get_last('body'))
    assert payload['extra'] == {'one': 1, 'str': 'abcd',
                                'decimal': '2', 'evalme': formdata.get_display_id()}
    assert payload['url'] == 'http://example.net/baz/%s/' % formdata.id
    assert payload['display_id'] == formdata.get_display_id()

    # response stored in workflow data
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json'
    item.varname = 'xxx'
    item.perform(formdata)
    assert formdata.workflow_data['xxx_status'] == 200
    assert formdata.workflow_data['xxx_response'] == {'foo': 'bar'}
    assert formdata.workflow_data.get('xxx_time')
    formdata.workflow_data = None

    # request signature, static key
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.request_signature_key = 'xxx'
    item.perform(formdata)
    assert 'signature=' in http_requests.get_last('url')
    assert http_requests.get_last('method') == 'GET'

    # request signature, templated key: the URL is signed only for the
    # templates expected to give a value
    for signature_key, expect_signature in (
            ('{{ doesntexist }}', False),
            ('{{ empty }}', False),
            ('[empty]', False),
            ('{{ bar }}', True),
            ('[bar]', True)):
        item = WebserviceCallStatusItem()
        item.url = 'http://remote.example.net'
        item.request_signature_key = signature_key
        item.perform(formdata)
        if expect_signature:
            assert 'signature=' in http_requests.get_last('url')
        else:
            assert 'signature=' not in http_requests.get_last('url')

    # 204 response
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/204'
    item.varname = 'xxx'
    item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 204
    assert formdata.workflow_data.get('xxx_time')
    assert 'xxx_response' not in formdata.workflow_data
    assert 'xxx_error_response' not in formdata.workflow_data
    formdata.workflow_data = None

    # 404 response
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404'
    item.varname = 'xxx'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 404
    assert formdata.workflow_data.get('xxx_time')
    assert 'xxx_error_response' not in formdata.workflow_data
    formdata.workflow_data = None

    # 404 response with a JSON content
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404-json'
    item.varname = 'xxx'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 404
    assert formdata.workflow_data.get('xxx_error_response') == {'err': 1}
    assert formdata.workflow_data.get('xxx_time')
    assert 'xxx_response' not in formdata.workflow_data
    formdata.workflow_data = None

    # 4xx errors set to pass
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404'
    item.action_on_4xx = ':pass'
    item.perform(formdata)

    # 500 response
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/500'
    with pytest.raises(AbortActionException):
        item.perform(formdata)

    # 5xx errors set to pass
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/500'
    item.action_on_5xx = ':pass'
    item.perform(formdata)

    # jump labels, and jump to a status on error
    item = WebserviceCallStatusItem()
    item.parent = st1
    assert item.get_jump_label(st1.id) == 'Webservice'
    assert item.get_jump_label('sterr') == 'Error calling webservice'
    item.label = 'Plop'
    assert item.get_jump_label(st1.id) == 'Webservice "Plop"'
    assert item.get_jump_label('sterr') == 'Error calling webservice "Plop"'
    item.url = 'http://remote.example.net/500'
    item.action_on_5xx = 'sterr'  # jump to status
    formdata.status = 'wf-st1'
    formdata.store()
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.status == 'wf-sterr'
    item.action_on_5xx = 'stdeleted'  # removed status
    formdata.status = 'wf-st1'
    formdata.store()
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.status == 'wf-st1'  # unknown status acts like :stop

    # XML response, nothing stored
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.perform(formdata)

    # XML response when bad data is set to stop
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.varname = 'xxx'
    item.action_on_bad_data = ':stop'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 200
    assert 'xxx_response' not in formdata.workflow_data
    assert 'xxx_error_response' not in formdata.workflow_data
    formdata.workflow_data = None

    # errors recorded in the history (HTTP error)
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404'
    item.record_errors = True
    item.action_on_4xx = ':stop'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.evolution[-1].parts[-1].summary == '404 whatever'

    # errors recorded in the history (bad data)
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.varname = 'xxx'
    item.action_on_bad_data = ':stop'
    item.record_errors = True
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    # the decoding error message depends on the Python version
    if six.PY2:
        assert formdata.evolution[-1].parts[-1].summary == 'ValueError: No JSON object could be decoded\n'
    else:
        assert formdata.evolution[-1].parts[-1].summary == 'json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)\n'
    assert formdata.workflow_data.get('xxx_status') == 200
    assert formdata.workflow_data.get('xxx_time')
    assert 'xxx_error_response' not in formdata.workflow_data
    formdata.workflow_data = None

    # check storing response as attachment
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.varname = 'xxx'
    item.response_type = 'attachment'
    item.record_errors = True
    item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 200
    assert formdata.workflow_data.get('xxx_content_type') == 'text/xml'
    attachment = formdata.evolution[-1].parts[-1]
    assert isinstance(attachment, AttachmentEvolutionPart)
    assert attachment.base_filename == 'xxx.xml'
    assert attachment.content_type == 'text/xml'
    attachment.fp.seek(0)
    assert attachment.fp.read(5) == b'<?xml'
    formdata.workflow_data = None

    # rendering of a recorded error
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/400-json'
    item.record_errors = True
    item.action_on_4xx = ':stop'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    rendered = formdata.evolution[-1].parts[-1].view()
    assert not rendered  # empty if not in backoffice
    req = HTTPRequest(None, {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': '/backoffice/'})
    pub._set_request(req)
    rendered = formdata.evolution[-1].parts[-1].view()
    assert 'Error during webservice call' in str(rendered)
    assert 'Error Code: 1' in str(rendered)
    assert 'Error Description: :(' in str(rendered)

    item.label = 'do that'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    rendered = formdata.evolution[-1].parts[-1].view()
    assert 'Error during webservice call "do that"' in str(rendered)

    # query string parameters, added to the ones already in the URL
    item = WebserviceCallStatusItem()
    item.method = 'GET'
    item.url = 'http://remote.example.net?in_url=1'
    item.qs_data = {
        'str': 'abcd',
        'one': '=1',
        'evalme': '=form_number',
        'django': '{{ form_number }}',
        'ezt': '[form_number]',
        'error': '=1=3',
        'in_url': '2',
    }
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('method') == 'GET'
    qs = urlparse.parse_qs(http_requests.get_last('url').split('?')[1])
    assert set(qs.keys()) == {'in_url', 'str', 'one', 'evalme', 'django', 'ezt'}
    assert qs['in_url'] == ['1', '2']
    assert qs['one'] == ['1']
    assert qs['evalme'] == [formdata.get_display_id()]
    assert qs['django'] == [formdata.get_display_id()]
    assert qs['ezt'] == [formdata.get_display_id()]
    assert qs['str'] == ['abcd']

    # DELETE method
    item = WebserviceCallStatusItem()
    item.method = 'DELETE'
    item.post = False
    item.url = 'http://remote.example.net/json'
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('method') == 'DELETE'

    # PUT method with post data
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.method = 'PUT'
    item.post = False
    item.post_data = {'str': 'abcd', 'one': '=1',
                      'evalme': '=form_number', 'error': '=1=3'}
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'PUT'
    payload = json.loads(http_requests.get_last('body'))
    assert payload == {'one': 1, 'str': 'abcd', 'evalme': formdata.get_display_id()}

    # PATCH method with post data
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.method = 'PATCH'
    item.post = False
    item.post_data = {'str': 'abcd', 'one': '=1',
                      'evalme': '=form_number', 'error': '=1=3'}
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'PATCH'
    payload = json.loads(http_requests.get_last('body'))
    assert payload == {'one': 1, 'str': 'abcd', 'evalme': formdata.get_display_id()}
|
|
|
|
|
|
def test_webservice_waitpoint(pub):
    """Check how the error handling settings affect ``waitpoint``.

    The action is a waitpoint with its default settings, stops being one
    when the five error actions are set to ``:pass``, and is one again as
    soon as network errors are set to ``:stop``.
    """
    ws_call = WebserviceCallStatusItem()
    assert ws_call.waitpoint

    error_settings = (
        'action_on_app_error',
        'action_on_4xx',
        'action_on_5xx',
        'action_on_bad_data',
        'action_on_network_errors',
    )
    for setting in error_settings:
        setattr(ws_call, setting, ':pass')
    assert not ws_call.waitpoint

    ws_call.action_on_network_errors = ':stop'
    assert ws_call.waitpoint
|
|
|
|
|
|
def test_webservice_call_error_handling(http_requests, pub):
    """Run webservice calls against the error test URLs and check the outcome.

    Each scenario builds a fresh WebserviceCallStatusItem, points it at one
    of the http://remote.example.net/ URLs and checks either that
    AbortActionException is raised or which ``<varname>_*`` keys were written
    to ``formdata.workflow_data``.  The workflow data is reset to None
    between scenarios.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    # with action_on_app_error set to ':stop' the action is aborted for each
    # of these three error URLs
    for error_url in ('http://remote.example.net/json-err1',
                      'http://remote.example.net/json-errheader1',
                      'http://remote.example.net/json-errheaderstr'):
        wscall = WebserviceCallStatusItem()
        wscall.url = error_url
        wscall.action_on_app_error = ':stop'
        wscall.action_on_4xx = ':pass'
        wscall.action_on_5xx = ':pass'
        wscall.action_on_network_errors = ':pass'
        with pytest.raises(AbortActionException):
            wscall.perform(formdata)

    # json-err0: error code 0, the response is recorded, with or without
    # action_on_app_error being ':stop'
    for app_error_action in (None, ':stop'):
        wscall = WebserviceCallStatusItem()
        wscall.url = 'http://remote.example.net/json-err0'
        wscall.varname = 'xxx'
        if app_error_action is not None:
            wscall.action_on_app_error = app_error_action
        wscall.perform(formdata)
        results = formdata.workflow_data
        assert results['xxx_status'] == 200
        assert results['xxx_app_error_code'] == 0
        assert results['xxx_response'] == {'data': 'foo', 'err': 0}
        assert results.get('xxx_time')
        formdata.workflow_data = None

    # error code in the body: no xxx_response is recorded
    for error_url, error_code in (
            ('http://remote.example.net/json-err1', 1),
            ('http://remote.example.net/json-errstr', 'bug')):
        wscall = WebserviceCallStatusItem()
        wscall.url = error_url
        wscall.varname = 'xxx'
        wscall.perform(formdata)
        results = formdata.workflow_data
        assert results['xxx_status'] == 200
        assert results['xxx_app_error_code'] == error_code
        assert 'xxx_response' not in results
        assert results.get('xxx_time')
        formdata.workflow_data = None

    # same error, but aborting: the error response is still recorded
    wscall = WebserviceCallStatusItem()
    wscall.url = 'http://remote.example.net/json-err1'
    wscall.varname = 'xxx'
    wscall.action_on_app_error = ':stop'
    with pytest.raises(AbortActionException):
        wscall.perform(formdata)
    results = formdata.workflow_data
    assert results['xxx_status'] == 200
    assert results['xxx_app_error_code'] == 1
    assert results['xxx_error_response'] == {'data': '', 'err': 1}
    assert 'xxx_response' not in results
    assert results.get('xxx_time')
    formdata.workflow_data = None

    # json-errheader0: header value '0', the response is recorded
    wscall = WebserviceCallStatusItem()
    wscall.url = 'http://remote.example.net/json-errheader0'
    wscall.varname = 'xxx'
    wscall.perform(formdata)
    results = formdata.workflow_data
    assert results['xxx_status'] == 200
    assert results['xxx_app_error_code'] == 0
    assert results['xxx_app_error_header'] == '0'
    assert results['xxx_response'] == {'foo': 'bar'}
    assert results.get('xxx_time')
    formdata.workflow_data = None

    # error header set: the payload goes to xxx_error_response
    for error_url, error_code, error_header in (
            ('http://remote.example.net/json-errheader1', 1, '1'),
            ('http://remote.example.net/json-errheaderstr', 'bug', 'bug')):
        wscall = WebserviceCallStatusItem()
        wscall.url = error_url
        wscall.varname = 'xxx'
        wscall.perform(formdata)
        results = formdata.workflow_data
        assert results['xxx_status'] == 200
        assert results['xxx_app_error_code'] == error_code
        assert results['xxx_app_error_header'] == error_header
        assert results['xxx_error_response'] == {'foo': 'bar'}
        assert 'xxx_response' not in results
        assert results.get('xxx_time')
        formdata.workflow_data = None

    # error header and ':stop': aborted, error details still recorded
    wscall = WebserviceCallStatusItem()
    wscall.url = 'http://remote.example.net/json-errheader1'
    wscall.varname = 'xxx'
    wscall.action_on_app_error = ':stop'
    with pytest.raises(AbortActionException):
        wscall.perform(formdata)
    results = formdata.workflow_data
    assert results['xxx_status'] == 200
    assert results['xxx_app_error_code'] == 1
    assert results['xxx_app_error_header'] == '1'
    assert results['xxx_error_response'] == {'foo': 'bar'}
    assert 'xxx_response' not in results
    assert results.get('xxx_time')
    formdata.workflow_data = None

    # xml-errheader fetched as an attachment, without then with ':stop'
    for app_error_action in (None, ':stop'):
        wscall = WebserviceCallStatusItem()
        wscall.url = 'http://remote.example.net/xml-errheader'
        wscall.varname = 'xxx'
        wscall.response_type = 'attachment'
        wscall.record_errors = True
        if app_error_action is None:
            wscall.perform(formdata)
        else:
            wscall.action_on_app_error = app_error_action
            with pytest.raises(AbortActionException):
                wscall.perform(formdata)
        results = formdata.workflow_data
        assert results.get('xxx_status') == 200
        assert results.get('xxx_app_error_code') == 1
        assert results.get('xxx_app_error_header') == '1'
        assert 'xxx_response' not in results
        formdata.workflow_data = None

    wscall = WebserviceCallStatusItem()
    wscall.url = 'http://remote.example.net/xml-errheader'
    wscall.varname = 'xxx'
    wscall.response_type = 'json'  # wait for json but receive xml
    wscall.record_errors = True
    wscall.perform(formdata)
    results = formdata.workflow_data
    assert results.get('xxx_status') == 200
    assert results.get('xxx_app_error_code') == 1
    assert results.get('xxx_app_error_header') == '1'
    assert 'xxx_response' not in results
    formdata.workflow_data = None

    wscall = WebserviceCallStatusItem()
    wscall.url = 'http://remote.example.net/json-err1'
    wscall.action_on_app_error = ':stop'
    wscall.response_type = 'attachment'  # err value is not an error
    wscall.perform(formdata)  # so, everything is "ok" here
    formdata.workflow_data = None

    wscall = WebserviceCallStatusItem()
    wscall.url = 'http://remote.example.net/json-errheaderstr'
    wscall.action_on_app_error = ':stop'
    wscall.response_type = 'attachment'
    with pytest.raises(AbortActionException):
        wscall.perform(formdata)
    formdata.workflow_data = None

    # xml instead of json is not an app_error
    wscall = WebserviceCallStatusItem()
    wscall.url = 'http://remote.example.net/xml'
    wscall.varname = 'xxx'
    wscall.action_on_app_error = ':stop'
    wscall.action_on_4xx = ':pass'
    wscall.action_on_5xx = ':pass'
    wscall.action_on_network_errors = ':pass'
    wscall.action_on_bad_data = ':pass'
    wscall.perform(formdata)
    formdata.workflow_data = None

    # connection error
    wscall = WebserviceCallStatusItem()
    wscall.url = 'http://remote.example.net/connection-error'
    wscall.record_errors = True
    wscall.action_on_network_errors = ':pass'
    wscall.perform(formdata)
    assert not formdata.workflow_data

    # connection error, with varname
    wscall = WebserviceCallStatusItem()
    wscall.url = 'http://remote.example.net/connection-error'
    wscall.varname = 'plop'
    wscall.record_errors = True
    wscall.action_on_network_errors = ':pass'
    wscall.perform(formdata)
    last_part = formdata.evolution[-1].parts[-1]
    assert 'ConnectionError: error\n' in last_part.summary
    assert formdata.workflow_data['plop_connection_error'] == 'error'
|
|
|
|
|
|
def test_webservice_call_store_in_backoffice_filefield(http_requests, pub):
    """Check a webservice response can be stored in a backoffice file field.

    The call is configured with ``backoffice_filefield_id = 'bo1'`` and
    ``response_type = 'attachment'``; the test then checks what ends up in
    ``formdata.data`` with and without a varname, and when field 'bo1' is
    missing or is not a file field.
    """
    def check_backoffice_file(data, expected_filename):
        # the XML response must be stored under the 'bo1' key
        assert 'bo1' in data
        stored = data['bo1']
        assert stored.base_filename == expected_filename
        assert stored.content_type == 'text/xml'
        assert stored.get_content().startswith(b'<?xml')

    workflow = Workflow(name='wscall to backoffice file field')
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    workflow.backoffice_fields_formdef.fields = [
        FileField(id='bo1', label='bo field 1', type='file', varname='backoffice_file1'),
    ]
    status = workflow.add_status('Status1')
    workflow.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.data = {}
    formdata.store()

    # check storing response in backoffice file field
    wscall = WebserviceCallStatusItem()
    wscall.parent = status
    wscall.backoffice_filefield_id = 'bo1'
    wscall.url = 'http://remote.example.net/xml'
    wscall.response_type = 'attachment'
    wscall.record_errors = True
    wscall.perform(formdata)

    check_backoffice_file(formdata.data, 'file-bo1.xml')
    # nothing else is stored
    assert formdata.workflow_data is None
    assert not formdata.evolution[-1].parts

    # store in backoffice file field + varname
    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()
    formdata.store()
    wscall.varname = 'xxx'
    wscall.perform(formdata)
    # backoffice file field
    check_backoffice_file(formdata.data, 'xxx.xml')
    # varname => workflow_data and AttachmentEvolutionPart
    assert formdata.workflow_data.get('xxx_status') == 200
    assert formdata.workflow_data.get('xxx_content_type') == 'text/xml'
    evolution_part = formdata.evolution[-1].parts[-1]
    assert isinstance(evolution_part, AttachmentEvolutionPart)
    assert evolution_part.base_filename == 'xxx.xml'
    assert evolution_part.content_type == 'text/xml'
    evolution_part.fp.seek(0)
    assert evolution_part.fp.read(5) == b'<?xml'

    # no more 'bo1' backoffice field: do nothing
    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()
    formdata.store()
    workflow.backoffice_fields_formdef.fields = [
        FileField(id='bo2', label='bo field 2', type='file'),  # id != 'bo1'
    ]
    wscall.perform(formdata)
    assert formdata.data == {}

    # backoffice field is not a file field:
    workflow.backoffice_fields_formdef.fields = [
        StringField(id='bo1', label='bo field 1', type='string'),
    ]
    wscall.perform(formdata)
    assert formdata.data == {}

    # no field at all:
    workflow.backoffice_fields_formdef.fields = []
    wscall.perform(formdata)
    assert formdata.data == {}
|
|
|
|
|
|
def test_webservice_target_status(pub):
    """Check get_target_status() of a webservice call with status error actions."""
    workflow = Workflow(name='boo')
    status1 = workflow.add_status('Status1', 'st1')
    status2 = workflow.add_status('Status2', 'st2')
    workflow.store()

    wscall = WebserviceCallStatusItem()
    wscall.parent = status1

    def check_four_targets():
        # two entries are expected for each of the two statuses
        targets = wscall.get_target_status()
        assert len(wscall.get_target_status()) == 4
        assert targets.count(status1) == 2
        assert targets.count(status2) == 2

    # without any error action pointing to a status
    assert wscall.get_target_status() == [status1.id]

    wscall.action_on_app_error = status1.id
    wscall.action_on_4xx = status2.id
    wscall.action_on_5xx = status2.id
    check_four_targets()

    # a reference to a status that doesn't exist changes nothing
    wscall.action_on_bad_data = 'st3'
    check_four_targets()
|
|
|
|
|
|
def test_timeout(two_pubs):
    """Check a jump with a timeout moves the formdata to its target status."""
    workflow = Workflow(name='timeout')
    first_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    timeout_jump = JumpWorkflowStatusItem()
    timeout_jump.id = '_jump'
    timeout_jump.by = ['_submitter', '_receiver']
    timeout_jump.timeout = 0.1
    timeout_jump.status = 'st2'
    timeout_jump.parent = first_status
    first_status.items.append(timeout_jump)

    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    assert formdef.get_workflow().id == workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    formdata_id = formdata.id

    # wait longer than the 0.1 timeout set on the jump before applying timeouts
    time.sleep(0.3)
    _apply_timeouts(two_pubs)

    reloaded = formdef.data_class().get(formdata_id)
    assert reloaded.status == 'wf-st2'

    # check there's no crash on workflow without jumps
    other_formdef = FormDef()
    other_formdef.name = 'xxx'
    other_formdef.store()
    _apply_timeouts(two_pubs)
|
|
|
|
|
|
def test_legacy_timeout(pub):
    """Check a TimeoutWorkflowStatusItem moves the formdata after its timeout."""
    workflow = Workflow(name='timeout')
    first_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    legacy_jump = TimeoutWorkflowStatusItem()
    legacy_jump.id = '_jump'
    legacy_jump.timeout = 0.1
    legacy_jump.status = 'st2'
    legacy_jump.parent = first_status
    first_status.items.append(legacy_jump)

    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    assert formdef.get_workflow().id == workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    formdata_id = formdata.id

    # wait longer than the 0.1 timeout before applying timeouts
    time.sleep(0.3)
    _apply_timeouts(pub)

    reloaded = formdef.data_class().get(formdata_id)
    assert reloaded.status == 'wf-st2'
|
|
|
|
|
|
def test_timeout_then_remove(two_pubs):
    """Check a timeout jump into a status holding a remove action.

    The formdata is stored in 'st1', whose jump (timeout 0.1) leads to 'st2'
    where a RemoveWorkflowStatusItem is attached; once timeouts have been
    applied the formdata id must no longer be listed by the data class.
    """
    workflow = Workflow(name='timeout-then-remove')
    st1 = workflow.add_status('Status1', 'st1')
    st2 = workflow.add_status('Status2', 'st2')

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = ['_submitter', '_receiver']
    jump.timeout = 0.1
    jump.status = 'st2'
    st1.items.append(jump)
    jump.parent = st1

    remove = RemoveWorkflowStatusItem()
    st2.items.append(remove)
    remove.parent = st2

    workflow.store()

    formdef = FormDef()
    # the name is made unique per publisher object
    formdef.name = 'baz%s' % id(two_pubs)
    formdef.fields = []
    formdef.workflow_id = workflow.id
    assert formdef.get_workflow().id == workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    formdata_id = formdata.id

    # ids are compared as strings on both sides
    assert str(formdata_id) in [str(x) for x in formdef.data_class().keys()]

    time.sleep(0.2)
    _apply_timeouts(two_pubs)

    assert str(formdata_id) not in [str(x) for x in formdef.data_class().keys()]
|
|
|
|
|
|
def test_timeout_with_mark(two_pubs):
    """Check a timeout jump with set_marker_on_status records a status marker."""
    workflow = Workflow(name='timeout')
    first_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    marking_jump = JumpWorkflowStatusItem()
    marking_jump.id = '_jump'
    marking_jump.by = ['_submitter', '_receiver']
    marking_jump.timeout = 0.1
    marking_jump.status = 'st2'
    marking_jump.set_marker_on_status = True
    marking_jump.parent = first_status
    first_status.items.append(marking_jump)

    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    assert formdef.get_workflow().id == workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    formdata_id = formdata.id

    # wait longer than the 0.1 timeout before applying timeouts
    time.sleep(0.3)
    _apply_timeouts(two_pubs)

    # the status the formdata was in is kept on the markers stack
    reloaded = formdef.data_class().get(formdata_id)
    markers = reloaded.workflow_data.get('_markers_stack')
    assert markers == [{'status_id': 'st1'}]
|
|
|
|
|
|
def test_sms(pub, sms_mocking):
    """Check SendSMSWorkflowStatusItem against the mocked SMS backend.

    Covers plain recipients, None recipients, Django and ezt templates in the
    body, and the 'none' SMS mode.
    """
    def check_sent(count, text):
        # `count` messages recorded so far, the latest one sent to '000'
        assert len(sms_mocking.sms) == count
        assert sms_mocking.sms[count - 1]['destinations'] == ['000']
        assert sms_mocking.sms[count - 1]['text'] == text

    pub.cfg['sms'] = {'mode': 'xxx'}
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    sms_action = SendSMSWorkflowStatusItem()
    sms_action.to = ['000']
    sms_action.body = 'XXX'
    assert len(sms_mocking.sms) == 0
    sms_action.perform(formdata)
    check_sent(1, 'XXX')

    # check None as recipient is not passed to the SMS backend
    sms_action.to = [None]
    sms_action.perform(formdata)  # nothing
    assert len(sms_mocking.sms) == 1

    sms_action.to = ['000', None]
    sms_action.perform(formdata)
    check_sent(2, 'XXX')

    # templated bodies, Django syntax then ezt syntax
    pub.substitutions.feed(MockSubstitutionVariables())
    sms_action.body = 'dj{{ bar }}'
    sms_action.perform(formdata)
    check_sent(3, 'djFoobar')

    sms_action.body = 'ezt[bar]'
    sms_action.perform(formdata)
    check_sent(4, 'eztFoobar')

    # disable SMS system
    pub.cfg['sms'] = {'mode': 'none'}
    sms_action.to = ['000']
    sms_action.body = 'XXX'
    sms_action.perform(formdata)  # nothing
    assert len(sms_mocking.sms) == 4
|
|
|
|
|
|
def test_sms_with_passerelle(pub):
    """Check the request sent when the SMS mode is 'passerelle'.

    The secret lookup and the HTTP layer are both mocked; the test inspects
    the URL and the JSON body handed to the mocked HTTP call.
    """
    pub.cfg['sms'] = {
        'mode': 'passerelle',
        'passerelle_url': 'http://passerelle.example.com/send?nostop=1',
        'sender': 'Passerelle',
    }
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    sms_action = SendSMSWorkflowStatusItem()
    sms_action.to = ['1234']
    sms_action.body = 'my message'

    with mock.patch('wcs.wscalls.get_secret_and_orig',
                    return_value=('secret', 'localhost')):
        with mock.patch('wcs.qommon.misc._http_request',
                        return_value=('response', '200', 'data', 'headers')) as http_post:
            sms_action.perform(formdata)

            # the URL is the first positional argument, the body a keyword
            positional, keywords = http_post.call_args[0], http_post.call_args[1]
            called_url = positional[0]
            assert 'http://passerelle.example.com' in called_url
            for fragment in ('?nostop=1', 'orig=localhost', 'signature='):
                assert fragment in called_url

            sent = json.loads(keywords['body'])
            assert 'message' in sent
            assert sent['message'] == 'my message'
            assert sent['to'] == ['1234']
            assert sent['from'] == 'Passerelle'
|
|
|
|
|
|
def test_display_form(two_pubs):
    """Check filling and submitting a workflow form action.

    A FormWorkflowStatusItem with a string and a date field is filled and
    submitted, first with the initial configuration and then with the
    language set to 'fr'; the submitted date is checked through the
    formdata substitution variables.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')

    display_form = FormWorkflowStatusItem()
    display_form.id = '_x'
    display_form.varname = 'xxx'
    display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
    display_form.formdef.fields.append(StringField(id='1', label='Test',
        type='string'))
    display_form.formdef.fields.append(DateField(id='2', label='Date',
        type='date', varname='date'))
    st1.items.append(display_form)
    display_form.parent = st1

    form = Form(action='#', use_tokens=False)
    display_form.fill_form(form, formdata, None)
    assert form.widgets[0].title == 'Test'
    assert form.widgets[1].title == 'Date'

    two_pubs.get_request().environ['REQUEST_METHOD'] = 'POST'
    two_pubs.get_request().form = {'f1': 'Foobar', 'f2': '2015-05-12', 'submit': 'submit'}
    display_form.submit_form(form, formdata, None, None)

    assert formdata.get_substitution_variables()['xxx_var_date'] == '2015-05-12'

    two_pubs.cfg['language'] = {'language': 'fr'}
    try:
        formdata = formdef.data_class()()
        formdata.just_created()
        formdata.store()

        form = Form(action='#', use_tokens=False)
        display_form.fill_form(form, formdata, None)
        two_pubs.get_request().environ['REQUEST_METHOD'] = 'POST'
        two_pubs.get_request().form = {'f1': 'Foobar', 'f2': '12/05/2015', 'submit': 'submit'}
        display_form.submit_form(form, formdata, None, None)
        assert formdata.get_substitution_variables()['xxx_var_date'] == '12/05/2015'

        assert formdata.get_substitution_variables()['xxx_var_date_raw'] == \
            time.strptime('2015-05-12', '%Y-%m-%d')
    finally:
        # always put the language back, even when an assertion above fails,
        # so the override is not left in place on the publisher
        two_pubs.cfg['language'] = {'language': 'en'}
|
|
|
|
|
|
def test_display_form_and_comment(pub):
    """Check a form action and a commentable action share one workflow form."""
    agent_role = Role(name='bar1')
    agent_role.store()

    agent = pub.user_class()
    agent.roles = [agent_role.id]
    agent.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    # first action: a form holding a single comment field
    form_action = FormWorkflowStatusItem()
    form_action.by = [agent_role.id]
    form_action.id = '_x'
    form_action.varname = 'xxx'
    form_action.formdef = WorkflowFormFieldsFormDef(item=form_action)
    comment_field = CommentField(id='1', label='Test', type='comment')
    form_action.formdef.fields.append(comment_field)
    form_action.parent = status
    status.items.append(form_action)

    # second action: a comment box for the same role
    comment_action = CommentableWorkflowStatusItem()
    comment_action.by = [agent_role.id]
    comment_action.parent = status
    status.items.append(comment_action)

    workflow.store()

    formdef.workflow = workflow
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    assert formdata.get_status().name == 'Status1'

    # widgets are expected in the order the actions were added
    workflow_form = formdata.get_workflow_form(agent)
    assert 'Test' in str(workflow_form.widgets[0].render())
    assert '<textarea' in str(workflow_form.widgets[1].render())
|
|
|
|
|
|
def test_display_form_migration(pub):
    """Check show_as_radio on a workflow form field reloads as display_mode 'radio'."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    form_action = FormWorkflowStatusItem()
    form_action.id = '_x'
    form_action.varname = 'xxx'
    form_action.formdef = WorkflowFormFieldsFormDef(item=form_action)
    form_action.formdef.fields = [
        ItemField(id='1', label='Test', type='item'),
    ]
    form_action.parent = status
    status.items.append(form_action)

    # set the old-style attribute before the workflow is stored
    item_field = form_action.formdef.fields[0]
    item_field.show_as_radio = True
    workflow.store()

    reloaded = Workflow.get(workflow.id)
    reloaded_field = reloaded.possible_status[0].items[0].formdef.fields[0]
    assert reloaded_field.display_mode == 'radio'
|
|
|
|
|
|
def test_choice_button_no_label(pub):
    """Check only the choice action that has a label is rendered as a button."""
    agent_role = Role(name='bar1')
    agent_role.store()

    agent = pub.user_class()
    agent.roles = [agent_role.id]
    agent.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    # a choice left without any label
    unlabelled = ChoiceWorkflowStatusItem()
    unlabelled.by = [agent_role.id]
    unlabelled.id = '_x'
    unlabelled.parent = status
    status.items.append(unlabelled)

    # and one with a label
    labelled = ChoiceWorkflowStatusItem()
    labelled.label = 'TEST'
    labelled.by = [agent_role.id]
    labelled.id = '_x2'
    labelled.parent = status
    status.items.append(labelled)

    workflow.store()

    formdef.workflow = workflow
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    assert formdata.get_status().name == 'Status1'

    workflow_form = formdata.get_workflow_form(agent)
    workflow_form.render()
    button_count = str(workflow_form.render()).count('<button')
    assert button_count == 1
    assert '>TEST</button>' in str(workflow_form.render())
|
|
|
|
|
|
def test_workflow_role_type_migration(pub):
    """Check integer role ids in ``by`` come back as strings after a reload."""
    workflow = Workflow(name='role migration')
    status = workflow.add_status('Status1', 'st1')

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = [1, 2]  # integers on purpose
    jump.parent = status
    status.items.append(jump)

    workflow.store()

    reloaded_jump = Workflow.get(workflow.id).possible_status[0].items[0]
    assert reloaded_jump.by == ['1', '2']
|
|
|
|
|
|
def test_workflow_display_message(pub):
    """Check DisplayMessageWorkflowStatusItem.get_message() on templates.

    Plain text, Django-style (``{{ var }}``) and ezt-style (``[var]``)
    messages are rendered against a formdata whose id is '1' and against the
    variables fed through MockSubstitutionVariables.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    workflow = Workflow(name='display message')
    st1 = workflow.add_status('Status1', 'st1')

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = []
    formdef.store()
    formdata = formdef.data_class()()
    formdata.id = '1'

    display_message = DisplayMessageWorkflowStatusItem()
    display_message.parent = st1

    display_message.message = 'test'
    assert display_message.get_message(formdata) == 'test'

    display_message.message = '{{ number }}'
    assert display_message.get_message(formdata) == str(formdata.id)

    display_message.message = '[number]'
    assert display_message.get_message(formdata) == str(formdata.id)

    display_message.message = '{{ bar }}'
    assert display_message.get_message(formdata) == 'Foobar'

    display_message.message = '[bar]'
    assert display_message.get_message(formdata) == 'Foobar'

    # makes sure the string is correctly escaped for HTML: the expected value
    # must carry the entity, a raw '<' would mean the output was not escaped
    # NOTE(review): assumes the mocked `foo` variable is '1 < 3' -- confirm
    # against MockSubstitutionVariables
    display_message.message = '{{ foo }}'
    assert display_message.get_message(formdata) == '1 &lt; 3'
    display_message.message = '[foo]'
    assert display_message.get_message(formdata) == '1 &lt; 3'
|
|
|
|
|
|
def test_workflow_display_message_to(pub):
    """Check the ``to`` filter of DisplayMessageWorkflowStatusItem.

    The message is restricted in turn to nobody in particular, a role, the
    submitter, or both, while the request user, its roles and the formdata
    owner are changed; both get_message() and get_workflow_messages() are
    checked at every step.
    """
    workflow = Workflow(name='display message to')
    status = workflow.add_status('Status1', 'st1')

    role = Role(name='foorole')
    role.store()
    unused_role = Role(name='no-one-role')
    unused_role.store()
    user = pub.user_class(name='baruser')
    user.roles = []
    user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.url_name = 'foobar'
    formdef._workflow = workflow

    formdata = formdef.data_class()()
    formdata.status = 'wf-st1'

    display_message = DisplayMessageWorkflowStatusItem()
    display_message.parent = status
    status.items.append(display_message)

    def check_message(expected):
        # an empty string means the message must not be shown at all
        assert display_message.get_message(formdata) == expected
        assert formdata.get_workflow_messages() == ([expected] if expected else [])

    # no restriction
    display_message.message = 'all'
    display_message.to = None
    check_message('all')

    # restricted to a role, no user on the request yet
    display_message.message = 'to-role'
    display_message.to = [role.id]
    check_message('')

    # user on the request, first without then with the role
    pub._request._user = user
    display_message.message = 'to-role'
    display_message.to = [role.id]
    check_message('')
    user.roles = [role.id]
    check_message('to-role')

    # restricted to the submitter
    user.roles = []
    display_message.message = 'to-submitter'
    display_message.to = ['_submitter']
    check_message('')
    formdata.user_id = user.id
    check_message('to-submitter')

    # restricted to the role or the submitter
    display_message.message = 'to-role-or-submitter'
    display_message.to = [role.id, '_submitter']
    check_message('to-role-or-submitter')
    formdata.user_id = None
    check_message('')
    user.roles = [role.id]
    check_message('to-role-or-submitter')
    formdata.user_id = user.id
    check_message('to-role-or-submitter')

    # restricted to a role the user doesn't have
    display_message.to = [unused_role.id]
    check_message('')

    # two messages in the same status
    display_message.message = 'd1'
    second_message = DisplayMessageWorkflowStatusItem()
    second_message.parent = status
    status.items.append(second_message)
    second_message.message = 'd2'
    second_message.to = [role.id, '_submitter']
    assert formdata.get_workflow_messages() == ['d2']
    user.roles = [role.id, unused_role.id]
    assert 'd1' in formdata.get_workflow_messages()
    assert 'd2' in formdata.get_workflow_messages()
|
|
|
|
|
|
def test_workflow_display_message_line_details(pub):
    """Check get_line_details() for each position and with a recipient role."""
    workflow = Workflow(name='display message to')
    status = workflow.add_status('Status1', 'st1')
    message_action = DisplayMessageWorkflowStatusItem()
    message_action.parent = status

    # before any position is set explicitly
    assert message_action.get_line_details() == 'top of page'

    for position, details in (
            ('top', 'top of page'),
            ('bottom', 'bottom of page'),
            ('actions', 'with actions')):
        message_action.position = position
        assert message_action.get_line_details() == details

    # a recipient role is appended to the details
    recipient = Role(name='foorole')
    recipient.store()
    message_action.to = [recipient.id]
    assert message_action.get_line_details() == 'with actions, for foorole'
|
|
|
|
|
|
def test_workflow_roles(pub, emails):
    """Check workflow functions mapped to roles for emails and variables.

    An email addressed to the '_receiver' and '_other' functions must reach
    the addresses of the roles mapped on the formdef, and the mapped roles
    must be exposed as ``form_role_*`` substitution variables.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    user = pub.user_class(name='foo')
    user.email = 'zorg@localhost'
    user.store()

    Role.wipe()
    receiver_role = Role(name='foo')
    receiver_role.emails = ['foo@localhost']
    receiver_role.details = 'Hello World'
    receiver_role.store()

    other_role = Role(name='bar')
    other_role.emails = ['bar@localhost', 'baz@localhost']
    other_role.store()

    workflow = Workflow(name='wf roles')
    status = workflow.add_status('Status1', 'st1')
    mail_action = SendmailWorkflowStatusItem()
    mail_action.to = ['_receiver', '_other']
    mail_action.subject = 'Foobar'
    mail_action.body = 'Hello'
    mail_action.parent = status
    status.items.append(mail_action)
    workflow.roles['_other'] = 'Other Function'
    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_roles = {'_receiver': receiver_role.id, '_other': other_role.id}
    formdef.workflow_id = workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    emails.empty()
    mail_action.perform(formdata)
    get_response().process_after_jobs()
    assert emails.count() == 1
    assert emails.get('Foobar')
    recipients = set(emails.get('Foobar')['email_rcpt'])
    assert recipients == {'foo@localhost', 'bar@localhost', 'baz@localhost'}

    # a function whose key contains dashes is exposed with underscores
    workflow.roles['_slug-with-dash'] = 'Dashed Function'
    workflow.store()
    formdef.workflow_roles['_slug-with-dash'] = receiver_role.id
    formdef.store()
    variables = formdata.get_substitution_variables()
    assert variables.get('form_role_other_name') == 'bar'
    assert variables.get('form_role_slug_with_dash_name') == 'foo'
    assert variables.get('form_role_slug_with_dash_details') == 'Hello World'
|
|
|
|
|
|
def test_criticality(pub):
    """Exercise the modify-criticality action in its three modes.

    With levels green/yellow/red, the action is expected to step up
    (default mode and MODE_INC), step down (MODE_DEC) without leaving the
    list of levels, and jump to an absolute index (MODE_SET).
    """
    FormDef.wipe()

    workflow = Workflow(name='criticality')
    workflow.criticality_levels = [
        WorkflowCriticalityLevel(name=level_name)
        for level_name in ('green', 'yellow', 'red')
    ]
    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.workflow_id = workflow.id
    formdef.store()

    item = ModifyCriticalityWorkflowStatusItem()

    # mode left untouched: one level up from the initial one
    formdata = formdef.data_class()()
    item.perform(formdata)
    assert formdata.get_criticality_level_object().name == 'yellow'

    # relative modes, applied three times each on a fresh formdata; the
    # last expected value of each series shows the level being capped
    formdata = formdef.data_class()()
    relative_modes = (
        (MODE_INC, ('yellow', 'red', 'red')),
        (MODE_DEC, ('yellow', 'green', 'green')),
    )
    for mode, expected_levels in relative_modes:
        item.mode = mode
        for expected in expected_levels:
            item.perform(formdata)
            assert formdata.get_criticality_level_object().name == expected

    # absolute mode: the value is used as an index in the list of levels
    item.mode = MODE_SET
    for absolute_value, expected in ((2, 'red'), (0, 'green')):
        item.absolute_value = absolute_value
        item.perform(formdata)
        assert formdata.get_criticality_level_object().name == expected
|
|
|
|
|
|
def test_geolocate_address(pub):
    """Check geolocation of a formdata from an address string.

    The geocoding HTTP call is mocked; the test looks at the URL that is
    requested (default service, API key, custom service URL) and at the
    resulting ``formdata.geolocations``, including the various failure
    cases where it is expected to end up as an empty dictionary.
    """
    patch_target = 'wcs.wf.geolocate.http_get_page'
    geocoder_reply = json.dumps([{'lat': '48.8337085', 'lon': '2.3233693'}])
    default_service = 'https://nominatim.entrouvert.org/search'

    formdef = FormDef()
    formdef.geolocations = {'base': 'bla'}
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='1', label='String', type='string', varname='string'),
    ]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'1': '169 rue du chateau'}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    item = GeolocateWorkflowStatusItem()
    item.method = 'address_string'
    item.address_string = '[form_var_string], paris, france'

    # default geocoding service
    with mock.patch(patch_target) as http_get_page:
        http_get_page.return_value = (None, 200, geocoder_reply, None)
        item.perform(formdata)
        requested_url = http_get_page.call_args[0][0]
        assert default_service in requested_url
        assert urlparse.quote('169 rue du chateau, paris') in requested_url
        assert int(formdata.geolocations['base']['lat']) == 48
        assert int(formdata.geolocations['base']['lon']) == 2

    # an API key set in the site options is added to the request
    pub.load_site_options()
    pub.site_options.set('options', 'nominatim_key', 'KEY')
    with mock.patch(patch_target) as http_get_page:
        http_get_page.return_value = (None, 200, geocoder_reply, None)
        item.perform(formdata)
        requested_url = http_get_page.call_args[0][0]
        assert default_service in requested_url
        assert urlparse.quote('169 rue du chateau, paris') in requested_url
        assert 'key=KEY' in requested_url
        assert int(formdata.geolocations['base']['lat']) == 48
        assert int(formdata.geolocations['base']['lon']) == 2

    # custom geocoding service, URL without a query string
    pub.load_site_options()
    pub.site_options.set('options', 'geocoding_service_url', 'http://example.net/')
    with mock.patch(patch_target) as http_get_page:
        http_get_page.return_value = (None, 200, geocoder_reply, None)
        item.perform(formdata)
        assert 'http://example.net/?q=' in http_get_page.call_args[0][0]

    # custom geocoding service, URL that already has a query string
    pub.site_options.set('options', 'geocoding_service_url',
                         'http://example.net/?param=value')
    with mock.patch(patch_target) as http_get_page:
        http_get_page.return_value = (None, 200, geocoder_reply, None)
        item.perform(formdata)
        assert 'http://example.net/?param=value&' in http_get_page.call_args[0][0]

    # check for invalid ezt
    item.address_string = '[if-any], paris, france'
    formdata.geolocations = None
    item.perform(formdata)
    assert formdata.geolocations == {}

    # check for None
    item.address_string = '=None'
    formdata.geolocations = None
    item.perform(formdata)
    assert formdata.geolocations == {}

    # check for nominatim server error
    formdata.geolocations = None
    with mock.patch(patch_target) as http_get_page:
        http_get_page.return_value = (None, 500, geocoder_reply, None)
        item.perform(formdata)
        assert formdata.geolocations == {}

    # check for nominatim returning an empty result set
    formdata.geolocations = None
    with mock.patch(patch_target) as http_get_page:
        http_get_page.return_value = (None, 200, json.dumps([]), None)
        item.perform(formdata)
        assert formdata.geolocations == {}

    # check for nominatim bad json
    formdata.geolocations = None
    with mock.patch(patch_target) as http_get_page:
        http_get_page.return_value = (None, 200, 'bad json', None)
        item.perform(formdata)
        assert formdata.geolocations == {}

    # check for nominatim connection error
    formdata.geolocations = None
    with mock.patch(patch_target) as http_get_page:
        http_get_page.side_effect = ConnectionError
        item.perform(formdata)
        assert formdata.geolocations == {}
|
|
|
|
|
|
def test_geolocate_image(pub):
    """Check geolocation of a formdata from the GPS data of a photo.

    Covers the nominal case (a JPEG file sitting next to this module) and
    three failure cases that must leave ``formdata.geolocations`` empty: an
    expression that raises, an expression that gives a string, and an
    upload whose content is an ODT document.
    """
    tests_dir = os.path.dirname(__file__)

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.geolocations = {'base': 'bla'}
    formdef.fields = [
        FileField(id='3', label='File', type='file', varname='file'),
    ]
    formdef.store()

    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    # read through a context manager so the file handle is not leaked
    with open(os.path.join(tests_dir, 'image-with-gps-data.jpeg'), 'rb') as fd:
        upload.receive([fd.read()])

    formdata = formdef.data_class()()
    formdata.data = {'3': upload}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    item = GeolocateWorkflowStatusItem()
    item.method = 'photo_variable'

    item.photo_variable = '=form_var_file_raw'
    item.perform(formdata)
    assert int(formdata.geolocations['base']['lat']) == -1
    assert int(formdata.geolocations['base']['lon']) == 6

    # invalid expression
    formdata.geolocations = None
    item.photo_variable = '=1/0'
    item.perform(formdata)
    assert formdata.geolocations == {}

    # invalid type
    formdata.geolocations = None
    item.photo_variable = '="bla"'
    item.perform(formdata)
    assert formdata.geolocations == {}

    # invalid photo
    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    with open(os.path.join(tests_dir, 'template.odt'), 'rb') as fd:
        upload.receive([fd.read()])
    formdata.data = {'3': upload}
    formdata.geolocations = None
    item.perform(formdata)
    assert formdata.geolocations == {}
|
|
|
|
|
|
def test_geolocate_map(pub):
    """Check geolocation of a formdata from the value of a map field.

    The "lat;lon" string held by the map field is expected to feed the
    'base' geolocation; an expression that gives unusable data must leave
    the geolocations empty.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.geolocations = {'base': 'bla'}
    formdef.fields = [MapField(id='2', label='Map', type='map', varname='map')]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'2': '48.8337085;2.3233693'}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    geolocate = GeolocateWorkflowStatusItem()
    geolocate.method = 'map_variable'
    geolocate.map_variable = '=form_var_map'

    geolocate.perform(formdata)
    position = formdata.geolocations['base']
    assert int(position['lat']) == 48
    assert int(position['lon']) == 2

    # invalid data
    formdata.geolocations = None
    geolocate.map_variable = '=form_var'
    geolocate.perform(formdata)
    assert formdata.geolocations == {}
|
|
|
|
|
|
def test_geolocate_overwrite(pub):
    """Check the ``overwrite`` attribute of the geolocation action.

    By default a second run replaces the stored position; once
    ``overwrite`` is set to False the stored position must be kept.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.geolocations = {'base': 'bla'}
    formdef.fields = [MapField(id='2', label='Map', type='map', varname='map')]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'2': '48.8337085;2.3233693'}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    geolocate = GeolocateWorkflowStatusItem()
    geolocate.method = 'map_variable'
    geolocate.map_variable = '=form_var_map'

    # first run: position taken from the map field
    geolocate.perform(formdata)
    assert int(formdata.geolocations['base']['lat']) == 48
    assert int(formdata.geolocations['base']['lon']) == 2

    # second run with a new longitude: position is replaced
    formdata.data = {'2': '48.8337085;3.3233693'}
    geolocate.perform(formdata)
    assert int(formdata.geolocations['base']['lat']) == 48
    assert int(formdata.geolocations['base']['lon']) == 3

    # overwriting disabled: the previous position (longitude 3) is kept
    formdata.data = {'2': '48.8337085;4.3233693'}
    geolocate.overwrite = False
    geolocate.perform(formdata)
    assert int(formdata.geolocations['base']['lat']) == 48
    assert int(formdata.geolocations['base']['lon']) == 3
|
|
|
|
|
|
@pytest.mark.skipif(transform_to_pdf is None, reason='libreoffice not found')
def test_transform_to_pdf():
    """Check an ODT document gets converted to a stream starting with a PDF header."""
    template_path = os.path.join(os.path.dirname(__file__), 'template.odt')
    # the input stream is closed on exit; assertions are kept inside the
    # block so the input stays open while the result is being read
    with open(template_path, 'rb') as instream:
        outstream = transform_to_pdf(instream)
        assert outstream is not False
        assert outstream.read(10).startswith(b'%PDF-')
|
|
|
|
|
|
def test_export_to_model_image(pub):
    """Check image replacement in an ODT model by the export-to-model action.

    The picture embedded in the template must be replaced by the file
    uploaded in the form; with a missing or invalid field value the picture
    from the template is expected to be left alone.  The last part checks
    a templated filename containing a slash.
    """
    tests_dir = os.path.dirname(__file__)
    picture_name = 'Pictures/10000000000000320000003276E9D46581B55C88.jpg'

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        FileField(id='3', label='File', type='file', varname='image'),
    ]
    formdef.store()

    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    # context managers are used for all files so no handle is leaked
    with open(os.path.join(tests_dir, 'image-with-gps-data.jpeg'), 'rb') as fd:
        image_data = fd.read()
    upload.receive([image_data])

    formdata = formdef.data_class()()
    formdata.data = {'3': upload}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    item = ExportToModel()
    item.convert_to_pdf = False
    item.method = 'non-interactive'
    template_filename = os.path.join(tests_dir, 'template-with-image.odt')
    with open(template_filename, 'rb') as fd:
        template = fd.read()
    upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
    upload.fp = BytesIO()
    upload.fp.write(template)
    upload.fp.seek(0)
    item.model_file = UploadedFile(pub.app_dir, None, upload)
    item.attach_to_history = True

    item.perform(formdata)

    assert formdata.evolution[-1].parts[-1].base_filename == 'template.odt'
    with zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r') as zfile:
        zinfo = zfile.getinfo(picture_name)
    # check the image has been replaced by the one from the formdata
    assert zinfo.file_size == len(image_data)

    # check with missing data or wrong kind of data
    for field_value in (None, 'wrong kind'):
        formdata = formdef.data_class()()
        formdata.data = {'3': field_value}
        formdata.just_created()
        formdata.store()
        pub.substitutions.feed(formdata)

        item.perform(formdata)

        with zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r') as zfile:
            zinfo = zfile.getinfo(picture_name)
        # check the original image has been left
        assert zinfo.file_size == 580

    # the slash from the filename template is expected to end up as a dash
    item.filename = 'formulaire-{{form_number}}/2.odt'
    item.perform(formdata)
    assert formdata.evolution[-1].parts[-1].base_filename == 'formulaire-%s-%s-2.odt' % (
        formdef.id, formdata.id)
|
|
|
|
|
|
def test_export_to_model_backoffice_field(pub):
    """Check the export-to-model action can store its result in a backoffice field.

    The generated document must land in the 'bo1' file field; when that
    field is missing, or is not a file field, the action is expected to
    leave the formdata untouched.
    """
    wf = Workflow(name='email with attachments')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef.fields = [
        FileField(id='bo1', label='bo field 1', type='file', varname='backoffice_file1'),
    ]
    st1 = wf.add_status('Status1')
    wf.store()
    formdef = FormDef()
    formdef.name = 'foo-export-to-bofile'
    formdef.fields = [
        StringField(id='1', label='String', type='string', varname='string'),
    ]
    formdef.workflow_id = wf.id
    formdef.store()
    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    item = ExportToModel()
    item.method = 'non-interactive'
    item.convert_to_pdf = False
    template_filename = os.path.join(os.path.dirname(__file__), 'template.odt')
    # read through a context manager so the file handle is not leaked
    with open(template_filename, 'rb') as fd:
        template = fd.read()
    upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
    upload.fp = BytesIO()
    upload.fp.write(template)
    upload.fp.seek(0)
    item.model_file = UploadedFile(pub.app_dir, None, upload)
    item.parent = st1
    item.backoffice_filefield_id = 'bo1'
    item.perform(formdata)

    assert 'bo1' in formdata.data
    fbo1 = formdata.data['bo1']
    assert fbo1.base_filename == 'template.odt'
    assert fbo1.content_type == 'application/octet-stream'
    with zipfile.ZipFile(fbo1.get_file()) as zfile:
        assert b'foo-export-to-bofile' in zfile.read('content.xml')

    # no more 'bo1' backoffice field: do nothing
    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)
    # id is not bo1:
    wf.backoffice_fields_formdef.fields = [
        FileField(id='bo2', label='bo field 2', type='file'),
    ]
    item.perform(formdata)
    assert formdata.data == {}
    # field is not a field file:
    wf.backoffice_fields_formdef.fields = [
        StringField(id='bo1', label='bo field 1', type='string'),
    ]
    item.perform(formdata)
    assert formdata.data == {}
    # no field at all:
    wf.backoffice_fields_formdef.fields = []
    item.perform(formdata)
    assert formdata.data == {}
|
|
|
|
|
|
def test_export_to_model_django_template(pub):
    """Check an ODT model using Django template syntax is rendered.

    The form name is expected to appear in the ``content.xml`` of every
    generated document, including names holding a quote or angle brackets.
    Each run of the action adds a part to the first evolution of the
    formdata, hence the increasing part index.
    """
    formdef = FormDef()
    formdef.name = 'foo-export-to-template-with-django'
    formdef.fields = [
        StringField(id='1', label='String', type='string', varname='string'),
    ]
    formdef.store()
    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    def read_content_xml(part_index):
        # open the attachment explicitly so that both the file and the
        # archive are closed once content.xml has been read
        part = formdata.evolution[0].parts[part_index]
        with open(part.filename, 'rb') as fd:
            with zipfile.ZipFile(fd) as archive:
                return archive.read('content.xml')

    item = ExportToModel()
    item.method = 'non-interactive'
    item.attach_to_history = True
    template_filename = os.path.join(os.path.dirname(__file__), 'template-django.odt')
    with open(template_filename, 'rb') as fd:
        template = fd.read()
    upload = QuixoteUpload('/foo/template-django.odt', content_type='application/octet-stream')
    upload.fp = BytesIO()
    upload.fp.write(template)
    upload.fp.seek(0)
    item.model_file = UploadedFile(pub.app_dir, None, upload)
    item.convert_to_pdf = False
    item.perform(formdata)

    new_content = read_content_xml(0)
    assert b'>foo-export-to-template-with-django<' in new_content

    formdef.name = 'Name with a \' simple quote'
    formdef.store()
    item.perform(formdata)

    new_content = read_content_xml(1)
    assert b'>Name with a \' simple quote<' in new_content

    formdef.name = 'A <> name'
    formdef.store()
    item.perform(formdata)

    new_content = read_content_xml(2)
    assert b'>A <> name<' in new_content
|
|
|
|
|
|
def test_global_timeouts(two_pubs):
    """Exercise timeout triggers of global actions with every kind of anchor.

    A global action raising the criticality level is bound to a timeout
    trigger; after each call to ``apply_global_action_timeouts()`` the
    criticality level stored for the formdata tells whether the trigger
    fired ('yellow', or 'red' for a second run) or not ('green').
    Storing ``formdata1`` again puts the stored copy back to the state of
    the in-memory object, that never gets its criticality level changed.
    """
    pub = two_pubs
    FormDef.wipe()
    Workflow.wipe()

    workflow = Workflow(name='global-timeouts')
    workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
    workflow.criticality_levels = [
        WorkflowCriticalityLevel(name=level_name)
        for level_name in ('green', 'yellow', 'red')
    ]
    action = workflow.add_global_action('Timeout Test')
    action.append_item('modify_criticality')
    trigger = action.append_trigger('timeout')
    trigger.anchor = 'creation'
    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    formdef.store()
    formdef.data_class().wipe()

    formdata1 = formdef.data_class()()
    formdata1.just_created()
    formdata1.store()

    def days_ago(days):
        # local time structure for the current time minus a number of days
        return time.localtime(time.time() - days * 86400)

    def apply_and_check(expected_level):
        # run the timeouts then look at the level of the stored formdata
        pub.apply_global_action_timeouts()
        stored = formdef.data_class().get(formdata1.id)
        assert stored.get_criticality_level_object().name == expected_level

    # delay isn't set yet, no crash
    apply_and_check('green')

    # delay didn't expire yet, no change
    trigger.timeout = '2'
    workflow.store()

    apply_and_check('green')

    formdata1.receipt_time = days_ago(3)
    formdata1.store()
    apply_and_check('yellow')

    # make sure it's not triggered a second time
    apply_and_check('yellow')

    # change id so it's triggered again
    trigger.id = 'XXX1'
    workflow.store()
    apply_and_check('red')
    apply_and_check('red')

    # reset formdata to initial state
    formdata1.store()

    trigger.anchor = '1st-arrival'
    trigger.anchor_status_first = None
    workflow.store()

    apply_and_check('green')

    formdata1.evolution[-1].time = days_ago(3)
    formdata1.store()
    apply_and_check('yellow')

    formdata1.store()  # reset

    # bad (obsolete) status: do nothing
    trigger.anchor_status_first = 'wf-foobar'
    workflow.store()
    formdata1.evolution[-1].time = days_ago(3)
    formdata1.store()
    apply_and_check('green')
    formdata1.store()

    trigger.anchor = 'latest-arrival'
    trigger.anchor_status_latest = None
    workflow.store()

    formdata1.evolution[-1].time = time.localtime()
    formdata1.store()
    formdata1.jump_status('new')
    formdata1.evolution[-1].time = days_ago(7)
    formdata1.jump_status('accepted')
    formdata1.jump_status('new')
    formdata1.evolution[-1].time = days_ago(1)

    apply_and_check('green')

    formdata1.evolution[-1].time = days_ago(4)
    formdata1.store()
    apply_and_check('yellow')
    formdata1.store()

    # limit trigger to formdata with "accepted" status
    trigger.anchor_status_latest = 'wf-accepted'
    workflow.store()
    apply_and_check('green')
    formdata1.store()

    # limit trigger to formdata with "new" status
    trigger.anchor_status_latest = 'wf-new'
    workflow.store()
    apply_and_check('yellow')
    formdata1.store()

    # bad (obsolete) status: do nothing
    trigger.anchor_status_latest = 'wf-foobar'
    workflow.store()
    apply_and_check('green')
    formdata1.store()

    # check trigger is not run on finalized formdata
    formdata1.jump_status('finished')
    formdata1.evolution[-1].time = days_ago(4)
    formdata1.store()
    trigger.anchor = 'creation'
    workflow.store()
    apply_and_check('green')
    formdata1.store()

    # check trigger is run on finalized formdata when anchor status is an
    # endpoint
    formdata1.jump_status('finished')
    formdata1.evolution[-1].last_jump_datetime = None
    formdata1.evolution[-1].time = days_ago(4)
    formdata1.store()
    trigger.anchor = 'latest-arrival'
    trigger.anchor_status_latest = 'wf-finished'
    workflow.store()
    apply_and_check('yellow')
    formdata1.store()

    # check "finalized" anchor
    trigger.anchor = 'finalized'
    workflow.store()
    apply_and_check('yellow')
    formdata1.store()

    # use python expression as anchor
    # timestamp
    formdata1.jump_status('new')
    formdata1.evolution[-1].time = days_ago(4)
    formdata1.evolution[-1].last_jump_datetime = None
    formdata1.store()

    trigger.anchor = 'python'
    trigger.anchor_expression = repr(time.time())
    workflow.store()
    apply_and_check('green')

    trigger.anchor = 'python'
    trigger.anchor_expression = repr(time.time() - 10 * 86400)
    workflow.store()
    apply_and_check('yellow')
    formdata1.store()

    # datetime object
    trigger.anchor = 'python'
    ten_days_ago = datetime.datetime.now() - datetime.timedelta(days=10)
    trigger.anchor_expression = (
        'datetime.datetime(%s, %s, %s, %s, %s)' % ten_days_ago.timetuple()[:5])
    workflow.store()
    apply_and_check('yellow')
    formdata1.store()

    # date object
    trigger.anchor = 'python'
    ten_days_ago = datetime.datetime.now() - datetime.timedelta(days=10)
    trigger.anchor_expression = (
        'datetime.date(%s, %s, %s)' % ten_days_ago.timetuple()[:3])
    workflow.store()
    apply_and_check('yellow')
    formdata1.store()

    # string object
    trigger.anchor = 'python'
    ten_days_ago = datetime.datetime.now() - datetime.timedelta(days=10)
    trigger.anchor_expression = (
        '"%04d-%02d-%02d"' % ten_days_ago.timetuple()[:3])
    workflow.store()
    apply_and_check('yellow')
    formdata1.store()

    # invalid variable
    trigger.anchor = 'python'
    trigger.anchor_expression = 'Ellipsis'
    workflow.store()
    apply_and_check('green')
    formdata1.store()

    # invalid expression
    trigger.anchor = 'python'
    trigger.anchor_expression = 'XXX'
    workflow.store()
    apply_and_check('green')
    formdata1.store()
|
|
|
|
|
|
def test_profile(two_pubs):
    """Exercise the user profile update action.

    Covers the builtin email and name attributes (Python, Django and ezt
    expressions), custom user fields (string and date), the PATCH request
    sent afterwards once ``idp-manage-user-attributes`` is enabled, and a
    run outside of any HTTP request.
    """
    User = two_pubs.user_class
    user = User()
    user.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='1', label='Test', type='string', varname='foo'),
    ]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.user_id = user.id
    formdata.data = {'1': 'bar@localhost'}

    item = UpdateUserProfileStatusItem()
    item.fields = [{'field_id': '__email', 'value': '=form_var_foo'}]
    item.perform(formdata)
    assert User.get(user.id).email == 'bar@localhost'

    formdata.data = {'1': 'Plop'}
    item.fields = [{'field_id': '__name', 'value': '=form_var_foo'}]
    item.perform(formdata)
    assert User.get(user.id).name == 'Plop'

    item.fields = [{'field_id': '__name', 'value': 'dj{{form_var_foo}}'}]
    item.perform(formdata)
    assert User.get(user.id).name == 'djPlop'
    item.fields = [{'field_id': '__name', 'value': 'ezt[form_var_foo]'}]
    item.perform(formdata)
    assert User.get(user.id).name == 'eztPlop'

    # custom user fields, referenced in the action by their varname
    from wcs.admin.settings import UserFieldsFormDef
    formdef = UserFieldsFormDef(two_pubs)
    formdef.fields = [
        StringField(id='3', label='test', type='string', varname='plop'),
        DateField(id='4', label='Date', type='date', varname='bar'),
    ]
    formdef.store()

    item.fields = [{'field_id': 'plop', 'value': '=form_var_foo'}]
    item.perform(formdata)
    assert User.get(user.id).form_data.get('3') == 'Plop'
    assert not User.get(user.id).form_data.get('4')

    # check transmission to IdP
    get_publisher().cfg['sp'] = {'idp-manage-user-attributes': True}
    get_publisher().cfg['idp'] = {'xxx': {'metadata_url': 'http://idp.example.net/idp/saml2/metadata'}}
    get_publisher().write_cfg()

    user = User.get(user.id)
    user.name_identifiers = ['xyz']
    user.store()

    for date_value in (
            '20/03/2018',
            '=utils.make_date("20/03/2018")',
            '=utils.make_date("20/03/2018").timetuple()'):

        # check local value
        item.fields = [{'field_id': 'bar', 'value': date_value}]
        item.perform(formdata)
        assert User.get(user.id).form_data.get('3') == 'Plop'
        assert User.get(user.id).form_data.get('4').tm_year == 2018

        with mock.patch('wcs.wf.profile.http_patch_request') as http_patch_request:
            http_patch_request.return_value = (None, 200, '', None)
            get_response().process_after_jobs()
            assert http_patch_request.call_count == 1
            assert http_patch_request.call_args[0][1] == '{"bar": "2018-03-20"}'

    for date_value in ('baddate', '', {}, [], None):
        # reset date to a known value
        user.form_data['4'] = datetime.datetime.now().timetuple()
        user.store()
        year = User.get(user.id).form_data.get('4').tm_year
        # perform action
        item.fields = [{'field_id': 'bar', 'value': date_value}]
        item.perform(formdata)
        if date_value not in (None, ''):  # bad value : do nothing
            assert User.get(user.id).form_data.get('4').tm_year == year
        else:  # empty value : empty field
            assert User.get(user.id).form_data.get('4') is None

        with mock.patch('wcs.wf.profile.http_patch_request') as http_patch_request:
            http_patch_request.return_value = (None, 200, '', None)
            get_response().process_after_jobs()
            assert http_patch_request.call_count == 1
            if date_value not in (None, ''):  # bad value : do nothing
                assert http_patch_request.call_args[0][1] == '{}'
            else:  # empty value : null field
                assert http_patch_request.call_args[0][1] == '{"bar": null}'

    # out of http request/response cycle (cron, after_job)
    two_pubs._set_request(None)
    item.fields = [{'field_id': 'bar', 'value': '01/01/2020'}]
    with mock.patch('wcs.wf.profile.http_patch_request') as http_patch_request:
        http_patch_request.return_value = (None, 200, '', None)
        item.perform(formdata)
        assert User.get(user.id).form_data.get('4').tm_year == 2020
        assert http_patch_request.call_count == 1
        assert http_patch_request.call_args[0][1] == '{"bar": "2020-01-01"}'
|
|
|
|
|
|
def test_set_backoffice_field(http_requests, two_pubs):
    """Exercise the action setting backoffice fields with a string field.

    The value of the 'bo1' field is computed from Python expressions
    (``=`` prefix), Django templates and ezt templates; after each run the
    formdata is loaded again from storage to check the stored value.
    Invalid expressions are expected to be recorded as a ``LoggedError``.
    """
    Workflow.wipe()
    FormDef.wipe()
    LoggedError.wipe()
    wf = Workflow(name='xxx')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef.fields = [
        StringField(id='bo1', label='1st backoffice field',
                    type='string', varname='backoffice_blah'),
    ]
    st1 = wf.add_status('Status1')
    wf.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='00', label='String', type='string', varname='string'),
        StringField(id='01', label='Other string', type='string', varname='other'),
    ]
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'00': 'HELLO'}
    formdata.just_created()
    formdata.store()
    two_pubs.substitutions.feed(formdata)

    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1

    # labels used in the workflow editor, without then with a custom label
    assert item.render_as_line() == 'Backoffice Data'
    assert item.get_jump_label('plop') == 'Backoffice Data'
    item.label = 'label'
    assert item.render_as_line() == 'Backoffice Data (label)'
    assert item.get_jump_label('plop') == 'Backoffice Data "label"'

    # no field configured in the action: nothing gets set
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data.get('bo1') is None

    item.fields = [{'field_id': 'bo1', 'value': '=form_var_string'}]
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'] == 'HELLO'

    item.fields = [{'field_id': 'bo1', 'value': '{{ form_var_string }} WORLD'}]
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'] == 'HELLO WORLD'

    item.fields = [{'field_id': 'bo1', 'value': '[form_var_string] GOODBYE'}]
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'] == 'HELLO GOODBYE'

    item.fields = [{'field_id': 'bo1', 'value': '{{ form.var.string }} LAZY'}]
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'] == 'HELLO LAZY'

    item.fields = [{'field_id': 'bo1', 'value': '=form.var.string'}]  # lazy python
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'] == 'HELLO'

    item.fields = [{'field_id': 'bo1', 'value': '=vars().get("form_var_string") + " PLOP"'}]
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'] == 'HELLO PLOP'

    item.fields = [{'field_id': 'bo1',
                    'value': '=vars().get("form_var_other") or vars().get("form_var_string")'}]
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'] == 'HELLO'

    item.fields = [{'field_id': 'bo1', 'value': '=vars().get("form_var_bouh", "X") + " PLOP"'}]
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'] == 'X PLOP'

    # nothing has been logged by the valid expressions above
    assert LoggedError.count() == 0

    # Python syntax error: recorded with details about the expression
    item.fields = [{'field_id': 'bo1', 'value': '= ~ invalid python ~'}]
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert LoggedError.count() == 1
    logged_error = LoggedError.select()[0]
    assert logged_error.summary == 'Failed to compute Python expression'
    assert logged_error.formdata_id == str(formdata.id)
    assert logged_error.expression == ' ~ invalid python ~'
    assert logged_error.expression_type == 'python'
    assert logged_error.exception_class == 'SyntaxError'
    assert logged_error.exception_message == 'invalid syntax (<string>, line 1)'

    # Django template syntax error: recorded as a template error
    LoggedError.wipe()
    item.fields = [{'field_id': 'bo1', 'value': '{% if bad django %}'}]
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert LoggedError.count() == 1
    logged_error = LoggedError.select()[0]
    assert logged_error.summary == 'Failed to compute template'
    assert logged_error.formdata_id == str(formdata.id)
    assert logged_error.expression == '{% if bad django %}'
    assert logged_error.expression_type == 'template'
    assert logged_error.exception_class == 'TemplateError'
    assert logged_error.exception_message.startswith('syntax error in Django template')
|
|
|
|
|
|
def test_set_backoffice_field_file(http_requests, two_pubs):
    """Check SetBackofficeFieldsWorkflowStatusItem on a file backoffice field.

    Scenarios covered, in order: no source file, a PicklableUpload coming
    from a form field (two import paths), a webservice response stored as
    attachment, a file assembled from a dictionary (raw and base64
    content), invalid values (logged, field left untouched), a None value
    (field emptied) and an unknown target field id.
    """
    def read_fixture(name):
        # use a context manager so the file descriptor is released right
        # away instead of being left to the garbage collector.
        with open(os.path.join(os.path.dirname(__file__), name), 'rb') as fd:
            return fd.read()

    Workflow.wipe()
    FormDef.wipe()
    wf = Workflow(name='xxx')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef.fields = [
        FileField(id='bo1', label='1st backoffice field',
                  type='file', varname='backoffice_file'),
    ]
    st1 = wf.add_status('Status1')
    wf.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        FileField(id='00', label='File', type='file', varname='file'),
    ]
    formdef.workflow_id = wf.id
    formdef.store()

    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo1', 'value': '=locals().get("form_var_file_raw")'}]

    # the file does not exist
    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()
    formdata.store()
    two_pubs.substitutions.feed(formdata)
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'] is None

    # store a PicklableUpload
    jpeg_content = read_fixture('image-with-gps-data.jpeg')
    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    upload.receive([jpeg_content])

    formdata = formdef.data_class()()
    formdata.data = {'00': upload}
    formdata.just_created()
    formdata.store()

    two_pubs.substitutions.feed(formdata)
    item.perform(formdata)

    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'].base_filename == 'test.jpeg'
    assert formdata.data['bo1'].content_type == 'image/jpeg'
    assert formdata.data['bo1'].get_content() == jpeg_content

    # same test with PicklableUpload wcs.qommon.form
    from wcs.qommon.form import PicklableUpload as PicklableUpload2
    odt_content = read_fixture('template.odt')
    upload2 = PicklableUpload2('test2.odt', 'application/vnd.oasis.opendocument.text')
    upload2.receive([odt_content])
    formdata = formdef.data_class()()
    formdata.data = {'00': upload2}
    formdata.just_created()
    formdata.store()
    two_pubs.substitutions.feed(formdata)
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'].base_filename == 'test2.odt'
    assert formdata.data['bo1'].content_type == 'application/vnd.oasis.opendocument.text'
    assert formdata.data['bo1'].get_content() == odt_content

    # check storing response as attachment
    two_pubs.substitutions.feed(formdata)
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.varname = 'xxx'
    item.response_type = 'attachment'
    item.record_errors = True
    item.perform(formdata)
    attachment = formdata.evolution[-1].parts[-1]
    assert isinstance(attachment, AttachmentEvolutionPart)
    assert attachment.base_filename == 'xxx.xml'
    assert attachment.content_type == 'text/xml'

    formdata = formdef.data_class().get(formdata.id)
    two_pubs.substitutions.feed(formdata)
    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo1', 'value': '=attachments.xxx'}]
    item.perform(formdata)

    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'].base_filename == 'xxx.xml'

    # check storing a file from an assembled dictionary
    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo1',
                    'value': '={"content": "hello world", "filename": "hello.txt"}'}]
    item.perform(formdata)

    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'].base_filename == 'hello.txt'
    assert formdata.data['bo1'].get_content() == b'hello world'

    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo1',
                    'value': '={"b64_content": "SEVMTE8gV09STEQ=", "filename": "hello.txt"}'}]
    item.perform(formdata)

    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'].base_filename == 'hello.txt'
    assert formdata.data['bo1'].get_content() == b'HELLO WORLD'

    hello_world = formdata.data['bo1']
    # check wrong value, or None (no file)
    for value in ('="HELLO"', '', 'BAD', '={}', '=[]', None):
        # restore a known file before each attempt so that "do nothing"
        # can be told apart from "field emptied".
        formdata.data['bo1'] = hello_world
        formdata.store()

        item = SetBackofficeFieldsWorkflowStatusItem()
        item.parent = st1
        item.fields = [{'field_id': 'bo1', 'value': value}]

        LoggedError.wipe()
        item.perform(formdata)

        formdata = formdef.data_class().get(formdata.id)
        if value is not None:  # wrong value : do nothing
            assert formdata.data['bo1'].base_filename == 'hello.txt'
            assert formdata.data['bo1'].get_content() == b'HELLO WORLD'
            assert LoggedError.count() == 1
            logged_error = LoggedError.select()[0]
            assert logged_error.summary.startswith('Failed to convert')
            assert logged_error.formdata_id == str(formdata.id)
            assert logged_error.exception_class == 'ValueError'
        else:  # empty value : remove field
            assert formdata.data['bo1'] is None
            assert LoggedError.count() == 0

    # check wrong field
    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo3', 'value': '=form_var_file_raw'}]
    item.perform(formdata)

    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data.get('bo1') is None
    assert formdata.data.get('bo3') is None
|
|
|
|
|
|
def test_set_backoffice_field_item(two_pubs):
    """Check SetBackofficeFieldsWorkflowStatusItem on an item backoffice field.

    The field is successively backed by a plain list of items, a formula
    data source made of (id, text) tuples and a formula data source made of
    dictionaries; the stored value, its display value and (for
    dictionaries) its structured value are asserted each time.
    """
    Workflow.wipe()
    FormDef.wipe()

    workflow = Workflow(name='xxx')
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    status = workflow.add_status('Status1')

    def install_field(**options):
        # (re)declare the single backoffice field and persist the workflow
        workflow.backoffice_fields_formdef.fields = [
            ItemField(id='bo1', label='1st backoffice field',
                      type='item', varname='backoffice_item',
                      **options),
        ]
        workflow.store()

    install_field(items=['a', 'b', 'c'])

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    formdef.store()

    def blank_formdata():
        # create and store a formdata without any data
        fresh = formdef.data_class()()
        fresh.data = {}
        fresh.just_created()
        fresh.store()
        return fresh

    def assign(record, value):
        # run a new action setting bo1 to value, return the reloaded formdata
        action = SetBackofficeFieldsWorkflowStatusItem()
        action.parent = status
        action.fields = [{'field_id': 'bo1', 'value': value}]
        action.perform(record)
        return formdef.data_class().get(record.id)

    formdata = blank_formdata()

    # plain list of items
    formdata = assign(formdata, 'a')
    assert formdata.data['bo1'] == 'a'
    assert formdata.data['bo1_display'] == 'a'

    # formula data source, list of tuples
    tuples_source = {'type': 'formula',
                     'value': repr([('a', 'aa'), ('b', 'bb'), ('c', 'cc')])}
    install_field(data_source=tuples_source)

    formdata = assign(formdata, 'a')
    assert formdata.data['bo1'] == 'a'
    assert formdata.data['bo1_display'] == 'aa'

    # formula data source, list of dictionaries
    dicts_source = {'type': 'formula',
                    'value': repr([{'id': 'a', 'text': 'aa', 'more': 'aaa'},
                                   {'id': 'b', 'text': 'bb', 'more': 'bbb'}])}
    install_field(data_source=dicts_source)

    expected_structured = {'id': 'a', 'more': 'aaa', 'text': 'aa'}

    formdata = assign(formdata, 'a')
    assert formdata.data['bo1'] == 'a'
    assert formdata.data['bo1_display'] == 'aa'
    assert formdata.data['bo1_structured'] == expected_structured

    # check when assigning using the display value
    formdata = assign(blank_formdata(), 'aa')
    assert formdata.data['bo1'] == 'a'
    assert formdata.data['bo1_display'] == 'aa'
    assert formdata.data['bo1_structured'] == expected_structured

    # check with unknown value
    formdata = assign(blank_formdata(), 'foobar')
    assert formdata.data['bo1'] == 'foobar'
    assert formdata.data.get('bo1_display') is None
    assert formdata.data.get('bo1_structured') is None
|
|
|
|
|
|
def test_set_backoffice_field_items(two_pubs):
    """Check SetBackofficeFieldsWorkflowStatusItem on an items backoffice field.

    A Python list expression is assigned while the field is backed by a
    plain list of items, then by formula data sources (tuples, then
    dictionaries); stored, display and structured values are asserted.
    """
    Workflow.wipe()
    FormDef.wipe()

    workflow = Workflow(name='xxx')
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    status = workflow.add_status('Status1')

    def install_field(**options):
        # (re)declare the single backoffice field and persist the workflow
        workflow.backoffice_fields_formdef.fields = [
            ItemsField(id='bo1', label='1st backoffice field',
                       type='items', varname='backoffice_item',
                       **options),
        ]
        workflow.store()

    install_field(items=['a', 'b', 'c'])

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()
    formdata.store()

    def assign(record, expression):
        # run a new action setting bo1, return the reloaded formdata
        action = SetBackofficeFieldsWorkflowStatusItem()
        action.parent = status
        action.fields = [{'field_id': 'bo1', 'value': expression}]
        action.perform(record)
        return formdef.data_class().get(record.id)

    # plain list of items
    formdata = assign(formdata, "=['a', 'b']")
    assert formdata.data['bo1'] == ['a', 'b']
    assert formdata.data['bo1_display'] == 'a, b'

    # formula data source, list of tuples
    tuples_source = {'type': 'formula',
                     'value': repr([('a', 'aa'), ('b', 'bb'), ('c', 'cc')])}
    install_field(data_source=tuples_source)

    formdata = assign(formdata, "=['a', 'b']")
    assert formdata.data['bo1'] == ['a', 'b']
    assert formdata.data['bo1_display'] == 'aa, bb'

    # formula data source, list of dictionaries
    dicts_source = {'type': 'formula',
                    'value': repr([{'id': 'a', 'text': 'aa', 'more': 'aaa'},
                                   {'id': 'b', 'text': 'bb', 'more': 'bbb'},
                                   {'id': 'c', 'text': 'cc', 'more': 'ccc'}])}
    install_field(data_source=dicts_source)

    formdata = assign(formdata, "=['a', 'c']")
    assert formdata.data['bo1'] == ['a', 'c']
    assert formdata.data['bo1_display'] == 'aa, cc'
    structured = formdata.data['bo1_structured']
    assert len(structured) == 2
    assert {'id': 'a', 'more': 'aaa', 'text': 'aa'} in structured
    assert {'id': 'c', 'more': 'ccc', 'text': 'cc'} in structured
|
|
|
|
|
|
def test_set_backoffice_field_date(two_pubs):
    """Check SetBackofficeFieldsWorkflowStatusItem on a date backoffice field.

    Valid inputs are a Python expression, a Django template and a literal
    d/m/Y string; invalid inputs must leave the stored date untouched and
    log one error; None must empty the field.
    """
    for storage in (Workflow, FormDef, LoggedError):
        storage.wipe()

    workflow = Workflow(name='xxx')
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    status = workflow.add_status('Status1')
    workflow.backoffice_fields_formdef.fields = [
        DateField(id='bo1', label='1st backoffice field',
                  type='date', varname='backoffice_date'),
    ]
    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()
    formdata.store()

    def build_action(value):
        # new action assigning value to the bo1 field
        action = SetBackofficeFieldsWorkflowStatusItem()
        action.parent = status
        action.fields = [{'field_id': 'bo1', 'value': value}]
        return action

    def reload(record):
        return formdef.data_class().get(record.id)

    def stored_date(record):
        # the first three items of the stored value are year, month, day
        return datetime.date(*record.data['bo1'][:3])

    # python expression
    build_action("=utils.today()").perform(formdata)
    formdata = reload(formdata)
    assert stored_date(formdata) == datetime.date.today()

    # django template, starting from an empty field
    formdata.data['bo1'] = None
    formdata.store()
    build_action('{% now "j/n/Y" %}').perform(formdata)
    formdata = reload(formdata)
    assert stored_date(formdata) == datetime.date.today()

    # literal date
    build_action("23/3/2017").perform(formdata)
    formdata = reload(formdata)
    assert stored_date(formdata) == datetime.date(2017, 3, 23)

    # invalid values => do nothing
    assert LoggedError.count() == 0
    for invalid in ('plop', {}, []):
        action = build_action(invalid)

        LoggedError.wipe()
        action.perform(formdata)
        formdata = reload(formdata)
        assert stored_date(formdata) == datetime.date(2017, 3, 23)
        assert LoggedError.count() == 1
        assert LoggedError.select()[0].summary.startswith('Failed to convert')

    # None : empty date
    build_action(None).perform(formdata)
    formdata = reload(formdata)
    assert formdata.data['bo1'] is None
|
|
|
|
|
|
def test_set_backoffice_field_immediate_use(http_requests, two_pubs):
    """Check a backoffice field set by an action is usable in the same action.

    The last scenario sets bo1 then bo2 in a single action, bo2 being
    computed from the variable of bo1; the assertion expects bo2 to embed
    the value bo1 has just been given.
    """
    Workflow.wipe()
    FormDef.wipe()

    workflow = Workflow(name='xxx')
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    workflow.backoffice_fields_formdef.fields = [
        StringField(id='bo1', label='1st backoffice field',
                    type='string', varname='backoffice_blah'),
        StringField(id='bo2', label='2nd backoffice field',
                    type='string', varname='backoffice_barr'),
    ]
    status = workflow.add_status('Status1')
    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='00', label='String', type='string', varname='string'),
    ]
    formdef.workflow_id = workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'00': 'HELLO'}
    formdata.just_created()
    formdata.store()

    # the same action object is reconfigured for every scenario
    action = SetBackofficeFieldsWorkflowStatusItem()
    action.parent = status

    def run(record, assignments):
        # configure the action, refresh substitution variables from the
        # formdata, perform the action and return the reloaded formdata
        action.fields = assignments
        two_pubs.substitutions.reset()
        two_pubs.substitutions.feed(record)
        action.perform(record)
        return formdef.data_class().get(record.id)

    formdata = run(formdata, [
        {'field_id': 'bo1', 'value': '{{form_var_string}}'},
    ])
    assert formdata.data.get('bo1') == 'HELLO'

    formdata = run(formdata, [
        {'field_id': 'bo1', 'value': 'WORLD'},
    ])
    assert formdata.data.get('bo1') == 'WORLD'

    formdata = run(formdata, [
        {'field_id': 'bo1', 'value': 'X{{form_var_string}}X'},
        {'field_id': 'bo2', 'value': "Y{{form_var_backoffice_blah}}Y"},
    ])
    assert formdata.data.get('bo1') == 'XHELLOX'
    assert formdata.data.get('bo2') == 'YXHELLOXY'
|
|
|
|
|
|
def test_redirect_to_url(pub):
    """Check RedirectToUrlWorkflowStatusItem rendering and URL computation.

    The URL is given with bracket-style variables and with Django template
    syntax; in both syntaxes a template rendering to nothing must make
    perform() return None.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='1', label='Test', type='string', varname='foo'),
    ]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'1': 'bar'}

    item = RedirectToUrlWorkflowStatusItem()
    assert item.render_as_line() == 'Web Redirection (not configured)'
    item.url = 'https://www.example.net/?foo=[form_var_foo]'
    assert item.render_as_line() == 'Web Redirection (to https://www.example.net/?foo=[form_var_foo])'
    pub.substitutions.feed(formdata)
    assert item.perform(formdata) == 'https://www.example.net/?foo=bar'

    item.url = 'https://www.example.net/?django={{ form_var_foo }}'
    assert item.render_as_line() == 'Web Redirection (to https://www.example.net/?django={{ form_var_foo }})'
    pub.substitutions.feed(formdata)
    assert item.perform(formdata) == 'https://www.example.net/?django=bar'

    # identity checks: an equality test against None could be satisfied by
    # any object with a permissive __eq__.
    item.url = '[if-any nada]https://www.example.net/[end]'
    pub.substitutions.feed(formdata)
    assert item.perform(formdata) is None

    item.url = '{% if nada %}https://www.example.net/{% endif %}'
    pub.substitutions.feed(formdata)
    assert item.perform(formdata) is None
|
|
|
|
|
|
def test_workflow_jump_condition_migration(pub):
    """Check how a jump condition looks once the workflow is reloaded.

    An unset condition is reloaded as None; a condition stored as a plain
    string is reloaded as a {'type': 'python', 'value': ...} dictionary.
    """
    workflow = Workflow(name='jump condition migration')
    status = workflow.add_status('Status1', 'st1')

    jump_action = JumpWorkflowStatusItem()
    jump_action.id = '_jump'
    status.items.append(jump_action)
    jump_action.parent = status
    workflow.store()

    def reloaded_condition():
        # condition of the first action of the first status, read back
        # from storage
        stored = Workflow.get(workflow.id)
        return stored.possible_status[0].items[0].condition

    assert reloaded_condition() is None

    jump_action.condition = 'foobar'
    workflow.store()
    assert reloaded_condition() == {
        'type': 'python', 'value': 'foobar'}
|
|
|
|
|
|
def test_workflow_action_condition(two_pubs):
    """Check the condition of a choice action is taken into account.

    The condition must affect both the roles returned by
    get_actions_roles() and the ids returned by get_actionable_ids(); a
    condition that fails to evaluate must match nothing and be recorded as
    a LoggedError.
    """
    two_pubs._set_request(None)  # to avoid after jobs
    workflow = Workflow(name='jump condition migration')
    status = workflow.add_status('Status1', 'st1')
    workflow.store()

    role = Role(name='bar1')
    role.store()

    user = two_pubs.user_class()
    user.roles = [role.id]
    user.store()

    choice = ChoiceWorkflowStatusItem()
    choice.by = [role.id]
    choice.id = '_x'
    status.items.append(choice)
    choice.parent = status
    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo')]
    formdef.workflow_id = workflow.id
    formdef.store()

    def stored_formdata(data):
        # create and store a formdata holding the given data
        record = formdef.data_class()()
        record.data = data
        record.just_created()
        record.store()
        return record

    def fresh_data_class():
        # always go through a freshly loaded formdef
        return FormDef.get(formdef.id).data_class()

    def actionable_count():
        return len(fresh_data_class().get_actionable_ids([role.id]))

    def set_condition(expression):
        choice.condition = {'type': 'python', 'value': expression}
        workflow.store()

    formdata1 = stored_formdata({'1': 'foo'})
    formdata2 = stored_formdata({'2': 'bar'})

    # no condition: the action is available on both formdatas
    assert formdata1.get_actions_roles() == {role.id}
    assert formdata2.get_actions_roles() == {role.id}

    assert actionable_count() == 2

    set_condition('form_var_foo == "foo"')

    with two_pubs.substitutions.temporary_feed(formdata1):
        assert fresh_data_class().get(formdata1.id).get_actions_roles() == {role.id}
    with two_pubs.substitutions.temporary_feed(formdata2):
        assert fresh_data_class().get(formdata2.id).get_actions_roles() == set()

    assert actionable_count() == 1

    # check with a formdef condition
    set_condition('form_name == "test"')
    assert actionable_count() == 0

    set_condition('form_name == "baz"')
    assert actionable_count() == 2

    # bad condition
    LoggedError.wipe()
    set_condition('foobar == barfoo')
    assert actionable_count() == 0
    assert LoggedError.count() == 1
    error = LoggedError.select()[0]
    assert error.occurences_count > 1  # should be 2... == 12 with pickle, 4 with sql
    assert error.summary == 'Failed to evaluate condition'
    assert error.exception_class == 'NameError'
    assert error.exception_message == "name 'foobar' is not defined"
    assert error.expression == 'foobar == barfoo'
    assert error.expression_type == 'python'
|
|
|
|
|
|
def test_notifications(pub, http_requests):
    """Check SendNotificationWorkflowStatusItem availability and HTTP calls.

    The action is expected to be available only once a portal_url variable
    is set, to send nothing when the formdata has no user, one request for
    the submitter, and one request per user holding the receiver role.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    assert not SendNotificationWorkflowStatusItem.is_available()

    options = pub.site_options
    if not options.has_section('variables'):
        options.add_section('variables')
    options.set('variables', 'portal_url', 'https://portal/')
    assert SendNotificationWorkflowStatusItem.is_available()

    notification = SendNotificationWorkflowStatusItem()
    assert notification.to == ['_submitter']
    notification.title = 'xxx'
    notification.body = 'XXX'

    # no user
    http_requests.empty()
    notification.perform(formdata)
    assert http_requests.count() == 0

    # user
    http_requests.empty()
    submitter = pub.user_class()
    submitter.name_identifiers = ['xxx']
    submitter.store()
    formdata.user_id = submitter.id
    formdata.store()

    notification.perform(formdata)
    assert http_requests.count() == 1
    assert http_requests.get_last('url') == 'https://portal/api/notification/add/?NameID=xxx'
    expected_payload = {
        'body': 'XXX',
        'url': formdata.get_url(),
        'id': 'formdata:%s' % formdata.get_display_id(),
        'origin': '',
        'summary': 'xxx'
    }
    assert json.loads(http_requests.get_last('body')) == expected_payload

    # roles (not exposed in current UI)
    http_requests.empty()

    role = Role(name='blah')
    role.store()

    # two users holding the role, each with its own name identifier
    for name_id in ('xxy1', 'xxy2'):
        member = pub.user_class()
        member.roles = [role.id]
        member.name_identifiers = [name_id]
        member.store()

    formdef.workflow_roles = {'_receiver': role.id}

    notification.to = ['_receiver']
    notification.perform(formdata)
    assert http_requests.count() == 2
    requested_urls = {request['url'] for request in http_requests.requests}
    assert requested_urls == {
        'https://portal/api/notification/add/?NameID=xxy1',
        'https://portal/api/notification/add/?NameID=xxy2'}
|
|
|
|
|
|
def test_workflow_field_migration(pub):
    """Check in_listing is turned into display_locations on reload.

    A backoffice string field stored with in_listing=True is expected to
    expose display_locations == ['validation', 'summary', 'listings'] once
    the workflow has been loaded back.
    """
    Workflow.wipe()

    workflow = Workflow(name='wf with backoffice field')
    backoffice_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    workflow.backoffice_fields_formdef = backoffice_formdef
    backoffice_formdef.fields = [
        StringField(id='bo1', label='bo field 1', type='string', in_listing=True),
    ]
    workflow.add_status('Status1')
    workflow.store()

    reloaded = Workflow.get(workflow.id)
    migrated_field = reloaded.backoffice_fields_formdef.fields[0]
    assert migrated_field.display_locations == ['validation', 'summary', 'listings']
|