6121 lines
212 KiB
Python
6121 lines
212 KiB
Python
import base64
|
||
import json
|
||
import datetime
|
||
import os
|
||
import pytest
|
||
import shutil
|
||
import time
|
||
import zipfile
|
||
|
||
import mock
|
||
|
||
try:
|
||
from PIL import Image
|
||
except ImportError:
|
||
Image = None
|
||
|
||
from django.utils import six
|
||
from django.utils.encoding import force_bytes, force_text
|
||
from django.utils.six import BytesIO, StringIO
|
||
from django.utils.six.moves.urllib import parse as urlparse
|
||
|
||
from quixote import cleanup, get_response
|
||
from wcs.qommon.errors import ConnectionError
|
||
from quixote.http_request import Upload as QuixoteUpload
|
||
from wcs.qommon.http_request import HTTPRequest
|
||
from wcs.qommon.form import PicklableUpload
|
||
from wcs.qommon.form import *
|
||
|
||
from wcs.blocks import BlockDef
|
||
from wcs.formdef import FormDef
|
||
from wcs.carddef import CardDef
|
||
from wcs import sessions
|
||
from wcs.fields import (StringField, DateField, MapField, FileField, ItemField,
|
||
ItemsField, CommentField, EmailField, PageField, TitleField,
|
||
SubtitleField, TextField, BoolField, TableField, BlockField)
|
||
from wcs.formdata import Evolution
|
||
from wcs.roles import Role
|
||
from wcs.workflows import (Workflow, WorkflowStatusItem,
|
||
SendmailWorkflowStatusItem, SendSMSWorkflowStatusItem,
|
||
CommentableWorkflowStatusItem, ChoiceWorkflowStatusItem,
|
||
DisplayMessageWorkflowStatusItem,
|
||
AbortActionException, WorkflowCriticalityLevel,
|
||
AttachmentEvolutionPart, WorkflowBackofficeFieldsFormDef,
|
||
WorkflowVariablesFieldsFormDef,
|
||
perform_items)
|
||
from wcs.wf.aggregation_email import (AggregationEmailWorkflowStatusItem,
|
||
AggregationEmail, send_aggregation_emails)
|
||
from wcs.wf.anonymise import AnonymiseWorkflowStatusItem
|
||
from wcs.wf.criticality import ModifyCriticalityWorkflowStatusItem, MODE_INC, MODE_DEC, MODE_SET
|
||
from wcs.wf.dispatch import DispatchWorkflowStatusItem
|
||
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
|
||
from wcs.wf.jump import JumpWorkflowStatusItem, _apply_timeouts
|
||
from wcs.wf.timeout_jump import TimeoutWorkflowStatusItem
|
||
from wcs.wf.profile import UpdateUserProfileStatusItem
|
||
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem, JournalEvolutionPart
|
||
from wcs.wf.remove import RemoveWorkflowStatusItem
|
||
from wcs.wf.roles import AddRoleWorkflowStatusItem, RemoveRoleWorkflowStatusItem
|
||
from wcs.wf.wscall import WebserviceCallStatusItem
|
||
from wcs.wf.export_to_model import ExportToModel, transform_to_pdf
|
||
from wcs.wf.geolocate import GeolocateWorkflowStatusItem
|
||
from wcs.wf.backoffice_fields import SetBackofficeFieldsWorkflowStatusItem
|
||
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
|
||
from wcs.wf.notification import SendNotificationWorkflowStatusItem
|
||
from wcs.wf.create_formdata import CreateFormdataWorkflowStatusItem, Mapping
|
||
from wcs.wf.create_carddata import CreateCarddataWorkflowStatusItem
|
||
from wcs.wf.edit_carddata import EditCarddataWorkflowStatusItem
|
||
from wcs.wf.external_workflow import ExternalWorkflowGlobalAction
|
||
|
||
|
||
from utilities import (create_temporary_pub, MockSubstitutionVariables,
|
||
clean_temporary_pub)
|
||
|
||
|
||
def setup_module(module):
    """Pytest module-level setup hook: call quixote's ``cleanup()`` once
    before the tests of this module run.

    ``module`` is supplied by pytest and is not used here.
    """
    cleanup()
|
||
|
||
|
||
def teardown_module(module):
    """Pytest module-level teardown hook: dispose of the temporary publisher
    through ``clean_temporary_pub()`` once the tests of this module are done.

    ``module`` is supplied by pytest and is not used here.
    """
    clean_temporary_pub()
|
||
|
||
|
||
def pytest_generate_tests(metafunc):
    """Parametrize tests that request the ``two_pubs`` fixture.

    Such tests run twice, with the parameter values 'pickle' and 'sql'.
    The parametrization is indirect: the value is handed to the ``two_pubs``
    fixture (as ``request.param``) rather than to the test itself.
    """
    if 'two_pubs' not in metafunc.fixturenames:
        return
    metafunc.parametrize('two_pubs', ['pickle', 'sql'], indirect=True)
|
||
|
||
|
||
@pytest.fixture
def pub(request):
    """Return a temporary publisher configured for English and bound to a
    fake HTTP request for ``example.net``.

    The request has an empty response filter, no user, and a
    ``BasicSession`` with id 1.
    """
    publisher = create_temporary_pub()
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()

    environ = {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''}
    http_request = HTTPRequest(None, environ)
    http_request.response.filter = {}
    http_request._user = None
    publisher._set_request(http_request)
    # the session is attached only after the request is bound to the publisher
    http_request.session = sessions.BasicSession(id=1)
    publisher.set_config(http_request)
    return publisher
|
||
|
||
|
||
@pytest.fixture
def two_pubs(request):
    """Return a temporary publisher like ``pub``, created in SQL mode when
    the indirect parameter (``request.param``) is 'sql'.

    The publisher is configured for English and bound to a fake HTTP request
    for ``example.net`` with an empty response filter, no user, and a
    ``BasicSession`` with id 1.
    """
    use_sql = (request.param == 'sql')
    publisher = create_temporary_pub(sql_mode=use_sql)
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()

    environ = {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''}
    http_request = HTTPRequest(None, environ)
    http_request.response.filter = {}
    http_request._user = None
    publisher._set_request(http_request)
    # the session is attached only after the request is bound to the publisher
    http_request.session = sessions.BasicSession(id=1)
    publisher.set_config(http_request)
    return publisher
|
||
|
||
|
||
def test_get_json_export_dict(pub):
    """Check the dictionary produced by ``Workflow.get_json_export_dict()``
    for a workflow with two statuses and a timed jump from the first to the
    second.
    """
    workflow = Workflow(name='wf')
    first_status = workflow.add_status('Status1', 'st1')
    second_status = workflow.add_status('Status2', 'st2')
    second_status.forced_endpoint = True

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = ['_submitter', '_receiver']
    jump.timeout = 0.1
    jump.status = 'st2'
    first_status.items.append(jump)
    jump.parent = first_status

    workflow.roles['_other'] = 'Other Function'
    exported = workflow.get_json_export_dict()
    assert {'statuses', 'name', 'functions'} <= set(exported.keys())

    assert exported['name'] == 'wf'
    statuses = exported['statuses']
    assert len(statuses) == 2
    assert {status['id'] for status in statuses} == {'st1', 'st2'}
    required_keys = {'id', 'name', 'forced_endpoint'}
    assert all(required_keys <= set(status.keys()) for status in statuses)

    # (id, name, forced_endpoint, endpoint) expected for each exported status,
    # in export order
    expectations = [
        ('st1', 'Status1', False, False),
        ('st2', 'Status2', True, True),
    ]
    for status, (status_id, name, forced_endpoint, endpoint) in zip(statuses, expectations):
        assert status['id'] == status_id
        assert status['name'] == name
        assert status['forced_endpoint'] is forced_endpoint
        assert status['endpoint'] is endpoint
|
||
|
||
|
||
def test_variable_compute(pub):
    """Check ``compute()`` on a workflow item for each supported syntax.

    Covers plain strings, Django templates, EZT templates and ``=``-prefixed
    Python expressions, together with the ``render``, ``raises`` and
    ``context`` arguments, and finally HTML escaping in Django templates.
    """
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo')]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'1': 'hello'}
    formdata.store()
    pub.substitutions.feed(formdata)

    item = JumpWorkflowStatusItem()

    # straight string
    assert item.compute('blah') == 'blah'

    # django template
    assert item.compute('{{ form_var_foo }}') == 'hello'
    assert item.compute('{{ form_var_foo }}', render=False) == '{{ form_var_foo }}'
    assert item.compute('{% if form_var_foo %}its here{% endif %}') == 'its here'
    # a broken template is returned unchanged unless raises=True
    assert item.compute('{% if form_var_foo %}') == '{% if form_var_foo %}'
    with pytest.raises(Exception):
        item.compute('{% if form_var_foo %}', raises=True)

    # ezt string
    assert item.compute('[form_var_foo]') == 'hello'
    # ezt string, but not ezt asked
    assert item.compute('[form_var_foo]', render=False) == '[form_var_foo]'
    # ezt string, with an error
    assert item.compute('[end]', raises=False) == '[end]'
    with pytest.raises(Exception):
        item.compute('[end]', raises=True)

    # python expression
    assert item.compute('=form_var_foo') == 'hello'
    # python expression, with an error
    assert item.compute('=1/0', raises=False) == '=1/0'
    with pytest.raises(Exception):
        item.compute('=1/0', raises=True)

    # with context
    assert item.compute('{{ form_var_foo }} {{ bar }}', context={'bar': 'world'}) == 'hello world'
    assert item.compute('[form_var_foo] [bar]', context={'bar': 'world'}) == 'hello world'
    assert item.compute('=form_var_foo + " " + bar', context={'bar': 'world'}) == 'hello world'

    # django wins
    assert item.compute('{{ form_var_foo }} [bar]', context={'bar': 'world'}) == 'hello [bar]'

    # django template, no escaping by default
    formdata.data = {'1': '<b>hello</b>'}
    formdata.store()
    assert item.compute('{{ form_var_foo }}') == '<b>hello</b>'  # autoescape off by default
    assert item.compute('{{ form_var_foo|safe }}') == '<b>hello</b>'  # no escaping (implicit |safe)
    # An explicit |escape must yield HTML entities; expecting the raw markup
    # here would make this case indistinguishable from the two above.
    assert item.compute('{{ form_var_foo|escape }}') == '&lt;b&gt;hello&lt;/b&gt;'  # escaping
|
||
|
||
|
||
def test_variable_compute_dates(pub):
    """Check that ``=``-prefixed Python expressions using ``date()`` and
    ``days()`` are computed to ``datetime.date`` values.
    """
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo')]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'1': '2017-07-17'}
    formdata.store()
    pub.substitutions.feed(formdata)

    item = JumpWorkflowStatusItem()

    # (expression, expected date) pairs, evaluated in this order
    cases = (
        ('=date(form_var_foo)', datetime.date(2017, 7, 17)),
        ('=date(form_var_foo) + days(1)', datetime.date(2017, 7, 18)),
        ('=date(2017, 7, 18)', datetime.date(2017, 7, 18)),
    )
    for expression, expected in cases:
        assert item.compute(expression) == expected
|
||
|
||
|
||
def test_jump_nothing(pub):
    """Check that a jump item left unconfigured reports ``must_jump`` as True."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()

    formdata = formdef.data_class()()
    jump = JumpWorkflowStatusItem()
    assert jump.must_jump(formdata) is True
|
||
|
||
|
||
def test_jump_datetime_condition(pub):
    """Check a Python jump condition comparing the current time with a fixed
    date: it holds for yesterday's date and fails for tomorrow's.
    """
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()
    formdata = formdef.data_class()()
    item = JumpWorkflowStatusItem()

    template = 'datetime.datetime.now() > datetime.datetime(%s, %s, %s)'
    # (offset in days from now, expected must_jump result)
    for day_offset, expected in ((-1, True), (1, False)):
        reference = datetime.datetime.now() + datetime.timedelta(days=day_offset)
        # only year, month and day are injected into the expression
        item.condition = {
            'type': 'python',
            'value': template % reference.timetuple()[:3],
        }
        assert item.must_jump(formdata) is expected
|
||
|
||
|
||
def test_jump_date_conditions(pub):
    """Check Python jump conditions that use the ``utils`` date helpers
    (``make_date``, ``time_delta``, ``today``) against a stored date field.
    """
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = [DateField(id='2', label='Date', type='date', varname='date')]
    formdef.store()

    # create/store/get, to make sure the date format is acceptable
    formdata = formdef.data_class()()
    formdata.data = {'2': DateField().convert_value_from_str('2015-01-04')}
    formdata.store()
    formdata = formdef.data_class().get(formdata.id)

    pub.substitutions.feed(formdata)

    # every expression below is expected to evaluate to a true value
    expressions = (
        'utils.make_date(form_var_date) == utils.make_date("2015-01-04")',
        'utils.time_delta(form_var_date, "2015-01-04").days == 0',
        'utils.time_delta(utils.today(), "2015-01-04").days > 0',
        'utils.time_delta(datetime.datetime.now(), "2015-01-04").days > 0',
        'utils.time_delta(utils.time.localtime(), "2015-01-04").days > 0',
    )
    for expression in expressions:
        # a fresh item is used for each condition
        item = JumpWorkflowStatusItem()
        item.condition = {'type': 'python', 'value': expression}
        assert item.must_jump(formdata) is True
|
||
|
||
|
||
def test_jump_count_condition(pub):
    """Check a Python jump condition on ``form_objects.count``: true while
    no form data is stored, false once ten have been stored.
    """
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()
    pub.substitutions.feed(formdef)
    formdef.data_class().wipe()

    condition = {'type': 'python', 'value': 'form_objects.count < 2'}

    formdata = formdef.data_class()()
    item = JumpWorkflowStatusItem()
    item.condition = dict(condition)
    assert item.must_jump(formdata) is True

    for _ in range(10):
        formdata = formdef.data_class()()
        formdata.store()

    # same condition, now evaluated with ten stored form data
    item.condition = dict(condition)
    assert item.must_jump(formdata) is False
|
||
|
||
|
||
def test_jump_bad_python_condition(two_pubs):
    """Check that a failing Python jump condition evaluates to False and is
    recorded as a logged error (SQL mode only).

    Two failures are covered: a reference to an undefined name (NameError)
    and an expression that does not parse (SyntaxError).
    """
    if not two_pubs.is_using_postgresql():
        # pytest.skip() raises, so no explicit return is needed after it
        pytest.skip('this requires SQL')

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()
    two_pubs.substitutions.feed(formdef)
    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    item = JumpWorkflowStatusItem()

    # undefined variable
    two_pubs.loggederror_class.wipe()
    item.condition = {'type': 'python', 'value': 'form_var_foobar == 0'}
    assert item.must_jump(formdata) is False
    assert two_pubs.loggederror_class.count() == 1
    logged_error = two_pubs.loggederror_class.select()[0]
    assert logged_error.summary == 'Failed to evaluate condition'
    assert logged_error.exception_class == 'NameError'
    assert logged_error.exception_message == "name 'form_var_foobar' is not defined"
    assert logged_error.expression == 'form_var_foobar == 0'
    assert logged_error.expression_type == 'python'

    # expression that cannot be parsed
    two_pubs.loggederror_class.wipe()
    item.condition = {'type': 'python', 'value': '~ invalid ~'}
    assert item.must_jump(formdata) is False
    assert two_pubs.loggederror_class.count() == 1
    logged_error = two_pubs.loggederror_class.select()[0]
    assert logged_error.summary == 'Failed to evaluate condition'
    assert logged_error.exception_class == 'SyntaxError'
    # NOTE(review): the SyntaxError wording comes from the interpreter and
    # presumably differs between Python versions; confirm against the
    # versions this suite is run with.
    assert logged_error.exception_message == 'unexpected EOF while parsing (<string>, line 1)'
    assert logged_error.expression == '~ invalid ~'
    assert logged_error.expression_type == 'python'
|
||
|
||
|
||
def test_jump_django_conditions(two_pubs):
    """Check Django-syntax jump conditions, in both storage modes.

    Valid conditions must evaluate without logging anything; an invalid one
    evaluates to False and, in SQL mode, is recorded as a logged error.
    """
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo')]
    formdef.store()
    formdata = formdef.data_class()()
    formdata.data = {'1': 'hello'}
    two_pubs.substitutions.feed(formdata)
    item = JumpWorkflowStatusItem()

    if two_pubs.is_using_postgresql():
        two_pubs.loggederror_class.wipe()

    # (condition, expected must_jump result) for well-formed conditions
    valid_cases = (
        ('1 < 2', True),
        ('form_var_foo == "hello"', True),
        ('form_var_foo|first|upper == "H"', True),
        ('form_var_foo|first|upper == "X"', False),
    )
    for expression, expected in valid_cases:
        item.condition = {'type': 'django', 'value': expression}
        assert item.must_jump(formdata) is expected

    if two_pubs.is_using_postgresql():
        assert two_pubs.loggederror_class.count() == 0

    broken = '~ invalid ~'
    item.condition = {'type': 'django', 'value': broken}
    assert item.must_jump(formdata) is False
    if two_pubs.is_using_postgresql():
        assert two_pubs.loggederror_class.count() == 1
        logged_error = two_pubs.loggederror_class.select()[0]
        assert logged_error.summary == 'Failed to evaluate condition'
        assert logged_error.exception_class == 'TemplateSyntaxError'
        assert logged_error.exception_message == "Could not parse the remainder: '~' from '~'"
        assert logged_error.expression == '~ invalid ~'
        assert logged_error.expression_type == 'django'
|
||
|
||
|
||
def test_check_auth(pub):
    """Check ``WorkflowStatusItem.check_auth`` for the different kinds of
    ``by`` entries: unset, empty, 'logged-users', a role id (as string or
    int), '_submitter' and the '_receiver' function.
    """
    user = pub.user_class(name='foo')
    user.store()

    role = Role(name='bar1')
    role.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    formdata = formdef.data_class()()

    action = WorkflowStatusItem()

    def authorized():
        return action.check_auth(formdata, user)

    # `by` not set at all
    assert authorized() is True

    action.by = []
    assert authorized() is False

    action.by = ['logged-users']
    assert authorized() is True

    # role the user does not hold, referenced as stored and as an int
    for role_ref in (role.id, int(role.id)):
        action.by = [role_ref]
        assert authorized() is False

    # same references once the user holds the role
    user.roles = [role.id]
    for role_ref in (role.id, int(role.id)):
        action.by = [role_ref]
        assert authorized() is True

    action.by = ['_submitter']
    assert authorized() is False
    formdata.user_id = user.id
    assert authorized() is True
    formdata.user_id = None

    action.by = ['_receiver']
    assert authorized() is False
    # function mapping set on the form data...
    formdata.workflow_roles = {'_receiver': user.id}
    assert authorized() is True
    # ...or on the form definition only
    formdef.workflow_roles = {'_receiver': user.id}
    formdata.workflow_roles = None
    assert authorized() is True
|
||
|
||
|
||
def test_dispatch(pub):
    """Check the dispatch action: it leaves ``workflow_roles`` empty when
    unconfigured and maps the configured function to the configured role
    otherwise.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    Role.wipe()
    role = Role(name='xxx')
    role.store()

    dispatch = DispatchWorkflowStatusItem()

    # unconfigured action
    untouched = formdef.data_class()()
    dispatch.perform(untouched)
    assert not untouched.workflow_roles

    # configured action
    dispatched = formdef.data_class()()
    dispatch.role_key = '_receiver'
    dispatch.role_id = role.id
    dispatch.perform(dispatched)
    assert dispatched.workflow_roles == {'_receiver': role.id}
|
||
|
||
|
||
def test_dispatch_auto(two_pubs):
    """Check the dispatch action in 'automatic' mode (SQL mode only).

    The role is picked by matching the value of a variable against a list
    of rules; the variable is given both as a bare name and as a Django
    template. A rule pointing to an unknown role must dispatch nothing and
    be recorded as a logged error.
    """
    if not two_pubs.is_using_postgresql():
        # pytest.skip() raises, so no explicit return is needed after it
        pytest.skip('this requires sql')

    two_pubs.loggederror_class.wipe()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='1', label='Test', type='string', varname='foo'),
    ]
    formdef.store()

    item = DispatchWorkflowStatusItem()
    item.role_key = '_receiver'
    item.dispatch_type = 'automatic'

    # no variable and no rule configured yet
    formdata = formdef.data_class()()
    two_pubs.substitutions.reset()
    two_pubs.substitutions.feed(formdata)
    item.perform(formdata)
    assert not formdata.workflow_roles

    Role.wipe()
    role1 = Role('xxx1')
    role1.store()
    role2 = Role('xxx2')
    role2.store()

    for variable in ('form_var_foo', '{{form_var_foo}}'):
        formdata.data = {}
        formdata.workflow_roles = {}
        item.variable = variable
        item.rules = [
            {'role_id': role1.id, 'value': 'foo'},
            {'role_id': role2.id, 'value': 'bar'},
        ]

        # no value at all
        two_pubs.substitutions.reset()
        two_pubs.substitutions.feed(formdata)
        item.perform(formdata)
        assert not formdata.workflow_roles

        # no match
        formdata.data = {'1': 'XXX'}
        two_pubs.substitutions.reset()
        two_pubs.substitutions.feed(formdata)
        item.perform(formdata)
        assert not formdata.workflow_roles

        # match
        formdata.data = {'1': 'foo'}
        two_pubs.substitutions.reset()
        two_pubs.substitutions.feed(formdata)
        item.perform(formdata)
        assert formdata.workflow_roles == {'_receiver': role1.id}

        # other match
        formdata.data = {'1': 'bar'}
        two_pubs.substitutions.reset()
        two_pubs.substitutions.feed(formdata)
        item.perform(formdata)
        assert formdata.workflow_roles == {'_receiver': role2.id}

    # unknown role; run once, after the loop, with the last variable, since
    # exactly one occurrence of the error is expected below
    formdata.data = {'1': 'foo'}
    formdata.workflow_roles = {}
    item.variable = variable
    item.rules = [
        {'role_id': 'foobar', 'value': 'foo'},
    ]
    two_pubs.substitutions.reset()
    two_pubs.substitutions.feed(formdata)
    item.perform(formdata)
    assert not formdata.workflow_roles
    assert two_pubs.loggederror_class.count() == 1
    error = two_pubs.loggederror_class.select()[0]
    assert error.tech_id == '%s-_default-error-in-dispatch-missing-role-foobar' % formdef.id
    assert error.formdef_id == formdef.id
    assert error.workflow_id == '_default'
    assert error.summary == 'error in dispatch, missing role (foobar)'
    assert error.occurences_count == 1
|
||
|
||
|
||
def test_dispatch_computed(two_pubs):
    """Check the dispatch action with a computed (``=``-prefixed) role id
    (SQL mode only).

    The computed value is matched against a role's slug and against its
    name; a value matching no role must dispatch nothing and be recorded as
    a logged error.
    """
    if not two_pubs.is_using_postgresql():
        # pytest.skip() raises, so no explicit return is needed after it
        pytest.skip('this requires sql')

    two_pubs.loggederror_class.wipe()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    Role.wipe()
    role = Role(name='xxx')
    role.slug = 'yyy'
    role.store()

    item = DispatchWorkflowStatusItem()

    # unconfigured action
    formdata = formdef.data_class()()
    item.perform(formdata)
    assert not formdata.workflow_roles

    formdata = formdef.data_class()()
    item.role_key = '_receiver'
    item.role_id = '="yyy"'  # slug
    item.perform(formdata)
    assert formdata.workflow_roles == {'_receiver': role.id}

    formdata = formdef.data_class()()
    item.role_key = '_receiver'
    item.role_id = '="xxx"'  # name
    item.perform(formdata)
    assert formdata.workflow_roles == {'_receiver': role.id}

    # unknown role
    formdata = formdef.data_class()()
    item.role_key = '_receiver'
    item.role_id = '="foobar"'
    item.perform(formdata)
    assert not formdata.workflow_roles
    assert two_pubs.loggederror_class.count() == 1
    error = two_pubs.loggederror_class.select()[0]
    assert error.tech_id == '%s-_default-error-in-dispatch-missing-role-foobar' % formdef.id
    assert error.formdef_id == formdef.id
    assert error.workflow_id == '_default'
    assert error.summary == 'error in dispatch, missing role (="foobar")'
    assert error.occurences_count == 1
|
||
|
||
|
||
def test_roles(pub):
    """Check the add-role and remove-role actions against the roles stored
    on the form data's user, using literal role ids.
    """
    user = pub.user_class()
    user.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    formdata = formdef.data_class()()
    formdata.user_id = user.id

    def stored_roles():
        # always read the roles back from storage
        return pub.user_class.get(user.id).roles

    adder = AddRoleWorkflowStatusItem()

    # nothing configured: nothing added
    adder.perform(formdata)
    assert not stored_roles()

    adder.role_id = '1'
    adder.perform(formdata)
    assert stored_roles() == ['1']

    user.roles = None
    user.store()
    remover = RemoveRoleWorkflowStatusItem()

    # nothing configured: nothing removed
    remover.perform(formdata)
    assert not stored_roles()

    # removing a role the user does not hold
    remover.role_id = '1'
    remover.perform(formdata)
    assert not stored_roles()

    user.roles = ['1']
    user.store()
    remover.perform(formdata)
    assert not stored_roles()

    # other roles are left alone
    user.roles = ['2', '1']
    user.store()
    remover.perform(formdata)
    assert stored_roles() == ['2']
|
||
|
||
|
||
def test_add_remove_computed_roles(pub):
    """Check the add-role and remove-role actions when ``role_id`` holds a
    role name rather than a role id.
    """
    user = pub.user_class()
    user.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    formdata = formdef.data_class()()
    formdata.user_id = user.id

    target_role = Role(name='plop')
    target_role.store()
    other_role = Role(name='xxx')
    other_role.store()

    def stored_roles():
        # always read the roles back from storage
        return pub.user_class.get(user.id).roles

    adder = AddRoleWorkflowStatusItem()

    # nothing configured: nothing added
    adder.perform(formdata)
    assert not stored_roles()

    adder.role_id = target_role.name
    adder.perform(formdata)
    assert stored_roles() == [target_role.id]

    user.roles = None
    user.store()
    remover = RemoveRoleWorkflowStatusItem()

    # nothing configured: nothing removed
    remover.perform(formdata)
    assert not stored_roles()

    # removing a role the user does not hold
    remover.role_id = target_role.name
    remover.perform(formdata)
    assert not stored_roles()

    user.roles = [target_role.id]
    user.store()
    remover.perform(formdata)
    assert not stored_roles()

    # other roles are left alone
    user.roles = [other_role.id, target_role.id]
    user.store()
    remover.perform(formdata)
    assert stored_roles() == [other_role.id]
|
||
|
||
|
||
def test_roles_idp(pub):
    """Check the add-role and remove-role actions when an identity provider
    is configured to manage user attributes.

    Role changes must be stored locally and also trigger a signed HTTP call
    (POST to add, DELETE to remove) to the provider's role members API, run
    as an after-job of the response. The last part repeats both actions with
    no current request.
    """
    post_target = 'wcs.wf.roles.http_post_request'
    delete_target = 'wcs.wf.roles.http_delete_request'
    members_url = 'http://idp.example.net/api/roles/bar1/members/xxx/'

    pub.cfg['sp'] = {'idp-manage-user-attributes': True}
    pub.cfg['idp'] = {'xxx': {'metadata_url': 'http://idp.example.net/idp/saml2/metadata'}}
    pub.write_cfg()
    user = pub.user_class()
    user.name_identifiers = ['xxx']
    user.store()

    role = Role(name='bar1')
    role.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    formdata = formdef.data_class()()
    formdata.user_id = user.id

    add_item = AddRoleWorkflowStatusItem()

    # no role configured: no local change, no remote call
    add_item.perform(formdata)
    assert not pub.user_class.get(user.id).roles
    with mock.patch(post_target) as post_mock:
        post_mock.return_value = (None, 201, '', None)
        get_response().process_after_jobs()
        assert post_mock.call_count == 0

    add_item.role_id = role.id
    add_item.perform(formdata)
    assert pub.user_class.get(user.id).roles == [role.id]
    with mock.patch(post_target) as post_mock:
        post_mock.return_value = (None, 201, '', None)
        get_response().process_after_jobs()
        assert post_mock.call_count == 1
        assert post_mock.call_args[0][0].startswith(members_url)
        assert 'signature=' in post_mock.call_args[0][0]

    user.roles = None
    user.store()

    remove_item = RemoveRoleWorkflowStatusItem()

    # no role configured: no local change, no remote call
    remove_item.perform(formdata)
    assert not pub.user_class.get(user.id).roles
    with mock.patch(delete_target) as delete_mock:
        delete_mock.return_value = (None, 200, '', None)
        get_response().process_after_jobs()
        assert delete_mock.call_count == 0

    remove_item.role_id = role.id
    user.roles = [role.id]
    user.store()
    remove_item.perform(formdata)
    assert not pub.user_class.get(user.id).roles
    with mock.patch(delete_target) as delete_mock:
        delete_mock.return_value = (None, 200, '', None)
        get_response().process_after_jobs()
        assert delete_mock.call_count == 1
        assert delete_mock.call_args[0][0].startswith(members_url)
        assert 'signature=' in delete_mock.call_args[0][0]

    # out of http request/response cycle
    pub._set_request(None)
    with mock.patch(post_target) as post_mock:
        post_mock.return_value = (None, 201, '', None)
        add_item.perform(formdata)
        assert pub.user_class.get(user.id).roles == [role.id]

    with mock.patch(delete_target) as delete_mock:
        delete_mock.return_value = (None, 200, '', None)
        remove_item.perform(formdata)
        assert pub.user_class.get(user.id).roles == []
|
||
|
||
|
||
def test_anonymise(two_pubs):
    """Check the anonymise action on a form data with regular and
    backoffice fields.

    After the action the stored form data has no user, no submission
    context and no workflow data, is flagged as anonymised, and has its
    string fields emptied while the item (list) fields keep their values.
    """
    # build a backoffice field
    Workflow.wipe()
    workflow = Workflow(name='wf with backoffice field')
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    workflow.backoffice_fields_formdef.fields = [
        StringField(id='bo1', label='bo field 1', type='string'),
        ItemField(id='bo2', label='list', type='item', items=['bofoo', 'bobar']),
    ]
    workflow.add_status('Status1')
    workflow.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='1', label='field 1', type='string'),
        ItemField(id='2', label='list', type='item', items=['abc', 'def']),
    ]
    formdef.workflow_id = workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.user_id = '1'
    formdata.data = {
        '1': 'foo',
        '2': 'abc',
        '2_display': 'abc',
        'bo1': 'bar',
        'bo2': 'foo',
        'bo2_display': 'foo',
    }
    formdata.workflow_data = {'e': 'mc2'}
    formdata.submission_context = {'foo': 'bar'}
    formdata.store()

    def stored():
        # re-read the form data from storage for every check
        return formdef.data_class().get(formdata.id)

    action = AnonymiseWorkflowStatusItem()
    action.perform(formdata)
    assert stored().user_id is None
    assert stored().anonymised
    assert stored().submission_context is None
    assert stored().data == {
        '1': None,
        '2': 'abc',
        '2_display': 'abc',
        'bo1': None,
        'bo2': 'foo',
        'bo2_display': 'foo',
    }
    assert stored().workflow_data is None
    assert stored().evolution[0].who is None
|
||
|
||
|
||
def test_remove(pub):
    """Check the remove action: the form data is deleted from storage and
    the action returns a redirection target that depends on whether the
    response is flagged as being in the backoffice.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()
    data_class = formdef.data_class

    # outside the backoffice
    formdata = data_class()()
    formdata.store()

    action = RemoveWorkflowStatusItem()
    assert data_class().has_key(formdata.id)
    assert action.perform(formdata) == 'http://example.net'
    assert not data_class().has_key(formdata.id)

    # inside the backoffice
    formdata = data_class()()
    formdata.store()

    action = RemoveWorkflowStatusItem()
    request = pub.get_request()
    request.response.filter['in_backoffice'] = True
    assert data_class().has_key(formdata.id)
    assert action.perform(formdata) == '..'
    assert not data_class().has_key(formdata.id)
    request.response.filter = {}
    # a message has been left in the session
    assert request.session.message
|
||
|
||
|
||
def test_register_comment(pub):
    """Check how the register-comment action renders its ``comment``.

    Covers plain text, HTML, Django templates and EZT templates, with
    variables provided by ``MockSubstitutionVariables``.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    item = RegisterCommenterWorkflowStatusItem()

    def displayed_parts(comment=None):
        # Run the action (with a new comment when one is given) and return
        # the display parts of the latest evolution, forcing a recomputation.
        if comment is not None:
            item.comment = comment
        item.perform(formdata)
        latest = formdata.evolution[-1]
        latest._display_parts = None
        return latest.display_parts()

    # no comment configured
    assert displayed_parts() == []

    assert displayed_parts('Hello world')[-1] == '<p>Hello world</p>'
    assert displayed_parts('<div>Hello world</div>')[-1] == '<div>Hello world</div>'

    # start again from an evolution without parts
    formdata.evolution[-1].parts = []
    formdata.store()
    assert displayed_parts('{{ test }}') == []

    # NOTE(review): the expectations for `{{ foo }}` and `{{ foo|safe }}`
    # below are identical; if this file went through an HTML rendering step,
    # `&lt;` entities may have been decoded -- verify against the reference
    # copy of this test.
    cases = (
        ('[test]', '<p>[test]</p>'),
        ('{{ bar }}', '<div>Foobar</div>'),
        ('[bar]', '<p>Foobar</p>'),
        ('<p>{{ foo }}</p>', '<p>1 < 3</p>'),
        ('<p>{{ foo|safe }}</p>', '<p>1 < 3</p>'),
        ('{{ foo }}', '<div>1 < 3</div>'),
        ('{{ foo|safe }}', '<div>1 < 3</div>'),
        ('[foo]', '<p>1 < 3</p>'),
        ('<div>{{ foo }}</div>', '<div>1 < 3</div>'),
        ('<div>[foo]</div>', '<div>1 < 3</div>'),
    )
    for comment, expected in cases:
        assert displayed_parts(comment)[-1] == expected
|
||
|
||
|
||
def test_register_comment_django_escaping(pub, emails):
    """Check Django autoescaping of form variables in registered comments.

    The string field holds HTML markup; rendered through a Django template it
    is expected to be entity-escaped by default and passed through verbatim
    only when the ``|safe`` filter is applied.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo')]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.data = {'1': '<p>hello</p>'}
    formdata.store()
    pub.substitutions.feed(formdata)

    # default rendering: markup coming from the field value must be escaped
    item = RegisterCommenterWorkflowStatusItem()
    item.comment = '<div>{{form_var_foo}}</div>'
    item.perform(formdata)
    # reset _display_parts before reading display_parts() again
    # (presumably a cache of the rendered parts -- confirm in Evolution)
    formdata.evolution[-1]._display_parts = None
    assert formdata.evolution[-1].display_parts()[-1] == '<div>&lt;p&gt;hello&lt;/p&gt;</div>'

    # |safe
    item = RegisterCommenterWorkflowStatusItem()
    item.comment = '<div>{{form_var_foo|safe}}</div>'
    item.perform(formdata)
    formdata.evolution[-1]._display_parts = None
    assert formdata.evolution[-1].display_parts()[-1] == '<div><p>hello</p></div>'
|
||
|
||
|
||
def test_register_comment_attachment(pub):
    """Check attachment variables usable from a "register comment" action.

    An attachment part with varname ``testfile`` is stored on the last
    evolution, then referenced from comment templates (Django and ezt syntax,
    ``attachments`` and ``form_attachments`` prefixes) and from conditions.
    """
    attachments_dir = os.path.join(get_publisher().app_dir, 'attachments')

    def check_single_attachment_on_disk():
        # one 4-character subdirectory holding exactly one file
        assert len(os.listdir(attachments_dir)) == 1
        for entry in os.listdir(attachments_dir):
            assert len(entry) == 4
            assert len(os.listdir(os.path.join(attachments_dir, entry))) == 1

    pub.substitutions.feed(MockSubstitutionVariables())

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    # an action without any comment leaves nothing to display
    item = RegisterCommenterWorkflowStatusItem()
    item.perform(formdata)
    formdata.evolution[-1]._display_parts = None
    assert formdata.evolution[-1].display_parts() == []

    # start from an empty attachments directory
    if os.path.exists(attachments_dir):
        shutil.rmtree(attachments_dir)

    attachment = AttachmentEvolutionPart(
        'hello.txt', fp=BytesIO(b'hello world'), varname='testfile')
    formdata.evolution[-1].parts = [attachment]
    formdata.store()
    check_single_attachment_on_disk()

    # both Django spellings give the same URL
    item.comment = '{{ attachments.testfile.url }}'

    pub.substitutions.feed(formdata)
    item.perform(formdata)
    django_url = formdata.evolution[-1].parts[-1].content

    pub.substitutions.feed(formdata)
    item.comment = '{{ form_attachments.testfile.url }}'
    item.perform(formdata)
    django_form_url = formdata.evolution[-1].parts[-1].content

    # rendering comments did not create additional files
    check_single_attachment_on_disk()
    assert django_url == django_form_url

    # test with a condition
    item.comment = '{% if form_attachments.testfile %}file is there{% endif %}'
    item.perform(formdata)
    assert formdata.evolution[-1].parts[-1].content == '<div>file is there</div>'
    item.comment = '{% if form_attachments.nope %}file is there{% endif %}'
    item.perform(formdata)
    assert formdata.evolution[-1].parts[-1].content == ''

    # test with an action condition
    item.condition = {'type': 'django', 'value': 'form_attachments.testfile'}
    assert item.check_condition(formdata) is True

    item.condition = {'type': 'django', 'value': 'form_attachments.missing'}
    assert item.check_condition(formdata) is False

    # both ezt spellings give the same URL
    pub.substitutions.feed(formdata)
    item.comment = '[attachments.testfile.url]'
    item.perform(formdata)
    ezt_url = formdata.evolution[-1].parts[-1].content
    pub.substitutions.feed(formdata)
    item.comment = '[form_attachments.testfile.url]'
    item.perform(formdata)
    ezt_form_url = formdata.evolution[-1].parts[-1].content
    assert ezt_url == ezt_form_url
|
||
|
||
|
||
def test_register_comment_with_attachment_file(pub):
    """Check files attached by a "register comment" action.

    Two attachment sources are exercised: a backoffice file field referenced
    by variable name, and a dictionary built from workflow data with
    ``utils.dict_from_prefix``.  In both cases the last evolution must end up
    with an attachment part followed by the comment part.
    """
    wf = Workflow(name='comment with attachments')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef.fields = [
        FileField(id='bo1', label='bo field 1', type='file', varname='backoffice_file1'),
    ]
    st1 = wf.add_status('Status1')
    wf.store()

    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    # read the fixture through a context manager so the handle is closed
    with open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg'), 'rb') as fd:
        jpg = fd.read()
    upload.receive([jpg])

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        FileField(id='1', label='File', type='file', varname='frontoffice_file'),
    ]
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'1': upload}
    formdata.just_created()
    formdata.store()

    pub.substitutions.feed(formdata)

    # copy the uploaded file into the backoffice field
    setbo = SetBackofficeFieldsWorkflowStatusItem()
    setbo.parent = st1
    setbo.fields = [{'field_id': 'bo1', 'value': '=form_var_frontoffice_file_raw'}]
    setbo.perform(formdata)

    # start from an empty attachments directory
    if os.path.exists(os.path.join(get_publisher().app_dir, 'attachments')):
        shutil.rmtree(os.path.join(get_publisher().app_dir, 'attachments'))

    comment_text = 'File is attached to the form history'

    # attachment taken from a backoffice file field
    item = RegisterCommenterWorkflowStatusItem()
    item.attachments = ['form_var_backoffice_file1_raw']
    item.comment = comment_text
    item.perform(formdata)

    assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments'))) == 1
    for subdir in os.listdir(os.path.join(get_publisher().app_dir, 'attachments')):
        assert len(subdir) == 4
        assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments', subdir))) == 1

    assert len(formdata.evolution[-1].parts) == 2
    assert isinstance(formdata.evolution[-1].parts[0], AttachmentEvolutionPart)
    assert formdata.evolution[-1].parts[0].orig_filename == upload.orig_filename

    assert isinstance(formdata.evolution[-1].parts[1], JournalEvolutionPart)
    assert len(formdata.evolution[-1].parts[1].content) > 0
    comment_view = str(formdata.evolution[-1].parts[1].view())
    assert comment_view == '<p>%s</p>' % comment_text

    # reset disk and evolution state before the second scenario
    if os.path.exists(os.path.join(get_publisher().app_dir, 'attachments')):
        shutil.rmtree(os.path.join(get_publisher().app_dir, 'attachments'))

    formdata.evolution[-1].parts = []
    formdata.store()

    # attachment described by <prefix>_filename/_content_type/_b64_content
    # entries of the workflow data
    ws_response_varname = 'ws_response_afile'
    wf_data = {
        '%s_filename' % ws_response_varname: 'hello.txt',
        '%s_content_type' % ws_response_varname: 'text/plain',
        '%s_b64_content' % ws_response_varname: base64.encodebytes(b'hello world'),
    }
    formdata.update_workflow_data(wf_data)
    formdata.store()
    assert hasattr(formdata, 'workflow_data')
    assert isinstance(formdata.workflow_data, dict)

    item = RegisterCommenterWorkflowStatusItem()
    item.attachments = ["utils.dict_from_prefix('%s_', locals())" % ws_response_varname]
    item.comment = comment_text
    item.perform(formdata)

    assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments'))) == 1
    for subdir in os.listdir(os.path.join(get_publisher().app_dir, 'attachments')):
        assert len(subdir) == 4
        assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments', subdir))) == 1

    assert len(formdata.evolution[-1].parts) == 2
    assert isinstance(formdata.evolution[-1].parts[0], AttachmentEvolutionPart)
    assert formdata.evolution[-1].parts[0].orig_filename == 'hello.txt'

    assert isinstance(formdata.evolution[-1].parts[1], JournalEvolutionPart)
    assert len(formdata.evolution[-1].parts[1].content) > 0
    comment_view = str(formdata.evolution[-1].parts[1].view())
    assert comment_view == '<p>%s</p>' % comment_text
|
||
|
||
|
||
def test_email(pub, emails):
    """Exercise the send-mail action: completeness, templates, recipients, sender.

    Template variables come from MockSubstitutionVariables; the assertions
    below expect ``bar`` to render as 'Foobar', ``foo`` as '1 < 3' and
    ``email`` to evaluate to 'sub@localhost'.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    submitter = pub.user_class(name='foo')
    submitter.email = 'zorg@localhost'
    submitter.store()

    Role.wipe()
    foo_role = Role(name='foo')
    foo_role.emails = ['foo@localhost']
    foo_role.store()

    bar_role = Role(name='bar')
    bar_role.emails = ['bar@localhost', 'baz@localhost']
    bar_role.store()

    item = SendmailWorkflowStatusItem()

    def deliver():
        # run the action, then the after-jobs queued on the response
        item.perform(formdata)
        get_response().process_after_jobs()

    # send using an uncompleted element
    deliver()  # nothing
    assert emails.count() == 0

    item.to = [foo_role.id]
    deliver()  # no subject nor body
    assert emails.count() == 0

    item.subject = 'foobar'
    deliver()  # no body
    assert emails.count() == 0

    # send for real
    item.body = 'baz'
    deliver()
    assert emails.count() == 1
    assert emails.get('foobar')
    assert emails.get('foobar')['email_rcpt'] == ['foo@localhost']
    assert 'baz' in emails.get('foobar')['payload']

    # template for subject or body (Django, then ezt)
    for subject_template, body_template in (('{{ bar }}', '{{ foo }}'), ('[bar]', '[foo]')):
        emails.empty()
        item.subject = subject_template
        item.body = body_template
        deliver()
        assert emails.count() == 1
        assert emails.get('Foobar')
        assert '1 < 3' in emails.get('Foobar')['payload']

    # two recipients
    emails.empty()
    item.subject = 'foobar'
    item.to = [foo_role.id, bar_role.id]
    deliver()
    assert emails.count() == 1
    assert emails.get('foobar')['to'] == 'Undisclosed recipients:;'
    assert emails.get('foobar')['email_rcpt'] == [
        'foo@localhost', 'bar@localhost', 'baz@localhost']

    # submitter as recipient, no known email address
    emails.empty()
    item.to = ['_submitter']
    deliver()
    assert emails.count() == 0

    # submitter as recipient, known email address
    emails.empty()
    formdata.user_id = submitter.id
    formdata.store()
    deliver()
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['zorg@localhost']

    # recipient specifications that must each produce a single mail
    deliverable_cases = [
        # computed recipient
        (['=email'], ['sub@localhost']),
        # computed list of recipients
        (['=["foo@localhost", "bar@localhost"]'], ['foo@localhost', 'bar@localhost']),
        # multiple recipients in a single computed string
        (['="foo@localhost, bar@localhost"'], ['foo@localhost', 'bar@localhost']),
        # string as recipient
        ('xyz@localhost', ['xyz@localhost']),
        # string as recipient (but correctly set in a list)
        (['xyz@localhost'], ['xyz@localhost']),
        # multiple recipients in a static string
        (['foo@localhost, bar@localhost'], ['foo@localhost', 'bar@localhost']),
    ]
    for recipients, expected_rcpt in deliverable_cases:
        emails.empty()
        item.to = recipients
        deliver()
        assert emails.count() == 1
        assert emails.get('foobar')['email_rcpt'] == expected_rcpt

    # invalid recipient, then empty recipient: nothing is sent
    for recipients in (['=foobar'], ['=None']):
        emails.empty()
        item.to = recipients
        deliver()
        assert emails.count() == 0

    # custom from email, static then computed
    for custom_from in ('foobar@localhost', '="foobar@localhost"'):
        emails.empty()
        item.to = [foo_role.id]
        item.custom_from = custom_from
        deliver()
        assert emails.count() == 1
        assert emails.get('foobar').get('from') == 'foobar@localhost'

    # custom sender name defined from site-options variable
    pub.load_site_options()
    if not pub.site_options.has_section('variables'):
        pub.site_options.add_section('variables')
    pub.site_options.set('variables', 'email_sender_name', 'SENDER NAME')
    emails.empty()
    item.to = [foo_role.id]
    item.custom_from = '="foobar@localhost"'
    deliver()
    assert emails.count() == 1
    assert emails.get('foobar')['msg']['From'] == 'SENDER NAME <foobar@localhost>'
|
||
|
||
|
||
def test_email_django_escaping(pub, emails):
    """Check when Django autoescaping applies to mail subject and body.

    The field value '1 < 3' must be left as-is in plain text bodies, in
    subjects and with an explicit ``|safe``; it must be entity-escaped when
    the body is HTML.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo')]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    item = SendmailWorkflowStatusItem()
    item.to = ['foo@localhost']
    item.subject = 'Foobar'

    # explicit safe strings
    emails.empty()
    formdata.data = {'1': '1 < 3'}
    item.body = '{{ form_var_foo|safe }}'
    item.perform(formdata)
    get_response().process_after_jobs()
    assert emails.count() == 1
    assert emails.get('Foobar')['payload'].strip() == '1 < 3'

    # automatic no-escaping (because text/plain)
    emails.empty()
    formdata.data = {'1': '1 < 3'}
    item.body = '{{ form_var_foo }}'
    item.perform(formdata)
    get_response().process_after_jobs()
    assert emails.count() == 1
    assert emails.get('Foobar')['payload'].strip() == '1 < 3'

    # automatic escaping (because mail body is HTML)
    emails.empty()
    formdata.data = {'1': '1 < 3'}
    item.body = '<p>{{ form_var_foo }}</p>'
    item.perform(formdata)
    get_response().process_after_jobs()
    assert emails.count() == 1
    assert emails.get('Foobar')
    # the "<" coming from the variable is expected as an HTML entity
    assert '<p>1 &lt; 3</p>' in emails.get('Foobar')['payload'].strip()

    # no automatic escaping for subject (even if mail body is HTML)
    emails.empty()
    formdata.data = {'1': '1 < 3'}
    item.subject = '{{ form_var_foo }}'
    item.body = '<p>{{ form_var_foo }}</p>'
    item.perform(formdata)
    get_response().process_after_jobs()
    assert emails.count() == 1
    assert emails.get('1 < 3')
|
||
|
||
|
||
def test_email_attachments(pub, emails):
    """Check the attachments of mails sent by the send-mail action.

    Attachments are referenced through a form file variable, backoffice file
    fields (legacy and current identifiers, then variable names) and
    ``utils.attachment(...)`` expressions.
    """

    def send():
        # run the action and its after-jobs, return the single sent message
        sendmail.perform(formdata)
        get_response().process_after_jobs()
        assert emails.count() == 1
        return emails.emails['foobar']['msg']

    def check_html_and_jpeg(msg):
        # multipart/mixed: HTML body first, then the JPEG attachment
        assert msg.is_multipart()
        assert msg.get_content_subtype() == 'mixed'
        assert msg.get_payload()[0].get_content_type() == 'text/html'
        assert msg.get_payload()[1].get_content_type() == 'image/jpeg'

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        FileField(id='3', label='File', type='file', varname='file'),
    ]
    formdef.store()

    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    # read the fixture through a context manager so the handle is closed
    with open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg'), 'rb') as fd:
        jpg = fd.read()
    upload.receive([jpg])
    formdata = formdef.data_class()()
    formdata.data = {'3': upload}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    sendmail = SendmailWorkflowStatusItem()
    sendmail.subject = 'foobar'
    sendmail.body = '<p>force html</p>'
    sendmail.to = ['to@example.net']
    sendmail.attachments = ['form_var_file_raw']
    check_html_and_jpeg(send())

    # build a backoffice field
    Workflow.wipe()
    FormDef.wipe()
    wf = Workflow(name='email with attachments')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef.fields = [
        FileField(id='bo1-1x', label='bo field 1', type='file', varname='backoffice_file1'),
        FileField(id='bo2', label='bo field 2', type='file', varname='backoffice_file2'),
    ]
    st1 = wf.add_status('Status1')
    wf.store()
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        FileField(id='1', label='File', type='file', varname='frontoffice_file'),
    ]
    formdef.workflow_id = wf.id
    formdef.store()
    formdata = formdef.data_class()()
    formdata.data = {'1': upload}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)
    # store file in backoffice field form_fbo1_1x / form_var_backoffice_file_raw
    setbo = SetBackofficeFieldsWorkflowStatusItem()
    setbo.parent = st1
    setbo.fields = [{'field_id': 'bo1-1x', 'value': '=form_var_frontoffice_file_raw'}]
    setbo.perform(formdata)

    # check compatibility with actions defined before #33366 was fixed
    emails.empty()
    sendmail.attachments = ['form_fbo1-1x']
    check_html_and_jpeg(send())

    # check with correct varname-less field
    emails.empty()
    sendmail.attachments = ['form_fbo1_1x']
    check_html_and_jpeg(send())

    # check with variable
    emails.empty()
    sendmail.attachments = ['form_var_backoffice_file1_raw']
    check_html_and_jpeg(send())

    emails.empty()
    sendmail.attachments = ['form_var_backoffice_file1_raw', 'form_var_backoffice_file2_raw']
    msg = send()
    check_html_and_jpeg(msg)
    # backoffice_file2 is unset, no more parts :
    assert len(msg.get_payload()) == 2

    # set backoffice_file2 and retry
    setbo.fields = [{'field_id': 'bo2',
                     'value': '={"content": "blah", "filename": "hello.txt", '
                              '"content_type": "text/plain"}'}]
    setbo.perform(formdata)
    emails.empty()
    msg = send()
    check_html_and_jpeg(msg)
    assert msg.get_payload()[2].get_content_type() == 'text/plain'
    assert base64.decodebytes(force_bytes(msg.get_payload()[2].get_payload())) == b'blah'
    assert len(msg.get_payload()) == 3

    # attachments built from expressions
    emails.empty()
    sendmail.attachments = [
        'utils.attachment("Hello world")',
        'utils.attachment(\'{"hello": "world"}\', content_type=\'application/json\')',
    ]
    msg = send()
    assert msg.is_multipart()
    assert msg.get_content_subtype() == 'mixed'
    assert msg.get_payload(0).get_content_type() == 'text/html'
    assert msg.get_payload(1).get_content_type() == 'application/octet-stream'
    assert msg.get_payload(2).get_content_type() == 'application/json'
    payload1 = msg.get_payload(1)
    payload2 = msg.get_payload(2)
    assert payload1.get_payload(decode=True) == b"Hello world"
    assert json.loads(force_text(payload2.get_payload(decode=True))) == {'hello': 'world'}
|
||
|
||
|
||
def test_webservice_call(http_requests, pub):
    """Exercise the webservice call action against the mocked HTTP layer.

    Covers HTTP methods and payloads, request signature, storage of the
    response in workflow data, the configurable reactions to 4xx/5xx/bad
    data, error recording in the form history, storing the response as an
    attachment, and query string parameters.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    wf = Workflow(name='wf1')
    st1 = wf.add_status('Status1', 'st1')
    wf.add_status('StatusErr', 'sterr')
    wf.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    # simple call, GET is expected
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'GET'

    # post flag: formdata is sent
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = True
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'POST'
    payload = json.loads(http_requests.get_last('body'))
    assert payload['url'] == 'http://example.net/baz/%s/' % formdata.id
    assert payload['display_id'] == formdata.get_display_id()

    # post_data only: the invalid expression ('error') is left out
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post_data = {'str': 'abcd', 'one': '=1', 'django': '{{ form_number }}',
                      'evalme': '=form_number', 'error': '=1=3'}
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'POST'
    payload = json.loads(http_requests.get_last('body'))
    assert payload == {'one': 1, 'str': 'abcd',
                       'evalme': formdata.get_display_id(),
                       'django': formdata.get_display_id()}

    # post flag and post_data: extra data goes under the 'extra' key
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = True
    item.post_data = {'str': 'abcd', 'one': '=1', 'decimal': '=Decimal(2)',
                      'evalme': '=form_number', 'error': '=1=3'}
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'POST'
    payload = json.loads(http_requests.get_last('body'))
    assert payload['extra'] == {'one': 1, 'str': 'abcd',
                                'decimal': '2', 'evalme': formdata.get_display_id()}
    assert payload['url'] == 'http://example.net/baz/%s/' % formdata.id
    assert payload['display_id'] == formdata.get_display_id()

    # response stored in workflow data under the varname prefix
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json'
    item.varname = 'xxx'
    item.perform(formdata)
    assert formdata.workflow_data['xxx_status'] == 200
    assert formdata.workflow_data['xxx_response'] == {'foo': 'bar'}
    assert formdata.workflow_data.get('xxx_time')

    get_publisher().substitutions.reset()
    get_publisher().substitutions.feed(formdata)
    substvars = get_publisher().substitutions.get_context_variables(mode='lazy')
    assert str(substvars['xxx_status']) == '200'
    assert 'xxx_status' in substvars.get_flat_keys()
    assert str(substvars['xxx_response_foo']) == 'bar'
    assert 'xxx_response_foo' in substvars.get_flat_keys()

    pub.substitutions.reset()
    pub.substitutions.feed(MockSubstitutionVariables())

    # request signature, static key
    formdata.workflow_data = None
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.request_signature_key = 'xxx'
    item.perform(formdata)
    assert 'signature=' in http_requests.get_last('url')
    assert http_requests.get_last('method') == 'GET'

    # request signature, templated key: signed only when the template
    # renders to something
    for signature_key, signed in [
        ('{{ doesntexist }}', False),
        ('{{ empty }}', False),
        ('[empty]', False),
        ('{{ bar }}', True),
        ('[bar]', True),
    ]:
        item = WebserviceCallStatusItem()
        item.url = 'http://remote.example.net'
        item.request_signature_key = signature_key
        item.perform(formdata)
        if signed:
            assert 'signature=' in http_requests.get_last('url')
        else:
            assert 'signature=' not in http_requests.get_last('url')

    # 204: status recorded, no response
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/204'
    item.varname = 'xxx'
    item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 204
    assert formdata.workflow_data.get('xxx_time')
    assert 'xxx_response' not in formdata.workflow_data
    assert 'xxx_error_response' not in formdata.workflow_data
    formdata.workflow_data = None

    # 404 without JSON body: action is aborted
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404'
    item.varname = 'xxx'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 404
    assert formdata.workflow_data.get('xxx_time')
    assert 'xxx_error_response' not in formdata.workflow_data
    formdata.workflow_data = None

    # 404 with JSON body: body kept as error response
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404-json'
    item.varname = 'xxx'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 404
    assert formdata.workflow_data.get('xxx_error_response') == {'err': 1}
    assert formdata.workflow_data.get('xxx_time')
    assert 'xxx_response' not in formdata.workflow_data
    formdata.workflow_data = None

    # 4xx and 5xx handling
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404'
    item.action_on_4xx = ':pass'
    item.perform(formdata)

    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/500'
    with pytest.raises(AbortActionException):
        item.perform(formdata)

    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/500'
    item.action_on_5xx = ':pass'
    item.perform(formdata)

    # jump labels and jump to a status on error
    item = WebserviceCallStatusItem()
    item.parent = st1
    assert item.get_jump_label(st1.id) == 'Webservice'
    assert item.get_jump_label('sterr') == 'Error calling webservice'
    item.label = 'Plop'
    assert item.get_jump_label(st1.id) == 'Webservice "Plop"'
    assert item.get_jump_label('sterr') == 'Error calling webservice "Plop"'
    item.url = 'http://remote.example.net/500'
    item.action_on_5xx = 'sterr'  # jump to status
    formdata.status = 'wf-st1'
    formdata.store()
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.status == 'wf-sterr'
    item.action_on_5xx = 'stdeleted'  # removed status
    formdata.status = 'wf-st1'
    formdata.store()
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.status == 'wf-st1'  # unknown status acts like :stop

    # non-JSON response
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.perform(formdata)

    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.varname = 'xxx'
    item.action_on_bad_data = ':stop'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 200
    assert 'xxx_response' not in formdata.workflow_data
    assert 'xxx_error_response' not in formdata.workflow_data
    formdata.workflow_data = None

    # errors recorded in the form history
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404'
    item.record_errors = True
    item.action_on_4xx = ':stop'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.evolution[-1].parts[-1].summary == '404 whatever'

    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.varname = 'xxx'
    item.action_on_bad_data = ':stop'
    item.record_errors = True
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.evolution[-1].parts[-1].summary == 'json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)\n'
    assert formdata.workflow_data.get('xxx_status') == 200
    assert formdata.workflow_data.get('xxx_time')
    assert 'xxx_error_response' not in formdata.workflow_data
    formdata.workflow_data = None

    # check storing response as attachment
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.varname = 'xxx'
    item.response_type = 'attachment'
    item.record_errors = True
    item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 200
    assert formdata.workflow_data.get('xxx_content_type') == 'text/xml'
    attachment = formdata.evolution[-1].parts[-1]
    assert isinstance(attachment, AttachmentEvolutionPart)
    assert attachment.base_filename == 'xxx.xml'
    assert attachment.content_type == 'text/xml'
    attachment.fp.seek(0)
    assert attachment.fp.read(5) == b'<?xml'
    formdata.workflow_data = None

    # rendering of a recorded error
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/400-json'
    item.record_errors = True
    item.action_on_4xx = ':stop'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    rendered = formdata.evolution[-1].parts[-1].view()
    assert not rendered  # empty if not in backoffice
    req = HTTPRequest(None, {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': '/backoffice/'})
    pub._set_request(req)
    rendered = formdata.evolution[-1].parts[-1].view()
    assert 'Error during webservice call' in str(rendered)
    assert 'Error Code: 1' in str(rendered)
    assert 'Error Description: :(' in str(rendered)

    item.label = 'do that'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    rendered = formdata.evolution[-1].parts[-1].view()
    assert 'Error during webservice call "do that"' in str(rendered)

    # query string parameters, added to those already in the URL
    item = WebserviceCallStatusItem()
    item.method = 'GET'
    item.url = 'http://remote.example.net?in_url=1'
    item.qs_data = {
        'str': 'abcd',
        'one': '=1',
        'evalme': '=form_number',
        'django': '{{ form_number }}',
        'ezt': '[form_number]',
        'error': '=1=3',
        'in_url': '2',
    }
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('method') == 'GET'
    qs = urlparse.parse_qs(http_requests.get_last('url').split('?')[1])
    assert set(qs.keys()) == {'in_url', 'str', 'one', 'evalme', 'django', 'ezt'}
    assert qs['in_url'] == ['1', '2']
    assert qs['one'] == ['1']
    assert qs['evalme'] == [formdata.get_display_id()]
    assert qs['django'] == [formdata.get_display_id()]
    assert qs['ezt'] == [formdata.get_display_id()]
    assert qs['str'] == ['abcd']

    # other HTTP methods
    item = WebserviceCallStatusItem()
    item.method = 'DELETE'
    item.post = False
    item.url = 'http://remote.example.net/json'
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('method') == 'DELETE'

    for method in ('PUT', 'PATCH'):
        item = WebserviceCallStatusItem()
        item.url = 'http://remote.example.net'
        item.method = method
        item.post = False
        item.post_data = {'str': 'abcd', 'one': '=1',
                          'evalme': '=form_number', 'error': '=1=3'}
        pub.substitutions.feed(formdata)
        item.perform(formdata)
        assert http_requests.get_last('url') == 'http://remote.example.net'
        assert http_requests.get_last('method') == method
        payload = json.loads(http_requests.get_last('body'))
        assert payload == {'one': 1, 'str': 'abcd', 'evalme': formdata.get_display_id()}
|
||
|
||
|
||
def test_webservice_waitpoint(pub):
    """Check how the error actions drive the ``waitpoint`` flag of a wscall item.

    A freshly created item is a waitpoint.  Once the five ``action_on_*``
    attributes set below are all ':pass' it is no longer one, and switching
    one of them back to ':stop' makes it a waitpoint again.
    """
    item = WebserviceCallStatusItem()
    assert item.waitpoint

    item.action_on_app_error = ':pass'
    item.action_on_4xx = ':pass'
    item.action_on_5xx = ':pass'
    item.action_on_bad_data = ':pass'
    item.action_on_network_errors = ':pass'
    assert not item.waitpoint

    item.action_on_network_errors = ':stop'
    assert item.waitpoint
|
||
|
||
|
||
def test_webservice_call_error_handling(http_requests, pub):
    """Exercise the error handling of ``WebserviceCallStatusItem.perform``.

    The test calls a series of mocked endpoints and checks, for each
    combination of ``action_on_*`` settings, ``response_type`` and
    ``varname``, whether ``AbortActionException`` is raised and which
    ``<varname>_*`` keys end up in ``formdata.workflow_data``.
    ``workflow_data`` is reset to ``None`` between scenarios so every group
    of assertions only sees the result of its own call.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    # application errors abort the action when action_on_app_error is ':stop'
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json-err1'
    item.action_on_app_error = ':stop'
    item.action_on_4xx = ':pass'
    item.action_on_5xx = ':pass'
    item.action_on_network_errors = ':pass'
    with pytest.raises(AbortActionException):
        item.perform(formdata)

    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json-errheader1'
    item.action_on_app_error = ':stop'
    item.action_on_4xx = ':pass'
    item.action_on_5xx = ':pass'
    item.action_on_network_errors = ':pass'
    with pytest.raises(AbortActionException):
        item.perform(formdata)

    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json-errheaderstr'
    item.action_on_app_error = ':stop'
    item.action_on_4xx = ':pass'
    item.action_on_5xx = ':pass'
    item.action_on_network_errors = ':pass'
    with pytest.raises(AbortActionException):
        item.perform(formdata)

    # err == 0 in the response: no application error, response is stored
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json-err0'
    item.varname = 'xxx'
    item.perform(formdata)
    assert formdata.workflow_data['xxx_status'] == 200
    assert formdata.workflow_data['xxx_app_error_code'] == 0
    assert formdata.workflow_data['xxx_response'] == {'data': 'foo', 'err': 0}
    assert formdata.workflow_data.get('xxx_time')
    formdata.workflow_data = None

    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json-err0'
    item.varname = 'xxx'
    item.action_on_app_error = ':stop'
    item.perform(formdata)
    assert formdata.workflow_data['xxx_status'] == 200
    assert formdata.workflow_data['xxx_app_error_code'] == 0
    assert formdata.workflow_data['xxx_response'] == {'data': 'foo', 'err': 0}
    assert formdata.workflow_data.get('xxx_time')
    formdata.workflow_data = None

    # error code in the response body, default action: no xxx_response stored
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json-err1'
    item.varname = 'xxx'
    item.perform(formdata)
    assert formdata.workflow_data['xxx_status'] == 200
    assert formdata.workflow_data['xxx_app_error_code'] == 1
    assert 'xxx_response' not in formdata.workflow_data
    assert formdata.workflow_data.get('xxx_time')
    formdata.workflow_data = None

    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json-errstr'
    item.varname = 'xxx'
    item.perform(formdata)
    assert formdata.workflow_data['xxx_status'] == 200
    assert formdata.workflow_data['xxx_app_error_code'] == 'bug'
    assert 'xxx_response' not in formdata.workflow_data
    assert formdata.workflow_data.get('xxx_time')
    formdata.workflow_data = None

    # same error with ':stop': action aborted, details are still recorded
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json-err1'
    item.varname = 'xxx'
    item.action_on_app_error = ':stop'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.workflow_data['xxx_status'] == 200
    assert formdata.workflow_data['xxx_app_error_code'] == 1
    assert formdata.workflow_data['xxx_error_response'] == {'data': '', 'err': 1}
    assert 'xxx_response' not in formdata.workflow_data
    assert formdata.workflow_data.get('xxx_time')
    formdata.workflow_data = None

    # error code exposed as xxx_app_error_header
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json-errheader0'
    item.varname = 'xxx'
    item.perform(formdata)
    assert formdata.workflow_data['xxx_status'] == 200
    assert formdata.workflow_data['xxx_app_error_code'] == 0
    assert formdata.workflow_data['xxx_app_error_header'] == '0'
    assert formdata.workflow_data['xxx_response'] == {'foo': 'bar'}
    assert formdata.workflow_data.get('xxx_time')
    formdata.workflow_data = None

    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json-errheader1'
    item.varname = 'xxx'
    item.perform(formdata)
    assert formdata.workflow_data['xxx_status'] == 200
    assert formdata.workflow_data['xxx_app_error_code'] == 1
    assert formdata.workflow_data['xxx_app_error_header'] == '1'
    assert formdata.workflow_data['xxx_error_response'] == {'foo': 'bar'}
    assert 'xxx_response' not in formdata.workflow_data
    assert formdata.workflow_data.get('xxx_time')
    formdata.workflow_data = None

    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json-errheaderstr'
    item.varname = 'xxx'
    item.perform(formdata)
    assert formdata.workflow_data['xxx_status'] == 200
    assert formdata.workflow_data['xxx_app_error_code'] == 'bug'
    assert formdata.workflow_data['xxx_app_error_header'] == 'bug'
    assert formdata.workflow_data['xxx_error_response'] == {'foo': 'bar'}
    assert 'xxx_response' not in formdata.workflow_data
    assert formdata.workflow_data.get('xxx_time')
    formdata.workflow_data = None

    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json-errheader1'
    item.varname = 'xxx'
    item.action_on_app_error = ':stop'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.workflow_data['xxx_status'] == 200
    assert formdata.workflow_data['xxx_app_error_code'] == 1
    assert formdata.workflow_data['xxx_app_error_header'] == '1'
    assert formdata.workflow_data['xxx_error_response'] == {'foo': 'bar'}
    assert 'xxx_response' not in formdata.workflow_data
    assert formdata.workflow_data.get('xxx_time')
    formdata.workflow_data = None

    # xml endpoint, response handled as an attachment
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml-errheader'
    item.varname = 'xxx'
    item.response_type = 'attachment'
    item.record_errors = True
    item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 200
    assert formdata.workflow_data.get('xxx_app_error_code') == 1
    assert formdata.workflow_data.get('xxx_app_error_header') == '1'
    assert 'xxx_response' not in formdata.workflow_data
    formdata.workflow_data = None

    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml-errheader'
    item.varname = 'xxx'
    item.response_type = 'attachment'
    item.record_errors = True
    item.action_on_app_error = ':stop'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 200
    assert formdata.workflow_data.get('xxx_app_error_code') == 1
    assert formdata.workflow_data.get('xxx_app_error_header') == '1'
    assert 'xxx_response' not in formdata.workflow_data
    formdata.workflow_data = None

    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml-errheader'
    item.varname = 'xxx'
    item.response_type = 'json'  # wait for json but receive xml
    item.record_errors = True
    item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 200
    assert formdata.workflow_data.get('xxx_app_error_code') == 1
    assert formdata.workflow_data.get('xxx_app_error_header') == '1'
    assert 'xxx_response' not in formdata.workflow_data
    formdata.workflow_data = None

    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json-err1'
    item.action_on_app_error = ':stop'
    item.response_type = 'attachment'  # err value is not an error
    item.perform(formdata)  # so, everything is "ok" here
    formdata.workflow_data = None

    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json-errheaderstr'
    item.action_on_app_error = ':stop'
    item.response_type = 'attachment'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    formdata.workflow_data = None

    # xml instead of json is not an app_error
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.varname = 'xxx'
    item.action_on_app_error = ':stop'
    item.action_on_4xx = ':pass'
    item.action_on_5xx = ':pass'
    item.action_on_network_errors = ':pass'
    item.action_on_bad_data = ':pass'
    item.perform(formdata)
    formdata.workflow_data = None

    # connection error
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/connection-error'
    item.record_errors = True
    item.action_on_network_errors = ':pass'
    item.perform(formdata)
    assert not formdata.workflow_data

    # connection error, with varname
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/connection-error'
    item.varname = 'plop'
    item.record_errors = True
    item.action_on_network_errors = ':pass'
    item.perform(formdata)
    assert 'ConnectionError: error\n' in formdata.evolution[-1].parts[-1].summary
    assert formdata.workflow_data['plop_connection_error'] == 'error'
|
||
|
||
|
||
def test_webservice_call_store_in_backoffice_filefield(http_requests, pub):
    """Check a wscall response can be stored in a backoffice file field.

    Scenarios covered, in order: storing in the field only, storing in the
    field and under a ``varname``, and three configurations (field id
    missing, field of another type, no field at all) where
    ``formdata.data`` must stay empty.
    """
    wf = Workflow(name='wscall to backoffice file field')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef.fields = [
        FileField(id='bo1', label='bo field 1', type='file', varname='backoffice_file1'),
    ]
    st1 = wf.add_status('Status1')
    wf.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.data = {}
    formdata.store()

    # check storing response in backoffice file field
    item = WebserviceCallStatusItem()
    item.parent = st1
    item.backoffice_filefield_id = 'bo1'
    item.url = 'http://remote.example.net/xml'
    item.response_type = 'attachment'
    item.record_errors = True
    item.perform(formdata)

    assert 'bo1' in formdata.data
    fbo1 = formdata.data['bo1']
    assert fbo1.base_filename == 'file-bo1.xml'
    assert fbo1.content_type == 'text/xml'
    assert fbo1.get_content().startswith(b'<?xml')
    # nothing else is stored
    assert formdata.workflow_data is None
    assert not formdata.evolution[-1].parts

    # store in backoffice file field + varname
    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()
    formdata.store()
    item.varname = 'xxx'
    item.perform(formdata)
    # backoffice file field
    assert 'bo1' in formdata.data
    fbo1 = formdata.data['bo1']
    assert fbo1.base_filename == 'xxx.xml'
    assert fbo1.content_type == 'text/xml'
    assert fbo1.get_content().startswith(b'<?xml')
    # varname => workflow_data and AttachmentEvolutionPart
    assert formdata.workflow_data.get('xxx_status') == 200
    assert formdata.workflow_data.get('xxx_content_type') == 'text/xml'
    attachment = formdata.evolution[-1].parts[-1]
    assert isinstance(attachment, AttachmentEvolutionPart)
    assert attachment.base_filename == 'xxx.xml'
    assert attachment.content_type == 'text/xml'
    attachment.fp.seek(0)
    assert attachment.fp.read(5) == b'<?xml'

    # no more 'bo1' backoffice field: do nothing
    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()
    formdata.store()
    wf.backoffice_fields_formdef.fields = [
        FileField(id='bo2', label='bo field 2', type='file'),  # id != 'bo1'
    ]
    item.perform(formdata)
    assert formdata.data == {}
    # backoffice field is not a file field:
    wf.backoffice_fields_formdef.fields = [
        StringField(id='bo1', label='bo field 1', type='string'),
    ]
    item.perform(formdata)
    assert formdata.data == {}
    # no field at all:
    wf.backoffice_fields_formdef.fields = []
    item.perform(formdata)
    assert formdata.data == {}
|
||
|
||
|
||
def test_webservice_target_status(pub):
    """Check ``get_target_status`` of a wscall item.

    With no error action configured the result compares equal to
    ``[status1.id]``.  With three error actions pointing at existing
    statuses the result has four entries, and an action pointing at an
    unknown status id ('st3') does not add one.
    """
    wf = Workflow(name='boo')
    status1 = wf.add_status('Status1', 'st1')
    status2 = wf.add_status('Status2', 'st2')
    wf.store()

    item = WebserviceCallStatusItem()
    item.parent = status1
    assert item.get_target_status() == [status1.id]

    item.action_on_app_error = status1.id
    item.action_on_4xx = status2.id
    item.action_on_5xx = status2.id
    targets = item.get_target_status()
    # check the list that is inspected below instead of calling the method again
    assert len(targets) == 4
    assert targets.count(status1) == 2
    assert targets.count(status2) == 2

    item.action_on_bad_data = 'st3'  # doesn't exist
    targets = item.get_target_status()
    assert len(targets) == 4
    assert targets.count(status1) == 2
    assert targets.count(status2) == 2
|
||
|
||
|
||
def test_webservice_with_complex_data(http_requests, pub):
    """Check the JSON payload of a POST wscall run inside ``complex_data()``.

    Item and items fields are referenced from ``post_data`` with both the
    ``{{ ... }}`` and ``[...]`` template syntaxes.  The display variables
    must be sent as strings, and the raw value of the items field must be
    sent as a list.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    wf = Workflow(name='wf1')
    # the returned status objects are not needed by this test
    wf.add_status('Status1', 'st1')
    wf.add_status('StatusErr', 'sterr')
    wf.store()

    datasource = {
        'type': 'formula',
        'value': repr([{'id': 'a', 'text': 'aa', 'more': 'aaa'},
                       {'id': 'b', 'text': 'bb', 'more': 'bbb'},
                       {'id': 'c', 'text': 'cc', 'more': 'ccc'}]),
    }

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        ItemField(id='1', label='1st field',
                  type='item', varname='item',
                  data_source=datasource),
        ItemsField(id='2', label='2nd field',
                   type='items', varname='items',
                   data_source=datasource),
    ]
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.data['1'] = 'a'
    formdata.data['1_display'] = 'aa'
    formdata.data['1_structured'] = formdef.fields[0].store_structured_value(formdata.data, '1')
    formdata.data['2'] = ['a', 'b']
    formdata.data['2_display'] = 'aa, bb'
    formdata.data['2_structured'] = formdef.fields[1].store_structured_value(formdata.data, '2')
    formdata.just_created()
    formdata.store()

    item = WebserviceCallStatusItem()
    item.method = 'POST'
    item.url = 'http://remote.example.net'
    item.post_data = {
        'item': '{{ form_var_item }}',
        'ezt_item': '[form_var_item]',
        'items': '{{ form_var_items }}',
        'ezt_items': '[form_var_items]',
        'item_raw': '{{ form_var_item_raw }}',
        'ezt_item_raw': '[form_var_item_raw]',
        'items_raw': '{{ form_var_items_raw }}',
        'ezt_items_raw': '[form_var_items_raw]',
    }
    pub.substitutions.feed(formdata)
    with get_publisher().complex_data():
        item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'POST'
    payload = json.loads(http_requests.get_last('body'))
    assert payload == {
        'item': 'aa',
        'ezt_item': 'aa',
        'items': 'aa, bb',
        'ezt_items': 'aa, bb',
        'item_raw': 'a',
        'ezt_item_raw': 'a',
        'items_raw': ['a', 'b'],
        'ezt_items_raw': ['a', 'b'],
    }
|
||
|
||
|
||
def test_timeout(two_pubs):
    """Check a jump with a timeout moves a formdata to its target status.

    A formdata is created in 'st1', whose jump has a 0.1 timeout; after a
    short sleep ``_apply_timeouts`` must have moved it to 'wf-st2'.  The
    function is then run again with an extra formdef that has no workflow
    set, to make sure it does not crash.
    """
    workflow = Workflow(name='timeout')
    st1 = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = ['_submitter', '_receiver']
    jump.timeout = 0.1
    jump.status = 'st2'
    st1.items.append(jump)
    jump.parent = st1

    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    assert formdef.get_workflow().id == workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    formdata_id = formdata.id

    # wait longer than the jump timeout before applying timeouts
    time.sleep(0.3)
    _apply_timeouts(two_pubs)

    assert formdef.data_class().get(formdata_id).status == 'wf-st2'

    # check there's no crash on workflow without jumps
    formdef = FormDef()
    formdef.name = 'xxx'
    formdef.store()
    _apply_timeouts(two_pubs)
|
||
|
||
|
||
def test_legacy_timeout(pub):
    """Check a ``TimeoutWorkflowStatusItem`` is honoured by ``_apply_timeouts``.

    Same scenario as a timeout jump: the formdata starts in 'st1' and must
    be in 'wf-st2' once the 0.1 timeout has elapsed and timeouts are applied.
    """
    workflow = Workflow(name='timeout')
    st1 = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    jump = TimeoutWorkflowStatusItem()
    jump.id = '_jump'
    jump.timeout = 0.1
    jump.status = 'st2'
    st1.items.append(jump)
    jump.parent = st1

    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    assert formdef.get_workflow().id == workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    formdata_id = formdata.id

    # wait longer than the timeout before applying timeouts
    time.sleep(0.3)
    _apply_timeouts(pub)

    assert formdef.data_class().get(formdata_id).status == 'wf-st2'
|
||
|
||
|
||
def test_timeout_then_remove(two_pubs):
    """Check a timeout jump into a status holding a remove action.

    The formdata exists right after creation; once the timeout has been
    applied and the jump has led to 'st2' (which contains a
    ``RemoveWorkflowStatusItem``), its id must no longer be listed by the
    data class.
    """
    workflow = Workflow(name='timeout-then-remove')
    st1 = workflow.add_status('Status1', 'st1')
    st2 = workflow.add_status('Status2', 'st2')

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = ['_submitter', '_receiver']
    jump.timeout = 0.1
    jump.status = 'st2'
    st1.items.append(jump)
    jump.parent = st1

    remove = RemoveWorkflowStatusItem()
    st2.items.append(remove)
    remove.parent = st2

    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz%s' % id(two_pubs)
    formdef.fields = []
    formdef.workflow_id = workflow.id
    assert formdef.get_workflow().id == workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    formdata_id = formdata.id

    # ids are compared as strings on both sides
    assert str(formdata_id) in [str(x) for x in formdef.data_class().keys()]

    time.sleep(0.2)
    _apply_timeouts(two_pubs)

    assert str(formdata_id) not in [str(x) for x in formdef.data_class().keys()]
|
||
|
||
|
||
def test_timeout_with_mark(two_pubs):
    """Check a timeout jump with ``set_marker_on_status`` records a marker.

    After the jump has been applied, the reloaded formdata must carry
    ``[{'status_id': 'st1'}]`` under the '_markers_stack' workflow data key.
    """
    workflow = Workflow(name='timeout')
    st1 = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = ['_submitter', '_receiver']
    jump.timeout = 0.1
    jump.status = 'st2'
    jump.set_marker_on_status = True
    st1.items.append(jump)
    jump.parent = st1

    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    assert formdef.get_workflow().id == workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    formdata_id = formdata.id

    # wait longer than the jump timeout before applying timeouts
    time.sleep(0.3)
    _apply_timeouts(two_pubs)

    # reload the formdata to look at what has been stored
    formdata = formdef.data_class().get(formdata_id)
    assert formdata.workflow_data.get('_markers_stack') == [{'status_id': 'st1'}]
|
||
|
||
|
||
def test_jump_missing_previous_mark(two_pubs):
    """Check a timeout jump to '_previous' is logged as an error.

    The jump in this workflow targets '_previous' and the test never sets a
    marker.  After the logged errors are wiped and the timeouts applied,
    exactly one logged error must exist.  Skipped when the publisher is not
    using PostgreSQL.
    """
    if not two_pubs.is_using_postgresql():
        # pytest.skip() raises, nothing after it would be executed
        pytest.skip('this requires SQL')

    workflow = Workflow(name='jump-mark')
    st1 = workflow.add_status('Status1', 'st1')

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = ['_submitter', '_receiver']
    jump.status = '_previous'
    jump.timeout = 0.1
    st1.items.append(jump)
    jump.parent = st1

    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    time.sleep(0.3)
    # start from a clean slate so the count below only reflects this run
    two_pubs.loggederror_class.wipe()
    _apply_timeouts(two_pubs)
    assert two_pubs.loggederror_class.count() == 1
|
||
|
||
|
||
def test_sms(pub, sms_mocking):
    """Check ``SendSMSWorkflowStatusItem.perform`` against the SMS mock.

    Covered: a plain message, ``None`` recipients being filtered out, bodies
    using the ``{{ ... }}`` and ``[...]`` template syntaxes, and no message
    being sent once the SMS configuration is ``{'mode': 'none'}``.
    """
    pub.cfg['sms'] = {'sender': 'xxx', 'passerelle_url': 'http://passerelle.invalid/'}
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    item = SendSMSWorkflowStatusItem()
    item.to = ['000']
    item.body = 'XXX'
    assert len(sms_mocking.sms) == 0
    item.perform(formdata)
    assert len(sms_mocking.sms) == 1
    assert sms_mocking.sms[0]['destinations'] == ['000']
    assert sms_mocking.sms[0]['text'] == 'XXX'

    # check None as recipient is not passed to the SMS backend
    item.to = [None]
    item.perform(formdata)  # nothing
    assert len(sms_mocking.sms) == 1

    item.to = ['000', None]
    item.perform(formdata)
    assert len(sms_mocking.sms) == 2
    assert sms_mocking.sms[1]['destinations'] == ['000']
    assert sms_mocking.sms[1]['text'] == 'XXX'

    pub.substitutions.feed(MockSubstitutionVariables())
    item.body = 'dj{{ bar }}'
    item.perform(formdata)
    assert len(sms_mocking.sms) == 3
    assert sms_mocking.sms[2]['destinations'] == ['000']
    assert sms_mocking.sms[2]['text'] == 'djFoobar'

    item.body = 'ezt[bar]'
    item.perform(formdata)
    assert len(sms_mocking.sms) == 4
    assert sms_mocking.sms[3]['destinations'] == ['000']
    assert sms_mocking.sms[3]['text'] == 'eztFoobar'

    # disable SMS system
    pub.cfg['sms'] = {'mode': 'none'}
    item.to = ['000']
    item.body = 'XXX'
    item.perform(formdata)  # nothing
    assert len(sms_mocking.sms) == 4
|
||
|
||
|
||
def test_sms_with_passerelle(pub):
    """Check the HTTP request issued when SMS are sent in 'passerelle' mode.

    ``wcs.wscalls.get_secret_and_orig`` and ``wcs.qommon.misc._http_request``
    are patched.  The URL given to the HTTP helper must keep the configured
    query string and carry ``orig`` and ``signature`` parameters, and the
    JSON body must hold the message, recipients and sender.
    """
    pub.cfg['sms'] = {
        'mode': 'passerelle',
        'passerelle_url': 'http://passerelle.example.com/send?nostop=1',
        'sender': 'Passerelle',
    }
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    item = SendSMSWorkflowStatusItem()
    item.to = ['1234']
    item.body = 'my message'
    # NOTE(review): the nesting of the two "with" bodies was rebuilt from a
    # copy of this file that had lost its indentation; confirm it against the
    # canonical source.
    with mock.patch('wcs.wscalls.get_secret_and_orig') as mocked_secret_and_orig:
        mocked_secret_and_orig.return_value = ('secret', 'localhost')
        with mock.patch('wcs.qommon.misc._http_request') as mocked_http_post:
            mocked_http_post.return_value = (mock.Mock(headers={}), 200, 'data', 'headers')
            item.perform(formdata)
            # first positional argument is the URL, body is a keyword argument
            url = mocked_http_post.call_args[0][0]
            payload = mocked_http_post.call_args[1]['body']
            assert 'http://passerelle.example.com' in url
            assert '?nostop=1' in url
            assert 'orig=localhost' in url
            assert 'signature=' in url
            json_payload = json.loads(payload)
            assert 'message' in json_payload
            assert json_payload['message'] == 'my message'
            assert json_payload['to'] == ['1234']
            assert json_payload['from'] == 'Passerelle'
|
||
|
||
|
||
def test_display_form(two_pubs):
    """Check filling and submitting the form of a ``FormWorkflowStatusItem``.

    The workflow form has a string field and a date field with varname
    'date'.  The submitted date must be exposed as ``xxx_var_date``, first
    with the default configuration, then with the language set to 'fr'
    where the date is typed and returned as '12/05/2015' while
    ``xxx_var_date_raw`` is the matching ``time.struct_time``.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')

    display_form = FormWorkflowStatusItem()
    display_form.id = '_x'
    display_form.varname = 'xxx'
    display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
    display_form.formdef.fields.append(
        StringField(id='1', label='Test', type='string'))
    display_form.formdef.fields.append(
        DateField(id='2', label='Date', type='date', varname='date'))
    st1.items.append(display_form)
    display_form.parent = st1

    form = Form(action='#', use_tokens=False)
    display_form.fill_form(form, formdata, None)
    assert form.widgets[0].title == 'Test'
    assert form.widgets[1].title == 'Date'

    two_pubs.get_request().environ['REQUEST_METHOD'] = 'POST'
    two_pubs.get_request().form = {'f1': 'Foobar', 'f2': '2015-05-12', 'submit': 'submit'}
    display_form.submit_form(form, formdata, None, None)

    assert formdata.get_substitution_variables()['xxx_var_date'] == '2015-05-12'

    two_pubs.cfg['language'] = {'language': 'fr'}
    try:
        formdata = formdef.data_class()()
        formdata.just_created()
        formdata.store()

        form = Form(action='#', use_tokens=False)
        display_form.fill_form(form, formdata, None)
        two_pubs.get_request().environ['REQUEST_METHOD'] = 'POST'
        two_pubs.get_request().form = {'f1': 'Foobar', 'f2': '12/05/2015', 'submit': 'submit'}
        display_form.submit_form(form, formdata, None, None)
        assert formdata.get_substitution_variables()['xxx_var_date'] == '12/05/2015'

        assert (formdata.get_substitution_variables()['xxx_var_date_raw'] ==
                time.strptime('2015-05-12', '%Y-%m-%d'))
    finally:
        # always switch the language back, even when an assertion fails
        two_pubs.cfg['language'] = {'language': 'en'}
|
||
|
||
|
||
def test_display_form_and_comment(pub):
    """Check a status with both a form action and a commentable action.

    For a user holding the role both actions are restricted to, the workflow
    form must render the comment field of the form action first and the
    textarea of the commentable action second.
    """
    role = Role(name='bar1')
    role.store()

    user = pub.user_class()
    user.roles = [role.id]
    user.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')

    display_form = FormWorkflowStatusItem()
    display_form.by = [role.id]
    display_form.id = '_x'
    display_form.varname = 'xxx'
    display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
    display_form.formdef.fields.append(
        CommentField(id='1', label='Test', type='comment'))
    st1.items.append(display_form)
    display_form.parent = st1

    commentable = CommentableWorkflowStatusItem()
    commentable.by = [role.id]
    st1.items.append(commentable)
    commentable.parent = st1

    wf.store()

    formdef.workflow = wf
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    assert formdata.get_status().name == 'Status1'

    form = formdata.get_workflow_form(user)
    assert 'Test' in str(form.widgets[0].render())
    assert '<textarea' in str(form.widgets[1].render())
|
||
|
||
|
||
def test_display_form_migration(pub):
    """Check ``show_as_radio`` on a workflow form field becomes a display mode.

    An item field of a form action gets ``show_as_radio = True`` before the
    workflow is stored; once the workflow is reloaded the field must report
    ``display_mode == 'radio'``.
    """
    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')

    display_form = FormWorkflowStatusItem()
    display_form.id = '_x'
    display_form.varname = 'xxx'
    display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
    display_form.formdef.fields = [
        ItemField(id='1', label='Test', type='item'),
    ]
    st1.items.append(display_form)
    display_form.parent = st1

    display_form.formdef.fields[0].show_as_radio = True
    wf.store()

    wf = Workflow.get(wf.id)
    assert wf.possible_status[0].items[0].formdef.fields[0].display_mode == 'radio'
|
||
|
||
|
||
def test_choice_button_no_label(pub):
    """Check a choice action without a label does not produce a button.

    The status holds two choice actions for the same role, only the second
    one has a label ('TEST'); the rendered workflow form must contain a
    single button, the labelled one.
    """
    role = Role(name='bar1')
    role.store()

    user = pub.user_class()
    user.roles = [role.id]
    user.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')

    choice = ChoiceWorkflowStatusItem()
    choice.by = [role.id]
    choice.id = '_x'
    st1.items.append(choice)
    choice.parent = st1

    choice2 = ChoiceWorkflowStatusItem()
    choice2.label = 'TEST'
    choice2.by = [role.id]
    choice2.id = '_x2'
    st1.items.append(choice2)
    choice2.parent = st1

    wf.store()

    formdef.workflow = wf
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    assert formdata.get_status().name == 'Status1'

    form = formdata.get_workflow_form(user)
    form.render()
    assert str(form.render()).count('<button') == 1
    assert '>TEST</button>' in str(form.render())
|
||
|
||
|
||
def test_workflow_role_type_migration(pub):
    """Check integer role ids in ``by`` come back as strings after a reload.

    A jump is stored with ``by = [1, 2]``; the workflow loaded again with
    ``Workflow.get`` must expose ``['1', '2']``.
    """
    workflow = Workflow(name='role migration')
    st1 = workflow.add_status('Status1', 'st1')

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = [1, 2]
    st1.items.append(jump)
    jump.parent = st1

    workflow.store()

    reloaded_workflow = Workflow.get(workflow.id)
    assert reloaded_workflow.possible_status[0].items[0].by == ['1', '2']
|
||
|
||
|
||
def test_workflow_display_message(pub):
    """Check ``DisplayMessageWorkflowStatusItem.get_message`` renders templates.

    A plain string is returned as is; ``number``, ``bar`` and ``foo`` are
    substituted when referenced with either the ``{{ ... }}`` or the
    ``[...]`` syntax.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    workflow = Workflow(name='display message')
    st1 = workflow.add_status('Status1', 'st1')

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = []
    formdef.store()
    formdata = formdef.data_class()()
    formdata.id = '1'

    display_message = DisplayMessageWorkflowStatusItem()
    display_message.parent = st1

    display_message.message = 'test'
    assert display_message.get_message(formdata) == 'test'

    display_message.message = '{{ number }}'
    assert display_message.get_message(formdata) == str(formdata.id)

    display_message.message = '[number]'
    assert display_message.get_message(formdata) == str(formdata.id)

    display_message.message = '{{ bar }}'
    assert display_message.get_message(formdata) == 'Foobar'

    display_message.message = '[bar]'
    assert display_message.get_message(formdata) == 'Foobar'

    # makes sure the string is correctly escaped for HTML
    # NOTE(review): the expected values below hold a raw '<' although this is
    # about HTML escaping; confirm against the canonical source whether an
    # HTML entity was lost when this copy of the file was produced.
    display_message.message = '{{ foo }}'
    assert display_message.get_message(formdata) == '1 < 3'
    display_message.message = '[foo]'
    assert display_message.get_message(formdata) == '1 < 3'
|
||
|
||
|
||
def test_workflow_display_message_to(pub):
    """Check the ``to`` restriction of display message actions.

    ``get_message`` and ``formdata.get_workflow_messages`` are checked
    together while the recipients (nobody, a role, the submitter, both),
    the request user, its roles and the formdata owner are changed step by
    step.  The last part adds a second message to the same status.
    """
    workflow = Workflow(name='display message to')
    st1 = workflow.add_status('Status1', 'st1')

    role = Role(name='foorole')
    role.store()
    role2 = Role(name='no-one-role')
    role2.store()
    user = pub.user_class(name='baruser')
    user.roles = []
    user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.url_name = 'foobar'
    formdef._workflow = workflow

    formdata = formdef.data_class()()
    formdata.status = 'wf-st1'

    display_message = DisplayMessageWorkflowStatusItem()
    display_message.parent = st1
    st1.items.append(display_message)

    # no restriction: displayed even without a user on the request
    display_message.message = 'all'
    display_message.to = None
    assert display_message.get_message(formdata) == 'all'
    assert formdata.get_workflow_messages() == ['all']

    # restricted to a role, no user on the request
    display_message.message = 'to-role'
    display_message.to = [role.id]
    assert display_message.get_message(formdata) == ''
    assert formdata.get_workflow_messages() == []

    # restricted to a role, user without then with the role
    pub._request._user = user
    display_message.message = 'to-role'
    display_message.to = [role.id]
    assert display_message.get_message(formdata) == ''
    assert formdata.get_workflow_messages() == []
    user.roles = [role.id]
    assert display_message.get_message(formdata) == 'to-role'
    assert formdata.get_workflow_messages() == ['to-role']

    # restricted to the submitter
    user.roles = []
    display_message.message = 'to-submitter'
    display_message.to = ['_submitter']
    assert display_message.get_message(formdata) == ''
    assert formdata.get_workflow_messages() == []
    formdata.user_id = user.id
    assert display_message.get_message(formdata) == 'to-submitter'
    assert formdata.get_workflow_messages() == ['to-submitter']

    # restricted to a role or the submitter
    display_message.message = 'to-role-or-submitter'
    display_message.to = [role.id, '_submitter']
    assert display_message.get_message(formdata) == 'to-role-or-submitter'
    assert formdata.get_workflow_messages() == ['to-role-or-submitter']
    formdata.user_id = None
    assert display_message.get_message(formdata) == ''
    assert formdata.get_workflow_messages() == []
    user.roles = [role.id]
    assert display_message.get_message(formdata) == 'to-role-or-submitter'
    assert formdata.get_workflow_messages() == ['to-role-or-submitter']
    formdata.user_id = user.id
    assert display_message.get_message(formdata) == 'to-role-or-submitter'
    assert formdata.get_workflow_messages() == ['to-role-or-submitter']

    # restricted to a role the user does not have
    display_message.to = [role2.id]
    assert display_message.get_message(formdata) == ''
    assert formdata.get_workflow_messages() == []

    # two messages in the same status, with different recipients
    display_message.message = 'd1'
    display_message2 = DisplayMessageWorkflowStatusItem()
    display_message2.parent = st1
    st1.items.append(display_message2)
    display_message2.message = 'd2'
    display_message2.to = [role.id, '_submitter']
    assert formdata.get_workflow_messages() == ['d2']
    user.roles = [role.id, role2.id]
    assert 'd1' in formdata.get_workflow_messages()
    assert 'd2' in formdata.get_workflow_messages()
|
||
|
||
|
||
def test_workflow_display_message_line_details(pub):
    """Check the summary line of a display-message action for each position and for a role recipient."""
    wf = Workflow(name='display message to')
    status = wf.add_status('Status1', 'st1')
    action = DisplayMessageWorkflowStatusItem()
    action.parent = status

    # no position has been assigned yet
    assert action.get_line_details() == 'top of page'

    details_by_position = (
        ('top', 'top of page'),
        ('bottom', 'bottom of page'),
        ('actions', 'with actions'),
    )
    for position, details in details_by_position:
        action.position = position
        assert action.get_line_details() == details

    # the position is still 'actions' here; a recipient role gets appended to the line
    foorole = Role(name='foorole')
    foorole.store()
    action.to = [foorole.id]
    assert action.get_line_details() == 'with actions, for foorole'
|
||
|
||
|
||
def test_workflow_roles(pub, emails):
    """Check a mail addressed to workflow functions reaches the mapped roles,
    and that role details show up in the formdata substitution variables."""
    pub.substitutions.feed(MockSubstitutionVariables())

    someone = pub.user_class(name='foo')
    someone.email = 'zorg@localhost'
    someone.store()

    Role.wipe()
    receiver_role = Role(name='foo')
    receiver_role.emails = ['foo@localhost']
    receiver_role.details = 'Hello World'
    receiver_role.store()

    other_role = Role(name='bar')
    other_role.emails = ['bar@localhost', 'baz@localhost']
    other_role.store()

    wf = Workflow(name='wf roles')
    status = wf.add_status('Status1', 'st1')
    sendmail = SendmailWorkflowStatusItem()
    sendmail.to = ['_receiver', '_other']
    sendmail.subject = 'Foobar'
    sendmail.body = 'Hello'
    status.items.append(sendmail)
    sendmail.parent = status
    wf.roles['_other'] = 'Other Function'
    wf.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_roles = {'_receiver': receiver_role.id, '_other': other_role.id}
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    # a single mail is expected, addressed to the emails of both roles
    emails.empty()
    sendmail.perform(formdata)
    get_response().process_after_jobs()
    assert emails.count() == 1
    assert emails.get('Foobar')
    expected_rcpt = set(['foo@localhost', 'bar@localhost', 'baz@localhost'])
    assert set(emails.get('Foobar')['email_rcpt']) == expected_rcpt

    # a function key containing dashes is exposed with underscores
    wf.roles['_slug-with-dash'] = 'Dashed Function'
    wf.store()
    formdef.workflow_roles['_slug-with-dash'] = receiver_role.id
    formdef.store()
    variables = formdata.get_substitution_variables()
    assert variables.get('form_role_other_name') == 'bar'
    assert variables.get('form_role_slug_with_dash_name') == 'foo'
    assert variables.get('form_role_slug_with_dash_details') == 'Hello World'
|
||
|
||
|
||
def test_criticality(pub):
    """Walk a formdata through the criticality levels with the default,
    increase, decrease and absolute-set modes of the action."""
    FormDef.wipe()

    wf = Workflow(name='criticality')
    wf.criticality_levels = [
        WorkflowCriticalityLevel(name=level_name) for level_name in ('green', 'yellow', 'red')
    ]
    wf.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.workflow_id = wf.id
    formdef.store()

    item = ModifyCriticalityWorkflowStatusItem()

    def level_after_perform(data):
        # run the action once and report the resulting level name
        item.perform(data)
        return data.get_criticality_level_object().name

    # mode left untouched on the action
    formdata = formdef.data_class()()
    assert level_after_perform(formdata) == 'yellow'

    # increasing stops at the last level
    formdata = formdef.data_class()()
    item.mode = MODE_INC
    for expected in ('yellow', 'red', 'red'):
        assert level_after_perform(formdata) == expected

    # decreasing stops at the first level
    item.mode = MODE_DEC
    for expected in ('yellow', 'green', 'green'):
        assert level_after_perform(formdata) == expected

    # absolute values index into the level list
    item.mode = MODE_SET
    for absolute_value, expected in ((2, 'red'), (0, 'green')):
        item.absolute_value = absolute_value
        assert level_after_perform(formdata) == expected
|
||
|
||
|
||
def test_geolocate_address(pub):
    """Run the 'address_string' geolocation method against a mocked geocoding call."""
    patch_target = 'wcs.wf.geolocate.http_get_page'
    # JSON body shared by all the replies carrying coordinates
    coords_body = force_bytes(json.dumps([{'lat': '48.8337085', 'lon': '2.3233693'}]))

    formdef = FormDef()
    formdef.geolocations = {'base': 'bla'}
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='1', label='String', type='string', varname='string'),
    ]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'1': '169 rue du chateau'}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    item = GeolocateWorkflowStatusItem()
    item.method = 'address_string'
    item.address_string = '[form_var_string], paris, france'

    def assert_base_geolocated():
        assert int(formdata.geolocations['base']['lat']) == 48
        assert int(formdata.geolocations['base']['lon']) == 2

    # default service, no key
    with mock.patch(patch_target) as fetch:
        fetch.return_value = (None, 200, coords_body, None)
        item.perform(formdata)
        requested_url = fetch.call_args[0][0]
        assert 'https://nominatim.entrouvert.org/search' in requested_url
        assert urlparse.quote('169 rue du chateau, paris') in requested_url
        assert_base_geolocated()

    # a configured nominatim key is passed in the query string
    pub.load_site_options()
    pub.site_options.set('options', 'nominatim_key', 'KEY')
    with mock.patch(patch_target) as fetch:
        fetch.return_value = (None, 200, coords_body, None)
        item.perform(formdata)
        requested_url = fetch.call_args[0][0]
        assert 'https://nominatim.entrouvert.org/search' in requested_url
        assert urlparse.quote('169 rue du chateau, paris') in requested_url
        assert 'key=KEY' in requested_url
        assert_base_geolocated()

    # custom geocoding service URL, without a query string
    pub.load_site_options()
    pub.site_options.set('options', 'geocoding_service_url', 'http://example.net/')
    with mock.patch(patch_target) as fetch:
        fetch.return_value = (None, 200, coords_body, None)
        item.perform(formdata)
        assert 'http://example.net/?q=' in fetch.call_args[0][0]

    # custom geocoding service URL that already has a query string
    pub.site_options.set('options', 'geocoding_service_url', 'http://example.net/?param=value')
    with mock.patch(patch_target) as fetch:
        fetch.return_value = (None, 200, coords_body, None)
        item.perform(formdata)
        assert 'http://example.net/?param=value&' in fetch.call_args[0][0]

    # check for invalid ezt, then for None
    for unusable_address in ('[if-any], paris, france', '=None'):
        item.address_string = unusable_address
        formdata.geolocations = None
        item.perform(formdata)
        assert formdata.geolocations == {}

    # NOTE(review): address_string is still '=None' from here on; confirm the
    # mocked http_get_page is actually reached in the failure cases below.
    failing_replies = (
        (500, coords_body),  # nominatim server error
        (200, force_bytes(json.dumps([]))),  # nominatim returning an empty result set
        (200, b'bad json'),  # nominatim bad json
    )
    for status_code, body in failing_replies:
        formdata.geolocations = None
        with mock.patch(patch_target) as fetch:
            fetch.return_value = (None, status_code, body, None)
            item.perform(formdata)
            assert formdata.geolocations == {}

    # check for nominatim connection error
    formdata.geolocations = None
    with mock.patch(patch_target) as fetch:
        fetch.side_effect = ConnectionError
        item.perform(formdata)
        assert formdata.geolocations == {}
|
||
|
||
|
||
def test_geolocate_image(pub):
    """Geolocate a formdata from an uploaded photo, then check the failure cases.

    Fixture files are read through context managers so no file handle is
    left open once the test is done.
    """
    tests_dir = os.path.dirname(__file__)

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.geolocations = {'base': 'bla'}
    formdef.fields = [
        FileField(id='3', label='File', type='file', varname='file'),
    ]
    formdef.store()

    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    with open(os.path.join(tests_dir, 'image-with-gps-data.jpeg'), 'rb') as fd:
        upload.receive([fd.read()])

    formdata = formdef.data_class()()
    formdata.data = {'3': upload}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    item = GeolocateWorkflowStatusItem()
    item.method = 'photo_variable'

    item.photo_variable = '=form_var_file_raw'
    item.perform(formdata)
    assert int(formdata.geolocations['base']['lat']) == -1
    assert int(formdata.geolocations['base']['lon']) == 6

    # invalid expression
    formdata.geolocations = None
    item.photo_variable = '=1/0'
    item.perform(formdata)
    assert formdata.geolocations == {}

    # invalid type
    formdata.geolocations = None
    item.photo_variable = '="bla"'
    item.perform(formdata)
    assert formdata.geolocations == {}

    # invalid photo
    # NOTE(review): photo_variable is still '="bla"' at this point; confirm
    # the uploaded file below is what perform() actually evaluates.
    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    with open(os.path.join(tests_dir, 'template.odt'), 'rb') as fd:
        upload.receive([fd.read()])
    formdata.data = {'3': upload}
    formdata.geolocations = None
    item.perform(formdata)
    assert formdata.geolocations == {}
|
||
|
||
|
||
def test_geolocate_map(pub):
    """Geolocate a formdata from a map field value, then from an unusable expression."""
    map_field = MapField(id='2', label='Map', type='map', varname='map')

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.geolocations = {'base': 'bla'}
    formdef.fields = [map_field]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'2': '48.8337085;2.3233693'}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    geolocate = GeolocateWorkflowStatusItem()
    geolocate.method = 'map_variable'
    geolocate.map_variable = '=form_var_map'

    geolocate.perform(formdata)
    base_point = formdata.geolocations['base']
    assert int(base_point['lat']) == 48
    assert int(base_point['lon']) == 2

    # invalid data
    formdata.geolocations = None
    geolocate.map_variable = '=form_var'
    geolocate.perform(formdata)
    assert formdata.geolocations == {}
|
||
|
||
|
||
def test_geolocate_overwrite(pub):
    """Check an existing geolocation is replaced by default and kept when overwrite is off."""
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.geolocations = {'base': 'bla'}
    formdef.fields = [
        MapField(id='2', label='Map', type='map', varname='map'),
    ]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'2': '48.8337085;2.3233693'}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    geolocate = GeolocateWorkflowStatusItem()
    geolocate.method = 'map_variable'
    geolocate.map_variable = '=form_var_map'

    def perform_and_check(expected_lat, expected_lon):
        geolocate.perform(formdata)
        assert int(formdata.geolocations['base']['lat']) == expected_lat
        assert int(formdata.geolocations['base']['lon']) == expected_lon

    perform_and_check(48, 2)

    # a new map value replaces the stored position
    formdata.data = {'2': '48.8337085;3.3233693'}
    perform_and_check(48, 3)

    # with overwrite disabled the previous position is kept
    formdata.data = {'2': '48.8337085;4.3233693'}
    geolocate.overwrite = False
    perform_and_check(48, 3)
|
||
|
||
|
||
@pytest.mark.skipif(transform_to_pdf is None, reason='libreoffice not found')
def test_transform_to_pdf():
    """Convert the ODT fixture and check the output starts with a PDF header.

    The input file is opened in a context manager so it is closed even when
    an assertion fails.
    """
    template_path = os.path.join(os.path.dirname(__file__), 'template.odt')
    with open(template_path, 'rb') as instream:
        outstream = transform_to_pdf(instream)
        assert outstream is not False
        assert outstream.read(10).startswith(b'%PDF-')
|
||
|
||
|
||
def test_export_to_model_image(pub):
    """Export to an ODT model embedding an image taken from a file field.

    Fixture files and generated archives are opened through context managers
    so no file handle stays open after the test.
    """
    tests_dir = os.path.dirname(__file__)
    picture_name = 'Pictures/10000000000000320000003276E9D46581B55C88.jpg'

    def exported_picture_size(data):
        # size of the picture stored in the document attached to the latest evolution
        with zipfile.ZipFile(data.evolution[-1].parts[0].filename, mode='r') as zfile:
            return zfile.getinfo(picture_name).file_size

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        FileField(id='3', label='File', type='file', varname='image'),
    ]
    formdef.store()

    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    with open(os.path.join(tests_dir, 'image-with-gps-data.jpeg'), 'rb') as fd:
        image_data = fd.read()
    upload.receive([image_data])

    formdata = formdef.data_class()()
    formdata.data = {'3': upload}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    item = ExportToModel()
    item.convert_to_pdf = False
    item.method = 'non-interactive'
    template_filename = os.path.join(tests_dir, 'template-with-image.odt')
    with open(template_filename, 'rb') as fd:
        template = fd.read()
    upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
    upload.fp = BytesIO()
    upload.fp.write(template)
    upload.fp.seek(0)
    item.model_file = UploadedFile(pub.app_dir, None, upload)
    item.attach_to_history = True

    item.perform(formdata)

    assert formdata.evolution[-1].parts[-1].base_filename == 'template.odt'
    # check the image has been replaced by the one from the formdata
    assert exported_picture_size(formdata) == len(image_data)

    # check with missing data or wrong kind of data
    for field_value in (None, 'wrong kind'):
        formdata = formdef.data_class()()
        formdata.data = {'3': field_value}
        formdata.just_created()
        formdata.store()
        pub.substitutions.feed(formdata)

        item.perform(formdata)

        # check the original image has been left
        assert exported_picture_size(formdata) == 580

    # a templated filename is rendered, and the "/" does not appear in the result
    item.filename = 'formulaire-{{form_number}}/2.odt'
    item.perform(formdata)
    assert formdata.evolution[-1].parts[-1].base_filename == 'formulaire-%s-%s-2.odt' % (
        formdef.id, formdata.id)
|
||
|
||
|
||
def test_export_to_model_backoffice_field(pub):
    """Store the exported document in a backoffice file field, and check
    nothing is written when that field is missing or is not a file field.

    The template file and the produced archive are handled through context
    managers so they are always closed.
    """
    wf = Workflow(name='email with attachments')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef.fields = [
        FileField(id='bo1', label='bo field 1', type='file', varname='backoffice_file1'),
    ]
    st1 = wf.add_status('Status1')
    wf.store()
    formdef = FormDef()
    formdef.name = 'foo-export-to-bofile'
    formdef.fields = [
        StringField(id='1', label='String', type='string', varname='string'),
    ]
    formdef.workflow_id = wf.id
    formdef.store()
    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    item = ExportToModel()
    item.method = 'non-interactive'
    item.convert_to_pdf = False
    template_filename = os.path.join(os.path.dirname(__file__), 'template.odt')
    with open(template_filename, 'rb') as fd:
        template = fd.read()
    upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream')
    upload.fp = BytesIO()
    upload.fp.write(template)
    upload.fp.seek(0)
    item.model_file = UploadedFile(pub.app_dir, None, upload)
    item.parent = st1
    item.backoffice_filefield_id = 'bo1'
    item.perform(formdata)

    assert 'bo1' in formdata.data
    fbo1 = formdata.data['bo1']
    assert fbo1.base_filename == 'template.odt'
    assert fbo1.content_type == 'application/octet-stream'
    with zipfile.ZipFile(fbo1.get_file()) as zfile:
        assert b'foo-export-to-bofile' in zfile.read('content.xml')

    # no more 'bo1' backoffice field: do nothing
    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)
    # id is not bo1:
    wf.backoffice_fields_formdef.fields = [
        FileField(id='bo2', label='bo field 2', type='file'),
    ]
    item.perform(formdata)
    assert formdata.data == {}
    # field is not a field file:
    wf.backoffice_fields_formdef.fields = [
        StringField(id='bo1', label='bo field 1', type='string'),
    ]
    item.perform(formdata)
    assert formdata.data == {}
    # no field at all:
    wf.backoffice_fields_formdef.fields = []
    item.perform(formdata)
    assert formdata.data == {}
|
||
|
||
|
||
def test_export_to_model_django_template(pub):
    """Render an ODT model containing Django template syntax and check how
    the form name ends up in content.xml, including quote and <> characters.

    Archives are read through a helper that closes them, and the template
    file is read in a context manager.
    """
    formdef = FormDef()
    formdef.name = 'foo-export-to-template-with-django'
    formdef.fields = [
        StringField(id='1', label='String', type='string', varname='string'),
    ]
    formdef.store()
    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    def content_of_part(index):
        # content.xml of the index-th document attached to the first evolution
        with zipfile.ZipFile(formdata.evolution[0].parts[index].filename) as archive:
            return archive.read('content.xml')

    item = ExportToModel()
    item.method = 'non-interactive'
    item.attach_to_history = True
    template_filename = os.path.join(os.path.dirname(__file__), 'template-django.odt')
    with open(template_filename, 'rb') as fd:
        template = fd.read()
    upload = QuixoteUpload('/foo/template-django.odt', content_type='application/octet-stream')
    upload.fp = BytesIO()
    upload.fp.write(template)
    upload.fp.seek(0)
    item.model_file = UploadedFile(pub.app_dir, None, upload)
    item.convert_to_pdf = False
    item.perform(formdata)

    new_content = content_of_part(0)
    assert b'>foo-export-to-template-with-django<' in new_content

    formdef.name = 'Name with a \' simple quote'
    formdef.store()
    item.perform(formdata)

    new_content = content_of_part(1)
    assert b'>Name with a \' simple quote<' in new_content

    formdef.name = 'A <> name'
    formdef.store()
    item.perform(formdata)

    new_content = content_of_part(2)
    assert b'>A <> name<' in new_content
|
||
|
||
|
||
@pytest.mark.parametrize('filename', ['template-form-details.odt', 'template-form-details-no-styles.odt'])
def test_export_to_model_form_details_section(pub, filename):
    """Check the form-details section of an ODT model is filled from the form fields.

    Runs once per template given by the ``filename`` parameter. Fixture files
    and the generated archive are opened through context managers so they
    are closed once read.
    """
    tests_dir = os.path.dirname(__file__)

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foo-export-details'
    # NOTE(review): the 'item' and 'items' fields below share id '11';
    # confirm whether this is intentional.
    formdef.fields = [
        PageField(id='1', label='Page 1', type='page'),
        TitleField(id='2', label='Title', type='title'),
        SubtitleField(id='3', label='Subtitle', type='subtitle'),
        StringField(id='4', label='String', type='string', varname='string'),
        EmailField(id='5', label='Email', type='email'),
        TextField(id='6', label='Text', type='text'),
        BoolField(id='8', label='Bool', type='bool'),
        FileField(id='9', label='File', type='file'),
        DateField(id='10', label='Date', type='date'),
        ItemField(id='11', label='Item', type='item', items=['foo', 'bar']),
        ItemsField(id='11', label='Items', type='items', items=['foo', 'bar']),
        TableField(id='12', label='Table', type='table', columns=['a', 'b'], rows=['c', 'd']),
        PageField(id='13', label='Empty Page', type='page'),
        TitleField(id='14', label='Empty Title', type='title'),
        StringField(id='15', label='Empty String', type='string', varname='invisiblestr'),
    ]
    formdef.store()
    formdef.data_class().wipe()
    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    with open(os.path.join(tests_dir, 'image-with-gps-data.jpeg'), 'rb') as fd:
        upload.receive([fd.read()])
    formdata = formdef.data_class()()
    formdata.data = {
        '4': 'string',
        '5': 'foo@localhost',
        '6': 'para1\npara2',
        '8': False,
        '9': upload,
        '10': time.strptime('2015-05-12', '%Y-%m-%d'),
        '11': 'foo',
        '12': [['1', '2'], ['3', '4']],
    }
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    item = ExportToModel()
    item.method = 'non-interactive'
    item.attach_to_history = True
    template_filename = os.path.join(tests_dir, filename)
    with open(template_filename, 'rb') as fd:
        template = fd.read()
    upload = QuixoteUpload(filename, content_type='application/octet-stream')
    upload.fp = BytesIO()
    upload.fp.write(template)
    upload.fp.seek(0)
    item.model_file = UploadedFile(pub.app_dir, None, upload)
    item.convert_to_pdf = False
    item.perform(formdata)

    def read_member(member):
        # decoded text of one member of the generated document
        with zipfile.ZipFile(formdata.evolution[0].parts[0].filename) as archive:
            return force_text(archive.read(member))

    new_content = read_member('content.xml')
    assert 'Titre de page' not in new_content  # section contents has been replaced
    assert '>Page 1<' in new_content
    assert '>Title<' in new_content
    assert '>Subtitle<' in new_content
    assert '<text:span>string</text:span>' in new_content
    assert '>para1<' in new_content
    assert '>para2<' in new_content
    assert '<text:span>No</text:span>' in new_content
    assert 'xlink:href="http://example.net/foo-export-details/1/download?f=9"' in new_content
    assert '>test.jpeg</text:a' in new_content
    assert '>2015-05-12<' in new_content
    assert 'Invisible' not in new_content
    assert new_content.count('/table:table-cell') == 8

    if filename == 'template-form-details-no-styles.odt':
        new_styles = read_member('styles.xml')
        assert 'Field_20_Label' in new_styles
|
||
|
||
|
||
def test_global_timeouts(two_pubs):
    """Exercise the timeout trigger of a global action with each kind of anchor.

    The global action holds a 'modify_criticality' item, so every check reads
    the criticality level of the stored formdata after calling
    pub.apply_global_action_timeouts().  The in-memory formdata1 is stored
    again after most checks to put the stored copy back to its previous state.
    """
    pub = two_pubs
    FormDef.wipe()
    Workflow.wipe()

    workflow = Workflow(name='global-timeouts')
    workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
    workflow.criticality_levels = [
        WorkflowCriticalityLevel(name='green'),
        WorkflowCriticalityLevel(name='yellow'),
        WorkflowCriticalityLevel(name='red'),
    ]
    action = workflow.add_global_action('Timeout Test')
    item = action.append_item('modify_criticality')
    trigger = action.append_trigger('timeout')
    trigger.anchor = 'creation'
    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    formdef.store()
    formdef.data_class().wipe()

    formdata1 = formdef.data_class()()
    formdata1.just_created()
    formdata1.store()

    # delay isn't set yet, no crash
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'

    # delay didn't expire yet, no change
    trigger.timeout = '2'
    workflow.store()

    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'

    # move the receipt time three days back, past the timeout
    formdata1.receipt_time = time.localtime(time.time()-3*86400)
    formdata1.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'

    # make sure it's not triggered a second time
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'

    # change id so it's triggered again
    trigger.id = 'XXX1'
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'red'
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'red'

    # reset formdata to initial state
    # (presumably works because the in-memory formdata1 was never reloaded
    # and so never received the criticality changes -- verify)
    formdata1.store()

    trigger.anchor = '1st-arrival'
    trigger.anchor_status_first = None
    workflow.store()

    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'

    formdata1.evolution[-1].time = time.localtime(time.time()-3*86400)
    formdata1.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'

    formdata1.store() # reset

    # bad (obsolete) status: do nothing
    trigger.anchor_status_first = 'wf-foobar'
    workflow.store()
    formdata1.evolution[-1].time = time.localtime(time.time()-3*86400)
    formdata1.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
    formdata1.store()

    trigger.anchor = 'latest-arrival'
    trigger.anchor_status_latest = None
    workflow.store()

    # build a history new (7 days ago) -> accepted -> new (1 day ago)
    formdata1.evolution[-1].time = time.localtime()
    formdata1.store()
    formdata1.jump_status('new')
    formdata1.evolution[-1].time = time.localtime(time.time()-7*86400)
    formdata1.jump_status('accepted')
    formdata1.jump_status('new')
    formdata1.evolution[-1].time = time.localtime(time.time()-1*86400)

    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'

    formdata1.evolution[-1].time = time.localtime(time.time()-4*86400)
    formdata1.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
    formdata1.store()

    # limit trigger to formdata with "accepted" status
    trigger.anchor_status_latest = 'wf-accepted'
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
    formdata1.store()

    # limit trigger to formdata with "new" status
    trigger.anchor_status_latest = 'wf-new'
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
    formdata1.store()

    # bad (obsolete) status: do nothing
    trigger.anchor_status_latest = 'wf-foobar'
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
    formdata1.store()

    # check trigger is not run on finalized formdata
    formdata1.jump_status('finished')
    formdata1.evolution[-1].time = time.localtime(time.time()-4*86400)
    formdata1.store()
    trigger.anchor = 'creation'
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
    formdata1.store()

    # check trigger is run on finalized formdata when anchor status is an
    # endpoint
    formdata1.jump_status('finished')
    formdata1.evolution[-1].last_jump_datetime = None
    formdata1.evolution[-1].time = time.localtime(time.time()-4*86400)
    formdata1.store()
    trigger.anchor = 'latest-arrival'
    trigger.anchor_status_latest = 'wf-finished'
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
    formdata1.store()

    # check "finalized" anchor
    trigger.anchor = 'finalized'
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
    formdata1.store()

    # use python expression as anchor
    # timestamp
    formdata1.jump_status('new')
    formdata1.evolution[-1].time = time.localtime(time.time()-4*86400)
    formdata1.evolution[-1].last_jump_datetime = None
    formdata1.store()

    # anchor is "now": the two-day delay has not expired
    trigger.anchor = 'python'
    trigger.anchor_expression = repr(time.time())
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'

    # anchor is ten days ago: the delay has expired
    trigger.anchor = 'python'
    trigger.anchor_expression = repr(time.time() - 10*86400)
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
    formdata1.store()

    # datetime object
    trigger.anchor = 'python'
    trigger.anchor_expression = 'datetime.datetime(%s, %s, %s, %s, %s)' % (
            datetime.datetime.now() - datetime.timedelta(days=10)).timetuple()[:5]
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
    formdata1.store()

    # date object
    trigger.anchor = 'python'
    trigger.anchor_expression = 'datetime.date(%s, %s, %s)' % (
            datetime.datetime.now() - datetime.timedelta(days=10)).timetuple()[:3]
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
    formdata1.store()

    # string object
    trigger.anchor = 'python'
    trigger.anchor_expression = '"%04d-%02d-%02d"' % (
            datetime.datetime.now() - datetime.timedelta(days=10)).timetuple()[:3]
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
    formdata1.store()

    # invalid variable
    trigger.anchor = 'python'
    trigger.anchor_expression = 'Ellipsis'
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
    formdata1.store()

    # invalid expression
    trigger.anchor = 'python'
    trigger.anchor_expression = 'XXX'
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'green'
    formdata1.store()

    # django template
    trigger.anchor = 'template'
    trigger.anchor_template = '{{ form_receipt_date|date:"Y-m-d" }}'
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
    formdata1.store()

    # django template
    # NOTE(review): this section is identical to the previous one; confirm
    # whether a different template was meant here.
    trigger.anchor = 'template'
    trigger.anchor_template = '{{ form_receipt_date|date:"Y-m-d" }}'
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
    formdata1.store()

    # django template (with local date format)
    trigger.anchor = 'template'
    trigger.anchor_template = '{{ form_receipt_date }}'
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
    formdata1.store()

    # django template (with local date/time format)
    trigger.anchor = 'template'
    trigger.anchor_template = '{{ form_receipt_datetime }}'
    workflow.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
    formdata1.store()

    # django template (from form_option_)
    workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
    workflow.variables_formdef.fields = [
        DateField(id='4', label='Date', type='date', varname='date'),
    ]
    trigger.anchor = 'template'
    trigger.anchor_template = '{{ form_option_date }}'
    workflow.store()
    formdef.workflow_options = {
        'date': time.strptime('2015-05-12', '%Y-%m-%d'),
    }
    formdef.store()
    pub.apply_global_action_timeouts()
    assert formdef.data_class().get(formdata1.id).get_criticality_level_object().name == 'yellow'
    formdata1.store()
|
||
|
||
|
||
def test_global_timeouts_latest_arrival(two_pubs):
    """Check a global timeout anchored on the latest arrival in a status.

    The global action holds a 'modify_criticality' item behind a 'timeout'
    trigger (timeout '2', anchor 'latest-arrival' on 'wf-new').  The test
    checks the criticality level stays 'green' while the latest evolution is
    one day old, becomes 'yellow' when it is five days old (also after the
    status has been left), and stays 'green' once 'finished' was reached.
    """
    def days_ago(count):
        # local time tuple for "now minus <count> days"
        return time.localtime(time.time() - count * 86400)

    def criticality_name(formdata):
        # reload the formdata from storage and return its criticality name
        stored = formdef.data_class().get(formdata.id)
        return stored.get_criticality_level_object().name

    def create_in_new_status(age_in_days):
        # new formdata moved to 'new', with that move backdated (not stored)
        formdata = formdef.data_class()()
        formdata.just_created()
        formdata.store()
        formdata.jump_status('new')
        formdata.evolution[-1].time = days_ago(age_in_days)
        return formdata

    pub = two_pubs
    FormDef.wipe()
    Workflow.wipe()

    workflow = Workflow(name='global-timeouts')
    workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
    workflow.criticality_levels = [
        WorkflowCriticalityLevel(name=level_name)
        for level_name in ('green', 'yellow', 'red')
    ]
    action = workflow.add_global_action('Timeout Test')
    action.append_item('modify_criticality')
    trigger = action.append_trigger('timeout')
    trigger.anchor = 'latest-arrival'
    trigger.anchor_status_latest = 'wf-new'
    trigger.timeout = '2'
    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    formdef.store()
    formdef.data_class().wipe()

    # enter in status 8 days ago
    formdata1 = create_in_new_status(8)
    # but get a new comment 1 day ago
    comment_evolution = Evolution()
    formdata1.evolution.append(comment_evolution)
    comment_evolution.time = days_ago(1)
    comment_evolution.comment = 'plop'
    formdata1.store()
    pub.apply_global_action_timeouts()
    # no change
    assert criticality_name(formdata1) == 'green'

    formdata1.evolution[-1].time = days_ago(5)
    formdata1.store()
    pub.apply_global_action_timeouts()
    # change
    assert criticality_name(formdata1) == 'yellow'

    # check it applies even after the status has been left
    formdata1 = create_in_new_status(5)
    formdata1.store()
    formdata1.jump_status('accepted')
    formdata1.store()
    pub.apply_global_action_timeouts()
    assert criticality_name(formdata1) == 'yellow'

    # but not if an endpoint has been reached
    formdata1 = create_in_new_status(5)
    formdata1.store()
    formdata1.jump_status('accepted')
    formdata1.jump_status('finished')
    formdata1.store()
    pub.apply_global_action_timeouts()
    assert criticality_name(formdata1) == 'green'
|
||
|
||
|
||
def test_profile(two_pubs):
    """Exercise UpdateUserProfileStatusItem on a user's profile.

    Covers setting the email and name from Python, Django and ezt
    expressions, setting custom user fields (string and date), and the
    payload handed to the mocked ``wcs.wf.profile.http_patch_request`` when
    user attributes are managed by the IdP.
    """
    User = two_pubs.user_class
    user = User()
    user.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='1', label='Test', type='string', varname='foo'),
    ]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.user_id = user.id
    formdata.data = {'1': 'bar@localhost'}

    item = UpdateUserProfileStatusItem()
    item.fields = [{'field_id': '__email', 'value': '=form_var_foo'}]
    item.perform(formdata)
    assert User.get(user.id).email == 'bar@localhost'

    formdata.data = {'1': 'Plop'}
    item.fields = [{'field_id': '__name', 'value': '=form_var_foo'}]
    item.perform(formdata)
    assert User.get(user.id).name == 'Plop'

    item.fields = [{'field_id': '__name', 'value': 'dj{{form_var_foo}}'}]
    item.perform(formdata)
    assert User.get(user.id).name == 'djPlop'
    item.fields = [{'field_id': '__name', 'value': 'ezt[form_var_foo]'}]
    item.perform(formdata)
    assert User.get(user.id).name == 'eztPlop'

    from wcs.admin.settings import UserFieldsFormDef
    formdef = UserFieldsFormDef(two_pubs)
    formdef.fields = [
        StringField(id='3', label='test', type='string', varname='plop'),
        DateField(id='4', label='Date', type='date', varname='bar'),
    ]
    formdef.store()

    item.fields = [{'field_id': 'plop', 'value': '=form_var_foo'}]
    item.perform(formdata)
    assert User.get(user.id).form_data.get('3') == 'Plop'
    assert not User.get(user.id).form_data.get('4')

    # check transmission to IdP
    get_publisher().cfg['sp'] = {'idp-manage-user-attributes': True}
    get_publisher().cfg['idp'] = {'xxx': {'metadata_url': 'http://idp.example.net/idp/saml2/metadata'}}
    get_publisher().write_cfg()

    user = User.get(user.id)
    user.name_identifiers = ['xyz']
    user.store()

    for date_value in (
            '20/03/2018',
            '=utils.make_date("20/03/2018")',
            '=utils.make_date("20/03/2018").timetuple()'):

        # check local value
        item.fields = [{'field_id': 'bar', 'value': date_value}]
        item.perform(formdata)
        assert User.get(user.id).form_data.get('3') == 'Plop'
        assert User.get(user.id).form_data.get('4').tm_year == 2018

        with mock.patch('wcs.wf.profile.http_patch_request') as http_patch_request:
            http_patch_request.return_value = (None, 200, '', None)
            get_response().process_after_jobs()
            assert http_patch_request.call_count == 1
            assert http_patch_request.call_args[0][1] == '{"bar": "2018-03-20"}'

    for date_value in ('baddate', '', {}, [], None):
        # reset date to a known value
        user.form_data['4'] = datetime.datetime.now().timetuple()
        user.store()
        year = User.get(user.id).form_data.get('4').tm_year
        # perform action
        item.fields = [{'field_id': 'bar', 'value': date_value}]
        item.perform(formdata)
        if date_value not in (None, ''):  # bad value : do nothing
            assert User.get(user.id).form_data.get('4').tm_year == year
        else:  # empty value : empty field
            # identity check: the field must really be None
            assert User.get(user.id).form_data.get('4') is None

        with mock.patch('wcs.wf.profile.http_patch_request') as http_patch_request:
            http_patch_request.return_value = (None, 200, '', None)
            get_response().process_after_jobs()
            assert http_patch_request.call_count == 1
            if date_value not in (None, ''):  # bad value : do nothing
                assert http_patch_request.call_args[0][1] == '{}'
            else:  # empty value : null field
                assert http_patch_request.call_args[0][1] == '{"bar": null}'

    # out of http request/response cycle (cron, after_job)
    two_pubs._set_request(None)
    item.fields = [{'field_id': 'bar', 'value': '01/01/2020'}]
    with mock.patch('wcs.wf.profile.http_patch_request') as http_patch_request:
        http_patch_request.return_value = (None, 200, '', None)
        item.perform(formdata)
        assert User.get(user.id).form_data.get('4').tm_year == 2020
        assert http_patch_request.call_count == 1
        assert http_patch_request.call_args[0][1] == '{"bar": "2020-01-01"}'
|
||
|
||
|
||
def test_set_backoffice_field(http_requests, two_pubs):
    """Exercise SetBackofficeFieldsWorkflowStatusItem on a string field.

    Checks the rendered labels, then the value stored in backoffice field
    'bo1' for Python expressions, Django and ezt templates, empty values and
    invalid expressions (for which a logged error is expected when
    PostgreSQL is in use).
    """
    def set_bo1(formdata, value):
        # set 'bo1' from <value>, run the action, return the reloaded formdata
        item.fields = [{'field_id': 'bo1', 'value': value}]
        item.perform(formdata)
        return formdef.data_class().get(formdata.id)

    Workflow.wipe()
    FormDef.wipe()
    if two_pubs.is_using_postgresql():
        two_pubs.loggederror_class.wipe()
    wf = Workflow(name='xxx')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef.fields = [
        StringField(id='bo1', label='1st backoffice field',
                    type='string', varname='backoffice_blah'),
    ]
    st1 = wf.add_status('Status1')
    wf.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='00', label='String', type='string', varname='string'),
        StringField(id='01', label='Other string', type='string', varname='other'),
    ]
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'00': 'HELLO'}
    formdata.just_created()
    formdata.store()
    two_pubs.substitutions.feed(formdata)

    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1

    assert item.render_as_line() == 'Backoffice Data'
    assert item.get_jump_label('plop') == 'Backoffice Data'
    item.label = 'label'
    assert item.render_as_line() == 'Backoffice Data (label)'
    assert item.get_jump_label('plop') == 'Backoffice Data "label"'

    # no field configured on the action yet
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data.get('bo1') is None

    formdata = set_bo1(formdata, '=form_var_string')
    assert formdata.data['bo1'] == 'HELLO'

    formdata = set_bo1(formdata, '{{ form_var_string }} WORLD')
    assert formdata.data['bo1'] == 'HELLO WORLD'

    formdata = set_bo1(formdata, '[form_var_string] GOODBYE')
    assert formdata.data['bo1'] == 'HELLO GOODBYE'

    formdata = set_bo1(formdata, '{{ form.var.string }} LAZY')
    assert formdata.data['bo1'] == 'HELLO LAZY'

    formdata = set_bo1(formdata, '=form.var.string')  # lazy python
    assert formdata.data['bo1'] == 'HELLO'

    formdata = set_bo1(formdata, '=vars().get("form_var_string") + " PLOP"')
    assert formdata.data['bo1'] == 'HELLO PLOP'

    formdata = set_bo1(
        formdata,
        '=vars().get("form_var_other") or vars().get("form_var_string")')
    assert formdata.data['bo1'] == 'HELLO'

    formdata = set_bo1(formdata, '=vars().get("form_var_bouh", "X") + " PLOP"')
    assert formdata.data['bo1'] == 'X PLOP'

    formdata = set_bo1(formdata, None)
    assert formdata.data['bo1'] is None

    formdata = set_bo1(formdata, '')
    assert formdata.data['bo1'] is None

    # check a value computed as the empty string is stored as an empty string, not None
    formdata = set_bo1(formdata, '=""')
    assert formdata.data['bo1'] == ''
    formdata = set_bo1(formdata, '{{ does_not_exist }}')
    assert formdata.data['bo1'] == ''

    if two_pubs.is_using_postgresql():
        assert two_pubs.loggederror_class.count() == 0

    formdata = set_bo1(formdata, '= ~ invalid python ~')
    if two_pubs.is_using_postgresql():
        assert two_pubs.loggederror_class.count() == 1
        logged_error = two_pubs.loggederror_class.select()[0]
        assert logged_error.summary == 'Failed to compute Python expression'
        assert logged_error.formdata_id == str(formdata.id)
        assert logged_error.expression == ' ~ invalid python ~'
        assert logged_error.expression_type == 'python'
        assert logged_error.exception_class == 'SyntaxError'
        assert logged_error.exception_message == 'invalid syntax (<string>, line 1)'

    if two_pubs.is_using_postgresql():
        two_pubs.loggederror_class.wipe()
    formdata = set_bo1(formdata, '{% if bad django %}')
    if two_pubs.is_using_postgresql():
        assert two_pubs.loggederror_class.count() == 1
        logged_error = two_pubs.loggederror_class.select()[0]
        assert logged_error.summary == 'Failed to compute template'
        assert logged_error.formdata_id == str(formdata.id)
        assert logged_error.expression == '{% if bad django %}'
        assert logged_error.expression_type == 'template'
        assert logged_error.exception_class == 'TemplateError'
        assert logged_error.exception_message.startswith('syntax error in Django template')
|
||
|
||
|
||
def test_set_backoffice_field_file(http_requests, two_pubs):
    """Exercise SetBackofficeFieldsWorkflowStatusItem on a file field.

    Covers copying an uploaded file through Python expressions and
    templates, ``utils.attachment(...)`` with ``strip_metadata``, a
    webservice response stored as attachment, files assembled from
    dictionaries (raw and base64 content), resetting the value, and wrong
    values or field ids.
    """
    def read_test_file(filename):
        # read a fixture located next to this module; the handle is closed
        # on exit instead of being left to the garbage collector
        with open(os.path.join(os.path.dirname(__file__), filename), 'rb') as fd:
            return fd.read()

    def create_formdata(data):
        # create and store a new formdata holding <data>
        new_formdata = formdef.data_class()()
        new_formdata.data = data
        new_formdata.just_created()
        new_formdata.store()
        return new_formdata

    jpeg_content = read_test_file('image-with-gps-data.jpeg')
    odt_content = read_test_file('template.odt')

    Workflow.wipe()
    FormDef.wipe()
    wf = Workflow(name='xxx')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef.fields = [
        FileField(id='bo1', label='1st backoffice field',
                  type='file', varname='backoffice_file'),
        StringField(id='bo2', label='2nd backoffice field', type='string'),
    ]
    st1 = wf.add_status('Status1')
    wf.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        FileField(id='00', label='File', type='file', varname='file'),
    ]
    formdef.workflow_id = wf.id
    formdef.store()

    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo1', 'value': '=locals().get("form_var_file_raw")'}]

    # the file does not exist
    formdata = create_formdata({})
    two_pubs.substitutions.feed(formdata)
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'] is None

    # store a PicklableUpload
    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    upload.receive([jpeg_content])

    formdata = create_formdata({'00': upload})

    two_pubs.substitutions.feed(formdata)
    item.perform(formdata)

    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'].base_filename == 'test.jpeg'
    assert formdata.data['bo1'].content_type == 'image/jpeg'
    assert formdata.data['bo1'].get_content() == jpeg_content
    assert formdata.data['bo1'].get_base64_content() == base64.encodebytes(jpeg_content)

    # same test with PicklableUpload wcs.qommon.form
    from wcs.qommon.form import PicklableUpload as PicklableUpload2
    upload2 = PicklableUpload2('test2.odt', 'application/vnd.oasis.opendocument.text')
    upload2.receive([odt_content])
    formdata = create_formdata({'00': upload2})
    two_pubs.substitutions.feed(formdata)
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'].base_filename == 'test2.odt'
    assert formdata.data['bo1'].content_type == 'application/vnd.oasis.opendocument.text'
    assert formdata.data['bo1'].get_content() == odt_content

    # check with template string
    formdata = create_formdata({'00': upload})

    two_pubs.substitutions.feed(formdata)
    item.fields = [{'field_id': 'bo1', 'value': '{{form_var_file_raw}}'}]
    item.perform(formdata)

    assert formdata.data['bo1'].base_filename == 'test.jpeg'
    assert formdata.data['bo1'].content_type == 'image/jpeg'
    assert formdata.data['bo1'].get_content() == jpeg_content

    # check with template string, without _raw
    formdata = create_formdata({'00': upload})

    two_pubs.substitutions.feed(formdata)
    item.fields = [{'field_id': 'bo1', 'value': '{{form_var_file}}'}]
    item.perform(formdata)

    assert formdata.data['bo1'].base_filename == 'test.jpeg'
    assert formdata.data['bo1'].content_type == 'image/jpeg'
    assert formdata.data['bo1'].get_content() == jpeg_content

    # check with a template string, into a string field
    two_pubs.substitutions.feed(formdata)
    item.fields = [{'field_id': 'bo2', 'value': '{{form_var_file}}'}]
    item.perform(formdata)

    assert formdata.data['bo2'] == 'test.jpeg'

    # check with template string and missing file
    formdata = create_formdata({'00': None})

    assert formdata.data.get('bo1') is None

    # check stripping metadata
    two_pubs.substitutions.feed(formdata)
    item.fields = [{'field_id': 'bo1',
                    'value': '=utils.attachment(form_var_file_raw,' +
                             ' filename="new_"+form_var_file,' +
                             ' content_type="my content type",' +
                             ' strip_metadata=True)'}]
    formdata = create_formdata({'00': upload})
    two_pubs.substitutions.feed(formdata)
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'].base_filename == 'new_test.jpeg'
    assert formdata.data['bo1'].content_type == 'my content type'
    assert formdata.data['bo1'].qfilename != formdata.data['00'].qfilename
    # NOTE(review): find() returns -1 (truthy) when the marker is absent, so
    # this only fails if the marker sits at offset 0 -- confirm whether
    # "!= -1" was intended before tightening it.
    assert formdata.data['00'].get_content().find(b'<exif:XResolution>')
    assert not Image or formdata.data['bo1'].get_content().find(b'<exif:XResolution>') == -1

    formdata = create_formdata({'00': upload2})
    two_pubs.substitutions.feed(formdata)
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'].base_filename == 'new_test2.odt'
    assert formdata.data['bo1'].content_type == 'my content type'
    assert formdata.data['bo1'].qfilename == formdata.data['00'].qfilename

    # check storing response as attachment
    two_pubs.substitutions.feed(formdata)
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.varname = 'xxx'
    item.response_type = 'attachment'
    item.record_errors = True
    item.perform(formdata)
    attachment = formdata.evolution[-1].parts[-1]
    assert isinstance(attachment, AttachmentEvolutionPart)
    assert attachment.base_filename == 'xxx.xml'
    assert attachment.content_type == 'text/xml'

    formdata = formdef.data_class().get(formdata.id)
    two_pubs.substitutions.feed(formdata)
    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo1', 'value': '=attachments.xxx'}]
    item.perform(formdata)

    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'].base_filename == 'xxx.xml'

    # check storing a file from an assembled dictionary
    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo1',
                    'value': '={"content": "hello world", "filename": "hello.txt"}'}]
    item.perform(formdata)

    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'].base_filename == 'hello.txt'
    assert formdata.data['bo1'].get_content() == b'hello world'

    # check resetting a value
    for value in ('', None):
        item = SetBackofficeFieldsWorkflowStatusItem()
        item.parent = st1
        item.fields = [{'field_id': 'bo1', 'value': value}]
        item.perform(formdata)

        formdata = formdef.data_class().get(formdata.id)
        assert formdata.data['bo1'] is None

    # set from base64 content
    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo1',
                    'value': '={"b64_content": "SEVMTE8gV09STEQ=", "filename": "hello.txt"}'}]
    item.perform(formdata)

    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'].base_filename == 'hello.txt'
    assert formdata.data['bo1'].get_content() == b'HELLO WORLD'

    hello_world = formdata.data['bo1']
    # check wrong value
    for value in ('="HELLO"', 'BAD'):
        # restore the known-good file before each attempt
        formdata.data['bo1'] = hello_world
        formdata.store()

        item = SetBackofficeFieldsWorkflowStatusItem()
        item.parent = st1
        item.fields = [{'field_id': 'bo1', 'value': value}]

        if two_pubs.is_using_postgresql():
            two_pubs.loggederror_class.wipe()
        item.perform(formdata)

        formdata = formdef.data_class().get(formdata.id)
        assert formdata.data['bo1'].base_filename == 'hello.txt'
        assert formdata.data['bo1'].get_content() == b'HELLO WORLD'
        if two_pubs.is_using_postgresql():
            assert two_pubs.loggederror_class.count() == 1
            logged_error = two_pubs.loggederror_class.select()[0]
            assert logged_error.summary.startswith('Failed to convert')
            assert logged_error.formdata_id == str(formdata.id)
            assert logged_error.exception_class == 'ValueError'

    # check wrong field
    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo3', 'value': '=form_var_file_raw'}]
    item.perform(formdata)

    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data.get('bo3') is None
|
||
|
||
|
||
def test_set_backoffice_field_item(two_pubs):
    """Exercise SetBackofficeFieldsWorkflowStatusItem on an item field.

    The 'bo1' field is defined in turn with plain items, a formula data
    source of (id, text) pairs and a formula data source of dictionaries;
    the stored raw, display and structured values are checked, including
    assignment by display value and assignment of an unknown value.
    """
    def define_bo1(**options):
        # (re)define the single backoffice item field and store the workflow
        wf.backoffice_fields_formdef.fields = [
            ItemField(id='bo1', label='1st backoffice field',
                      type='item', varname='backoffice_item', **options),
        ]
        wf.store()

    def create_formdata():
        # create and store an empty formdata
        fresh = formdef.data_class()()
        fresh.data = {}
        fresh.just_created()
        fresh.store()
        return fresh

    def assign_bo1(target, value):
        # run a new action setting 'bo1' and return the reloaded formdata
        action_item = SetBackofficeFieldsWorkflowStatusItem()
        action_item.parent = st1
        action_item.fields = [{'field_id': 'bo1', 'value': value}]
        action_item.perform(target)
        return formdef.data_class().get(target.id)

    Workflow.wipe()
    FormDef.wipe()
    wf = Workflow(name='xxx')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    st1 = wf.add_status('Status1')
    define_bo1(items=['a', 'b', 'c'])

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = create_formdata()

    formdata = assign_bo1(formdata, 'a')
    assert formdata.data['bo1'] == 'a'
    assert formdata.data['bo1_display'] == 'a'

    datasource = {'type': 'formula',
                  'value': repr([('a', 'aa'), ('b', 'bb'), ('c', 'cc')])}
    define_bo1(data_source=datasource)

    formdata = assign_bo1(formdata, 'a')
    assert formdata.data['bo1'] == 'a'
    assert formdata.data['bo1_display'] == 'aa'

    datasource = {'type': 'formula',
                  'value': repr([{'id': 'a', 'text': 'aa', 'more': 'aaa'},
                                 {'id': 'b', 'text': 'bb', 'more': 'bbb'}])}
    define_bo1(data_source=datasource)

    formdata = assign_bo1(formdata, 'a')
    assert formdata.data['bo1'] == 'a'
    assert formdata.data['bo1_display'] == 'aa'
    assert formdata.data['bo1_structured'] == {'id': 'a', 'more': 'aaa', 'text': 'aa'}

    # check when assigning using the display value
    formdata = assign_bo1(create_formdata(), 'aa')
    assert formdata.data['bo1'] == 'a'
    assert formdata.data['bo1_display'] == 'aa'
    assert formdata.data['bo1_structured'] == {'id': 'a', 'more': 'aaa', 'text': 'aa'}

    # check with unknown value
    formdata = assign_bo1(create_formdata(), 'foobar')
    assert formdata.data['bo1'] == 'foobar'
    assert formdata.data.get('bo1_display') is None
    assert formdata.data.get('bo1_structured') is None
|
||
|
||
|
||
def test_set_backoffice_field_card_item(two_pubs):
    """Exercise SetBackofficeFieldsWorkflowStatusItem on a card-backed item.

    Three cards ('foo', 'bar', 'baz') feed a 'carddef:' data source used by
    backoffice field 'bo1'; the field is then set by card id, by text value,
    with an unknown value and with an empty value.
    """
    def reset_and_assign(target, value):
        # clear the formdata, set 'bo1' from <value>, return the reloaded copy
        target.data = {}
        target.store()
        item.fields = [{'field_id': 'bo1', 'value': value}]
        item.perform(target)
        return formdef.data_class().get(target.id)

    CardDef.wipe()
    Workflow.wipe()
    FormDef.wipe()

    carddef = CardDef()
    carddef.name = 'items'
    carddef.digest_template = '{{form_var_name}}'
    carddef.fields = [
        StringField(id='0', label='string', varname='name'),
        StringField(id='1', label='string', varname='attr'),
    ]
    carddef.store()
    carddef.data_class().wipe()
    for i, value in enumerate(['foo', 'bar', 'baz']):
        carddata = carddef.data_class()()
        carddata.data = {'0': value, '1': 'attr%s' % i}
        carddata.just_created()
        carddata.store()
    # id of the last card created by the loop above ('baz')
    latest_carddata_id = carddata.id
    ds = {'type': 'carddef:%s' % carddef.url_name}

    wf = Workflow(name='xxx')
    wf.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.workflow_id = wf.id
    formdef.fields = [ItemField(id='0', label='string', type='item',
                                data_source=ds, display_disabled_items=True)]
    formdef.store()

    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    st1 = wf.add_status('Status1')
    wf.backoffice_fields_formdef.fields = [
        ItemField(id='bo1', label='1st backoffice field', type='item',
                  varname='backoffice_item', data_source=ds),
    ]
    wf.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()
    formdata.store()

    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo1', 'value': str(latest_carddata_id)}]
    item.perform(formdata)

    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'] == str(latest_carddata_id)
    assert formdata.data['bo1_display'] == 'baz'
    assert formdata.data['bo1_structured']['attr'] == 'attr2'

    # reset, and get by text value
    formdata = reset_and_assign(formdata, 'bar')
    assert formdata.data['bo1'] != str(latest_carddata_id)
    assert formdata.data['bo1_display'] == 'bar'
    assert formdata.data['bo1_structured']['attr'] == 'attr1'

    # reset, and get unknown value
    formdata = reset_and_assign(formdata, 'xxx')
    assert formdata.data['bo1'] == 'xxx'  # raw value is still kept
    assert formdata.data.get('bo1_display') is None
    assert formdata.data.get('bo1_structured') is None

    # reset, and get empty value
    formdata = reset_and_assign(formdata, '')
    assert formdata.data['bo1'] is None
    assert formdata.data.get('bo1_display') is None
    assert formdata.data.get('bo1_structured') is None
|
||
|
||
|
||
def test_set_backoffice_field_items(two_pubs):
    """Exercise SetBackofficeFieldsWorkflowStatusItem on an items field.

    The 'bo1' field is defined in turn with plain items, a formula data
    source of (id, text) pairs and one of dictionaries; it is then set from
    a Python list, from a formdata items field and from a template.
    """
    def define_bo1(**options):
        # (re)define the single backoffice items field and store the workflow
        wf.backoffice_fields_formdef.fields = [
            ItemsField(id='bo1', label='1st backoffice field',
                       type='items', varname='backoffice_item', **options),
        ]
        wf.store()

    def assign_bo1(target, value):
        # run a new action setting 'bo1' and return the reloaded formdata
        action_item = SetBackofficeFieldsWorkflowStatusItem()
        action_item.parent = st1
        action_item.fields = [{'field_id': 'bo1', 'value': value}]
        action_item.perform(target)
        return formdef.data_class().get(target.id)

    def create_formdata_with_items():
        # store a formdata whose field '1' holds ['a', 'c'] plus the
        # display/structured values computed by the field itself
        source = formdef.data_class()()
        source.data = {'1': ['a', 'c']}
        source.data['1_display'] = formdef.fields[0].store_display_value(source.data, '1')
        source.data['1_structured'] = formdef.fields[0].store_structured_value(source.data, '1')
        source.just_created()
        source.store()
        return source

    Workflow.wipe()
    FormDef.wipe()
    wf = Workflow(name='xxx')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    st1 = wf.add_status('Status1')
    define_bo1(items=['a', 'b', 'c'])

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()
    formdata.store()

    formdata = assign_bo1(formdata, "=['a', 'b']")
    assert formdata.data['bo1'] == ['a', 'b']
    assert formdata.data['bo1_display'] == 'a, b'

    datasource = {'type': 'formula',
                  'value': repr([('a', 'aa'), ('b', 'bb'), ('c', 'cc')])}
    define_bo1(data_source=datasource)

    formdata = assign_bo1(formdata, "=['a', 'b']")
    assert formdata.data['bo1'] == ['a', 'b']
    assert formdata.data['bo1_display'] == 'aa, bb'

    datasource = {'type': 'formula',
                  'value': repr([{'id': 'a', 'text': 'aa', 'more': 'aaa'},
                                 {'id': 'b', 'text': 'bb', 'more': 'bbb'},
                                 {'id': 'c', 'text': 'cc', 'more': 'ccc'}])}
    define_bo1(data_source=datasource)

    formdata = assign_bo1(formdata, "=['a', 'c']")
    assert formdata.data['bo1'] == ['a', 'c']
    assert formdata.data['bo1_display'] == 'aa, cc'
    assert len(formdata.data['bo1_structured']) == 2
    assert {'id': 'a', 'more': 'aaa', 'text': 'aa'} in formdata.data['bo1_structured']
    assert {'id': 'c', 'more': 'ccc', 'text': 'cc'} in formdata.data['bo1_structured']

    # from formdata field
    formdef.fields = [
        ItemsField(id='1', label='field', type='items', varname='items', data_source=datasource),
    ]
    formdef.store()

    formdata = create_formdata_with_items()
    two_pubs.substitutions.feed(formdata)

    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo1', 'value': "=form_var_items_raw"}]
    item.perform(formdata)

    # checked on the in-memory formdata, without reloading it
    assert formdata.data['bo1'] == ['a', 'c']
    assert formdata.data['bo1_display'] == 'aa, cc'
    assert len(formdata.data['bo1_structured']) == 2
    assert {'id': 'a', 'more': 'aaa', 'text': 'aa'} in formdata.data['bo1_structured']
    assert {'id': 'c', 'more': 'ccc', 'text': 'cc'} in formdata.data['bo1_structured']

    # with a template
    formdata = create_formdata_with_items()
    two_pubs.substitutions.reset()
    two_pubs.substitutions.feed(formdata)

    item.fields = [{'field_id': 'bo1', 'value': "{{form_var_items_raw}}"}]
    item.perform(formdata)
|
||
|
||
|
||
def test_set_backoffice_field_date(two_pubs):
    """Check the "set backoffice fields" action on a date backoffice field.

    Successively assigns the field from a Python expression, a Django
    template, a literal date string, several invalid values (which must leave
    the stored date untouched and, on PostgreSQL, log exactly one error each)
    and finally ``None`` (which must empty the field).
    """
    Workflow.wipe()
    FormDef.wipe()
    if two_pubs.is_using_postgresql():
        two_pubs.loggederror_class.wipe()
    wf = Workflow(name='xxx')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    st1 = wf.add_status('Status1')
    wf.backoffice_fields_formdef.fields = [
        DateField(id='bo1', label='1st backoffice field',
                  type='date', varname='backoffice_date'),
    ]
    wf.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()
    formdata.store()

    # value computed by a Python expression
    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo1', 'value': "=utils.today()"}]
    item.perform(formdata)

    formdata = formdef.data_class().get(formdata.id)
    assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date.today()

    # value computed by a Django template (field is reset first)
    formdata.data['bo1'] = None
    formdata.store()
    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo1', 'value': '{% now "j/n/Y" %}'}]
    item.perform(formdata)

    formdata = formdef.data_class().get(formdata.id)
    assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date.today()

    # literal date string
    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo1', 'value': "23/3/2017"}]
    item.perform(formdata)

    formdata = formdef.data_class().get(formdata.id)
    assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date(2017, 3, 23)

    # invalid values => do nothing
    if two_pubs.is_using_postgresql():
        assert two_pubs.loggederror_class.count() == 0
    for value in ('plop', '={}', '=[]'):
        item = SetBackofficeFieldsWorkflowStatusItem()
        item.parent = st1
        item.fields = [{'field_id': 'bo1', 'value': value}]

        # start from a clean error log so that each value is checked alone
        if two_pubs.is_using_postgresql():
            two_pubs.loggederror_class.wipe()
        item.perform(formdata)
        formdata = formdef.data_class().get(formdata.id)
        # previously stored date must be preserved
        assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date(2017, 3, 23)
        if two_pubs.is_using_postgresql():
            assert two_pubs.loggederror_class.count() == 1
            assert two_pubs.loggederror_class.select()[0].summary.startswith('Failed to convert')

    # None : empty date
    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo1', 'value': None}]
    item.perform(formdata)

    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'] is None
|
||
|
||
|
||
def test_set_backoffice_field_boolean(two_pubs):
    """Check the "set backoffice fields" action on a boolean backoffice field.

    A first series of values must be stored as ``True``, a second series
    (including an arbitrary string and the empty string) as ``False``.
    """
    Workflow.wipe()
    FormDef.wipe()
    wf = Workflow(name='xxx')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    st1 = wf.add_status('Status1')
    wf.backoffice_fields_formdef.fields = [
        BoolField(id='bo1', label='1st backoffice field',
                  type='bool', varname='backoffice_bool'),
    ]
    wf.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [BoolField(id='1', label='field', type='bool', varname='foo')]
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'1': True}
    formdata.just_created()
    formdata.store()
    get_publisher().substitutions.feed(formdata)

    # values expected to be stored as True
    for value in ('=True', '=form_var_foo_raw', '{{ form_var_foo_raw }}', 'True', 'Yes', 'true', 'yes'):
        item = SetBackofficeFieldsWorkflowStatusItem()
        item.parent = st1
        item.fields = [{'field_id': 'bo1', 'value': value}]
        item.perform(formdata)
        formdata = formdef.data_class().get(formdata.id)
        assert formdata.data['bo1'] is True
        # reset the field so the next iteration starts from a blank state
        formdata.data['bo1'] = None
        formdata.store()

    # values expected to be stored as False
    for value in ('=False', '=not(form_var_foo_raw)', 'False', 'plop', ''):
        item = SetBackofficeFieldsWorkflowStatusItem()
        item.parent = st1
        item.fields = [{'field_id': 'bo1', 'value': value}]
        item.perform(formdata)
        formdata = formdef.data_class().get(formdata.id)
        assert formdata.data['bo1'] is False
        # reset the field so the next iteration starts from a blank state
        formdata.data['bo1'] = None
        formdata.store()
|
||
|
||
|
||
def test_set_backoffice_field_block(two_pubs, blocks_feature):
    """Check the "set backoffice fields" action on a block backoffice field.

    The block value of a form field is copied to a backoffice block field,
    first through the ``_raw`` variable then through the plain variable; in
    the second case the same template is also assigned to a string backoffice
    field, which must receive the display value.
    """
    BlockDef.wipe()
    Workflow.wipe()
    FormDef.wipe()

    block = BlockDef()
    block.name = 'foobar'
    block.digest_template = 'X{{foobar_var_foo}}Y'
    block.fields = [
        StringField(id='123', required=True, label='Test',
                    type='string', varname='foo'),
        StringField(id='234', required=True, label='Test2',
                    type='string', varname='bar'),
    ]
    block.store()

    wf = Workflow(name='xxx')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    st1 = wf.add_status('Status1')
    wf.backoffice_fields_formdef.fields = [
        BlockField(id='bo1', label='1st backoffice field', type='block:foobar'),
        StringField(id='bo2', label='2nd backoffice field', type='string'),
    ]
    wf.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        BlockField(id='1', label='test', type='block:foobar', max_items=3, varname='foo'),
    ]
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    # value from test_block_digest in tests/test_form_pages.py
    formdata.data = {
        '1': {
            'data': [{'123': 'foo', '234': 'bar'}, {'123': 'foo2', '234': 'bar2'}],
            'schema': {'123': 'string', '234': 'string'}
        },
        '1_display': 'XfooY, Xfoo2Y',
    }
    formdata.just_created()
    formdata.store()
    get_publisher().substitutions.feed(formdata)

    # with _raw suffix
    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [{'field_id': 'bo1', 'value': '{{form_var_foo_raw}}'}]
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'] == formdata.data['1']
    assert formdata.data['bo1_display'] == formdata.data['1_display']

    # without _raw suffix
    formdata = formdef.data_class()()
    # value from test_block_digest in tests/test_form_pages.py
    formdata.data = {
        '1': {
            'data': [{'123': 'foo', '234': 'bar'}, {'123': 'foo2', '234': 'bar2'}],
            'schema': {'123': 'string', '234': 'string'}
        },
        '1_display': 'XfooY, Xfoo2Y',
    }
    formdata.just_created()
    formdata.store()
    get_publisher().substitutions.reset()
    get_publisher().substitutions.feed(formdata)

    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1
    item.fields = [
        {'field_id': 'bo1', 'value': '{{form_var_foo}}'},
        {'field_id': 'bo2', 'value': '{{form_var_foo}}'},
    ]
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data['bo1'] == formdata.data['1']
    assert formdata.data['bo1_display'] == formdata.data['1_display']
    # the string field gets the display value of the block
    assert formdata.data['bo2'] == formdata.data['1_display']
|
||
|
||
|
||
def test_set_backoffice_field_immediate_use(http_requests, two_pubs):
    """Check a backoffice field set by an action is usable by the next one.

    In the last scenario the second assignment references the variable of the
    backoffice field set by the first assignment of the same action, and must
    see its freshly computed value.
    """
    Workflow.wipe()
    FormDef.wipe()
    wf = Workflow(name='xxx')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef.fields = [
        StringField(id='bo1', label='1st backoffice field',
                    type='string', varname='backoffice_blah'),
        StringField(id='bo2', label='2nd backoffice field',
                    type='string', varname='backoffice_barr'),
    ]
    st1 = wf.add_status('Status1')
    wf.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='00', label='String', type='string', varname='string'),
    ]
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'00': 'HELLO'}
    formdata.just_created()
    formdata.store()
    item = SetBackofficeFieldsWorkflowStatusItem()
    item.parent = st1

    # value taken from a form field
    item.fields = [
        {'field_id': 'bo1', 'value': '{{form_var_string}}'},
    ]
    two_pubs.substitutions.reset()
    two_pubs.substitutions.feed(formdata)
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data.get('bo1') == 'HELLO'

    # constant value
    item.fields = [
        {'field_id': 'bo1', 'value': 'WORLD'},
    ]
    two_pubs.substitutions.reset()
    two_pubs.substitutions.feed(formdata)
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data.get('bo1') == 'WORLD'

    # bo2 is computed from the value just assigned to bo1
    item.fields = [
        {'field_id': 'bo1', 'value': 'X{{form_var_string}}X'},
        {'field_id': 'bo2', 'value': "Y{{form_var_backoffice_blah}}Y"},
    ]
    two_pubs.substitutions.reset()
    two_pubs.substitutions.feed(formdata)
    item.perform(formdata)
    formdata = formdef.data_class().get(formdata.id)
    assert formdata.data.get('bo1') == 'XHELLOX'
    assert formdata.data.get('bo2') == 'YXHELLOXY'
|
||
|
||
|
||
def test_redirect_to_url(pub):
    """Check the rendering and the result of the web redirection action.

    The URL is given with square-bracket and Django template syntaxes; a
    template rendering to nothing must make ``perform`` return ``None``.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='1', label='Test', type='string', varname='foo'),
    ]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'1': 'bar'}

    item = RedirectToUrlWorkflowStatusItem()
    assert item.render_as_line() == 'Web Redirection (not configured)'
    item.url = 'https://www.example.net/?foo=[form_var_foo]'
    assert item.render_as_line() == 'Web Redirection (to https://www.example.net/?foo=[form_var_foo])'
    pub.substitutions.feed(formdata)
    assert item.perform(formdata) == 'https://www.example.net/?foo=bar'

    item.url = 'https://www.example.net/?django={{ form_var_foo }}'
    assert item.render_as_line() == 'Web Redirection (to https://www.example.net/?django={{ form_var_foo }})'
    pub.substitutions.feed(formdata)
    assert item.perform(formdata) == 'https://www.example.net/?django=bar'

    # templates rendering to an empty URL: no redirection
    item.url = '[if-any nada]https://www.example.net/[end]'
    pub.substitutions.feed(formdata)
    assert item.perform(formdata) is None

    item.url = '{% if nada %}https://www.example.net/{% endif %}'
    pub.substitutions.feed(formdata)
    assert item.perform(formdata) is None
|
||
|
||
|
||
def test_workflow_jump_condition_migration(pub):
    """Check a jump condition given as a string is migrated on reload.

    A missing condition stays ``None``; a plain string condition is reloaded
    as a ``{'type': 'python', 'value': ...}`` dictionary.
    """
    workflow = Workflow(name='jump condition migration')
    st1 = workflow.add_status('Status1', 'st1')

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    st1.items.append(jump)
    jump.parent = st1
    workflow.store()

    reloaded_workflow = Workflow.get(workflow.id)
    assert reloaded_workflow.possible_status[0].items[0].condition is None

    jump.condition = 'foobar'
    workflow.store()
    reloaded_workflow = Workflow.get(workflow.id)
    assert reloaded_workflow.possible_status[0].items[0].condition == {
        'type': 'python', 'value': 'foobar'}
|
||
|
||
|
||
def test_workflow_action_condition(two_pubs):
    """Check action conditions are honoured when computing actionable forms.

    A choice action restricted to a role is given successive conditions (on
    form data, on the form name, then an invalid one) and the roles / ids
    reported as actionable are checked each time.  On PostgreSQL the invalid
    condition must also be recorded as a single logged error.
    """
    two_pubs._set_request(None)  # to avoid after jobs
    workflow = Workflow(name='jump condition migration')
    st1 = workflow.add_status('Status1', 'st1')
    workflow.store()

    role = Role(name='bar1')
    role.store()

    user = two_pubs.user_class()
    user.roles = [role.id]
    user.store()

    choice = ChoiceWorkflowStatusItem()
    choice.by = [role.id]
    choice.id = '_x'
    st1.items.append(choice)
    choice.parent = st1
    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [StringField(id='1', label='Test', type='string', varname='foo')]
    formdef.workflow_id = workflow.id
    formdef.store()

    formdata1 = formdef.data_class()()
    formdata1.data = {'1': 'foo'}
    formdata1.just_created()
    formdata1.store()

    # NOTE: data is stored under key '2' while the only declared field is '1'
    formdata2 = formdef.data_class()()
    formdata2.data = {'2': 'bar'}
    formdata2.just_created()
    formdata2.store()

    # no condition: both formdata are actionable
    assert formdata1.get_actions_roles() == {role.id}
    assert formdata2.get_actions_roles() == {role.id}

    assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 2

    # condition on form data: only formdata1 matches
    choice.condition = {'type': 'python', 'value': 'form_var_foo == "foo"'}
    workflow.store()

    with two_pubs.substitutions.temporary_feed(formdata1):
        assert FormDef.get(formdef.id).data_class().get(formdata1.id).get_actions_roles() == {role.id}
    with two_pubs.substitutions.temporary_feed(formdata2):
        assert FormDef.get(formdef.id).data_class().get(formdata2.id).get_actions_roles() == set()

    assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 1

    # check with a formdef condition
    choice.condition = {'type': 'python', 'value': 'form_name == "test"'}
    workflow.store()
    assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 0

    choice.condition = {'type': 'python', 'value': 'form_name == "baz"'}
    workflow.store()
    assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 2

    # bad condition
    if two_pubs.is_using_postgresql():
        two_pubs.loggederror_class.wipe()
    choice.condition = {'type': 'python', 'value': 'foobar == barfoo'}
    workflow.store()
    assert len(FormDef.get(formdef.id).data_class().get_actionable_ids([role.id])) == 0
    if two_pubs.is_using_postgresql():
        assert two_pubs.loggederror_class.count() == 1
        logged_error = two_pubs.loggederror_class.select()[0]
        assert logged_error.occurences_count > 1  # should be 2... == 12 with pickle, 4 with sql
        assert logged_error.summary == 'Failed to evaluate condition'
        assert logged_error.exception_class == 'NameError'
        assert logged_error.exception_message == "name 'foobar' is not defined"
        assert logged_error.expression == 'foobar == barfoo'
        assert logged_error.expression_type == 'python'
|
||
|
||
|
||
def test_notifications(pub, http_requests):
    """Check the "send notification" action and the HTTP calls it issues.

    The action is only available once a ``portal_url`` variable is set.
    Notifications are then checked for: a formdata without user (no call),
    a formdata with a user, users holding the receiver role (inactive users
    being skipped), and finally an ``_interco_portal_url`` variable, which
    must be used as target when present.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    assert not SendNotificationWorkflowStatusItem.is_available()

    if not pub.site_options.has_section('variables'):
        pub.site_options.add_section('variables')
    pub.site_options.set('variables', 'portal_url', 'https://portal/')
    assert SendNotificationWorkflowStatusItem.is_available()

    item = SendNotificationWorkflowStatusItem()
    assert item.to == ['_submitter']
    item.title = 'xxx'
    item.body = 'XXX'

    # no user
    http_requests.empty()
    item.perform(formdata)
    assert http_requests.count() == 0

    # user
    http_requests.empty()
    user = pub.user_class()
    user.name_identifiers = ['xxx']
    user.store()
    formdata.user_id = user.id
    formdata.store()

    item.perform(formdata)
    assert http_requests.count() == 1
    assert http_requests.get_last('url') == 'https://portal/api/notification/add/?NameID=xxx'
    assert json.loads(http_requests.get_last('body')) == {
        'body': 'XXX',
        'url': formdata.get_url(),
        'id': 'formdata:%s' % formdata.get_display_id(),
        'origin': '',
        'summary': 'xxx'
    }

    # roles (not exposed in current UI)
    http_requests.empty()

    role = Role(name='blah')
    role.store()

    user1 = pub.user_class()
    user1.roles = [role.id]
    user1.name_identifiers = ['xxy1']
    user1.store()
    user2 = pub.user_class()
    user2.roles = [role.id]
    user2.name_identifiers = ['xxy2']
    user2.store()

    formdef.workflow_roles = {'_receiver': role.id}

    item.to = ['_receiver']
    item.perform(formdata)
    assert http_requests.count() == 2
    assert {x['url'] for x in http_requests.requests} == {
        'https://portal/api/notification/add/?NameID=xxy1',
        'https://portal/api/notification/add/?NameID=xxy2'}

    # test inactive users are ignored
    user2.is_active = False
    user2.store()
    http_requests.empty()
    item.perform(formdata)
    assert http_requests.count() == 1
    assert {x['url'] for x in http_requests.requests} == {'https://portal/api/notification/add/?NameID=xxy1'}

    # check notifications are sent to interco portal if it exists
    pub.site_options.set('variables', '_interco_portal_url', 'https://interco-portal/')
    http_requests.empty()
    item.perform(formdata)
    assert http_requests.count() == 1
    assert {x['url'] for x in http_requests.requests} == {
        'https://interco-portal/api/notification/add/?NameID=xxy1'}
|
||
|
||
|
||
def test_workflow_field_migration(pub):
    """Check ``in_listing`` on a backoffice field is migrated on reload.

    A field created with ``in_listing=True`` must expose, once the workflow
    is reloaded, ``display_locations`` including ``'listings'``.
    """
    Workflow.wipe()
    wf = Workflow(name='wf with backoffice field')
    wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef.fields = [
        StringField(id='bo1', label='bo field 1', type='string', in_listing=True),
    ]
    wf.add_status('Status1')
    wf.store()

    wf = Workflow.get(wf.id)
    assert wf.backoffice_fields_formdef.fields[0].display_locations == ['validation', 'summary', 'listings']
|
||
|
||
|
||
def test_aggregation_email(pub, emails):
    """Check the aggregation email action and the delivery of aggregates.

    Five formdata going through the workflow must end up in a single pending
    aggregate; sending it empties the queue and produces one 'New arrivals'
    email listing the five forms.  The scenario is then replayed with the
    role configured to send emails to its members instead of a role address.
    """
    Workflow.wipe()
    Role.wipe()
    AggregationEmail.wipe()

    role = Role(name='foobar')
    role.emails = ['foobar@localhost']
    role.emails_to_members = False
    role.store()

    workflow = Workflow(name='aggregation-email')
    workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
    aggregation = AggregationEmailWorkflowStatusItem()
    aggregation.parent = workflow.possible_status[1]
    assert aggregation.get_line_details() == 'not completed'
    aggregation.to = [role.id]
    assert aggregation.get_line_details() == 'to foobar'
    workflow.possible_status[1].items.insert(0, aggregation)
    workflow.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    formdef.store()
    formdef.data_class().wipe()

    for _ in range(5):
        formdata = formdef.data_class()()
        formdata.data = {}
        formdata.store()
        formdata.just_created()
        formdata.perform_workflow()
        # all forms are accumulated in the same pending aggregate
        assert AggregationEmail.count() == 1

    send_aggregation_emails(pub)
    assert AggregationEmail.count() == 0
    assert 'New arrivals' in emails.emails
    for i in range(5):
        assert 'http://example.net/foobar/%s/status (New)' % (i + 1) in emails.emails['New arrivals']['payload']

    # nothing pending anymore: no new email
    emails.empty()
    send_aggregation_emails(pub)
    assert 'New arrivals' not in emails.emails

    # no role address, emails go to role members
    role.emails = []
    role.emails_to_members = True
    role.store()

    user = pub.user_class(name='bar')
    user.email = 'bar@localhost'
    user.roles = [role.id]
    user.store()

    # replay the workflow on the last formdata created by the loop above
    formdata.perform_workflow()
    assert AggregationEmail.count() == 1

    send_aggregation_emails(pub)
    assert AggregationEmail.count() == 0
    assert 'New arrivals' in emails.emails
    assert 'http://example.net/foobar/%s/status (New)' % formdata.id in emails.emails['New arrivals']['payload']
|
||
|
||
|
||
def test_create_formdata(two_pubs):
    """Check the "create formdata" workflow action.

    Covers: an action without target form (nothing created), a configured
    action whose mappings reference unknown variables and fields (a formdata
    is created and, on PostgreSQL, two errors are logged), the creation of a
    tracking code when the target form enables them, and a false condition
    on the action (nothing created).
    """
    FormDef.wipe()
    if two_pubs.is_using_postgresql():
        two_pubs.loggederror_class.wipe()
    two_pubs.tracking_code_class.wipe()

    target_formdef = FormDef()
    target_formdef.name = 'target form'
    target_formdef.fields = [
        StringField(id='0', label='string', varname='foo_string'),
    ]
    target_formdef.store()

    wf = Workflow(name='create-formdata')
    wf.possible_status = Workflow.get_default_workflow().possible_status[:]
    create = CreateFormdataWorkflowStatusItem()
    create.label = 'create a new linked form'
    create.varname = 'resubmitted'
    create.id = '_create'
    # field ids '1' and '2' do not exist in the target form
    create.mappings = [
        Mapping(field_id='0', expression='=form_var_toto_string'),
        Mapping(field_id='1', expression='=form_var_toto_file_raw'),
        Mapping(field_id='2', expression='=form_var_toto_item_raw'),
    ]
    create.parent = wf.possible_status[1]
    wf.possible_status[1].items.insert(0, create)
    wf.store()

    source_formdef = FormDef()
    source_formdef.name = 'source form'
    source_formdef.fields = []
    source_formdef.workflow_id = wf.id
    source_formdef.store()

    formdata = source_formdef.data_class()()
    formdata.data = {}
    formdata.just_created()

    assert target_formdef.data_class().count() == 0
    if two_pubs.is_using_postgresql():
        assert two_pubs.loggederror_class.count() == 0
    # check that an unconfigured action does nothing
    formdata.perform_workflow()
    assert target_formdef.data_class().count() == 0

    create.formdef_slug = target_formdef.url_name
    wf.store()
    # drop the workflow cached on the formdef so the updated one is loaded
    del source_formdef._workflow
    formdata.perform_workflow()
    assert target_formdef.data_class().count() == 1

    if two_pubs.is_using_postgresql():
        errors = two_pubs.loggederror_class.select()
        assert len(errors) == 2
        assert any('form_var_toto_string' in (error.exception_message or '') for error in errors)
        assert any('Missing field' in error.summary for error in errors)

    # no tracking code has been created
    created_formdata = target_formdef.data_class().select()[0]
    assert created_formdata.tracking_code is None
    assert two_pubs.tracking_code_class.count() == 0
    # now we want one
    target_formdef.enable_tracking_codes = True
    target_formdef.store()
    target_formdef.data_class().wipe()
    formdata.perform_workflow()
    # and a tracking code is created
    assert target_formdef.data_class().count() == 1
    created_formdata = target_formdef.data_class().select()[0]
    assert created_formdata.tracking_code is not None
    assert two_pubs.tracking_code_class.count() == 1
    assert two_pubs.tracking_code_class.select()[0].formdef_id == target_formdef.id
    assert two_pubs.tracking_code_class.select()[0].formdata_id == str(created_formdata.id)

    # a false condition prevents the creation
    create.condition = {'type': 'python', 'value': '1 == 2'}
    wf.store()
    del source_formdef._workflow
    target_formdef.data_class().wipe()
    assert target_formdef.data_class().count() == 0
    formdata.perform_workflow()
    assert target_formdef.data_class().count() == 0
|
||
|
||
|
||
def test_create_carddata(two_pubs):
    """Check the "create carddata" workflow action.

    A first run with empty form data must still create a card and, on
    PostgreSQL, log two errors (undefined variable, invalid date value).  A
    second run with an item, a date and an uploaded file checks the values
    copied into the card.  Finally a false condition on the action must
    prevent any creation.
    """
    CardDef.wipe()
    FormDef.wipe()
    if two_pubs.is_using_postgresql():
        two_pubs.loggederror_class.wipe()

    carddef = CardDef()
    carddef.name = 'My card'
    carddef.fields = [
        StringField(id='1', label='string'),
        ItemField(id='2', label='List', items=['item1', 'item2'],
                  varname='clist'),
        DateField(id='3', label='Date', varname='cdate'),
        FileField(id='4', label='File', varname='cfile'),
    ]
    carddef.store()

    wf = Workflow(name='create-carddata')
    wf.possible_status = Workflow.get_default_workflow().possible_status[:]
    create = CreateCarddataWorkflowStatusItem()
    create.label = 'Create CardDef'
    create.varname = 'mycard'
    create.id = '_create'
    create.formdef_slug = carddef.url_name
    create.mappings = [
        Mapping(field_id='1', expression='=form_var_undefined'),
        Mapping(field_id='2', expression='{{ form_var_list }}'),
        Mapping(field_id='3', expression='{{ form_var_date }}'),
        Mapping(field_id='4', expression='{{ form_var_file|default_if_none:"" }}'),
    ]
    create.parent = wf.possible_status[1]
    wf.possible_status[1].items.insert(0, create)
    wf.store()

    formdef = FormDef()
    formdef.name = 'source form'
    formdef.fields = [
        ItemField(id='1', label='List', items=['item1', 'item2'], varname='list'),
        DateField(id='2', label='Date', varname='date'),
        FileField(id='3', label='File', varname='file'),
    ]
    formdef.workflow_id = wf.id
    formdef.store()

    # NOTE(review): this first formdata is replaced just below without being
    # stored or run through the workflow; it looks redundant -- confirm before
    # removing.
    formdata = formdef.data_class()()
    formdata.data = {}
    formdata.just_created()

    assert carddef.data_class().count() == 0

    # first run: empty data
    formdata = formdef.data_class()()
    formdata.data = {}

    formdata.just_created()
    formdata.perform_workflow()

    assert carddef.data_class().count() == 1

    if two_pubs.is_using_postgresql():
        errors = two_pubs.loggederror_class.select()
        assert len(errors) == 2
        assert any('form_var_undefined' in (error.exception_message or '') for error in errors)
        assert any('invalid date value' in (error.exception_message or '') for error in errors)

    # second run: item, date and file values
    formdata = formdef.data_class()()
    today = datetime.date.today()

    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    with open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg'), 'rb') as jpg:
        upload.receive([jpg.read()])

    formdata.data = {'1': 'item1',
                     '1_display': 'item1',
                     '2': today.timetuple(),
                     '3': upload}
    formdata.just_created()
    formdata.perform_workflow()

    assert formdata.get_substitution_variables()['form_links_mycard_form_number'] == '1-2'
    carddata = carddef.data_class().get(id=2)
    assert carddata.data['2'] == 'item1'
    assert carddata.data['2_display'] == 'item1'
    assert carddata.data['3'] == today.timetuple()
    assert carddata.data['4'].base_filename == 'test.jpeg'

    # a false condition prevents the creation
    create.condition = {'type': 'python', 'value': '1 == 2'}
    wf.store()
    # drop the workflow cached on the formdef so the updated one is loaded
    del formdef._workflow
    carddef.data_class().wipe()
    assert carddef.data_class().count() == 0
    formdata.perform_workflow()
    assert carddef.data_class().count() == 0
|
||
|
||
|
||
def test_call_external_workflow_with_evolution_linked_object(two_pubs):
    """Check an external workflow global action run on linked objects.

    A form creates a linked formdata and a linked carddata; the external
    'delete' trigger is then called successively on the linked formdata
    (removed), on it again (already removed: one error logged on PostgreSQL),
    on a card definition that has no linked object (nothing happens) and on
    the linked carddata (removed).
    """
    FormDef.wipe()
    CardDef.wipe()
    if two_pubs.is_using_postgresql():
        two_pubs.loggederror_class.wipe()

    # workflow shared by the external form and cards, with a webservice
    # trigger removing the object
    external_wf = Workflow(name='External Workflow')
    external_wf.add_status(name='New')
    action = external_wf.add_global_action('Delete', 'delete')
    action.append_item('remove')
    trigger = action.append_trigger('webservice')
    trigger.identifier = 'delete'
    external_wf.store()

    external_formdef = FormDef()
    external_formdef.name = 'External Form'
    external_formdef.fields = [
        StringField(id='0', label='string', varname='form_string'),
    ]
    external_formdef.workflow = external_wf
    external_formdef.store()

    external_carddef = CardDef()
    external_carddef.name = 'External Card'
    external_carddef.fields = [
        StringField(id='0', label='string', varname='card_string'),
    ]
    external_carddef.workflow = external_wf
    external_carddef.store()

    # workflow creating the linked objects and calling the external trigger
    wf = Workflow(name='External actions')
    st1 = wf.add_status('Create external formdata')
    create_formdata = CreateFormdataWorkflowStatusItem()
    create_formdata.label = 'create linked form'
    create_formdata.formdef_slug = external_formdef.url_name
    create_formdata.varname = 'created_form'
    create_formdata.id = '_create_form'
    mappings = [
        Mapping(field_id='0', expression='{{ form_var_string }}')
    ]
    create_formdata.mappings = mappings
    create_formdata.parent = st1

    create_carddata = CreateCarddataWorkflowStatusItem()
    create_carddata.label = 'create linked card'
    create_carddata.formdef_slug = external_carddef.url_name
    create_carddata.varname = 'created_card'
    create_carddata.id = '_create_card'
    create_carddata.mappings = mappings
    create_carddata.parent = st1

    st1.items.append(create_formdata)
    st1.items.append(create_carddata)

    global_action = wf.add_global_action('Delete external linked object', 'delete')
    action = global_action.append_item('external_workflow_global_action')
    action.slug = 'formdef:%s' % external_formdef.url_name
    action.trigger_id = 'action:%s' % trigger.identifier
    wf.store()

    formdef = FormDef()
    formdef.name = 'External action form'
    formdef.fields = [
        StringField(id='0', label='string', varname='string'),
    ]
    formdef.workflow = wf
    formdef.store()

    assert external_formdef.data_class().count() == 0
    assert external_carddef.data_class().count() == 0

    formdata = formdef.data_class()()
    formdata.data = {'0': 'test form'}
    formdata.store()
    formdata.just_created()
    formdata.perform_workflow()

    assert external_formdef.data_class().count() == 1
    assert external_carddef.data_class().count() == 1
    external_formdata = external_formdef.data_class().select()[0]

    # remove external formdata
    perform_items([action], formdata)
    if two_pubs.is_using_postgresql():
        assert two_pubs.loggederror_class.count() == 0
    assert external_formdef.data_class().count() == 0
    assert external_carddef.data_class().count() == 1

    # formdata is already deleted: cannot find it again
    perform_items([action], formdata)
    if two_pubs.is_using_postgresql():
        assert two_pubs.loggederror_class.count() == 1
        logged_error = two_pubs.loggederror_class.select()[0]
        assert logged_error.summary == 'Could not find linked "External Form" object by id %s' % external_formdata.id
        assert logged_error.exception_class == 'KeyError'
        assert logged_error.status_item_id == action.id

    # try to remove from a carddef that has no linked object: do nothing
    unused_carddef = CardDef()
    unused_carddef.name = 'External Card (not used)'
    unused_carddef.fields = []
    unused_carddef.workflow = external_wf
    unused_carddef.store()
    if two_pubs.is_using_postgresql():
        two_pubs.loggederror_class.wipe()
    action.slug = 'carddef:%s' % unused_carddef.url_name
    wf.store()
    perform_items([action], formdata)
    if two_pubs.is_using_postgresql():
        assert two_pubs.loggederror_class.count() == 0
    assert external_formdef.data_class().count() == 0
    assert external_carddef.data_class().count() == 1
    # remove the right carddef
    action.slug = 'carddef:%s' % external_carddef.url_name
    wf.store()
    perform_items([action], formdata)
    if two_pubs.is_using_postgresql():
        assert two_pubs.loggederror_class.count() == 0
    assert external_formdef.data_class().count() == 0
    assert external_carddef.data_class().count() == 0
|
||
|
||
|
||
def test_call_external_workflow_with_data_sourced_object(two_pubs):
    """Call global actions of a card workflow from a form with a card item field.

    The form workflow owns two 'external_workflow_global_action' items
    pointing at the card definition ("update" and "delete" triggers).
    Running "update" twice must set the card backoffice field to '1' then
    '2', running "delete" must remove the card, and running "delete" again
    must only leave a logged error (error log is checked on PostgreSQL only).
    """
    uses_sql = two_pubs.is_using_postgresql()

    FormDef.wipe()
    CardDef.wipe()
    if uses_sql:
        two_pubs.loggederror_class.wipe()

    # workflow of the card: a backoffice field and two global actions that
    # are reachable through webservice triggers
    card_workflow = Workflow(name='Carddef Workflow')
    card_workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(card_workflow)
    card_workflow.backoffice_fields_formdef.fields = [
        StringField(id='bo0', varname='bo', type='string', label='bo variable'),
    ]

    card_update = card_workflow.add_global_action('Update')
    card_update.append_item('set-backoffice-fields')
    set_counter = card_update.items[0]
    set_counter.fields = [
        {'field_id': 'bo0', 'value': '{{ form_var_bo|default:"0"|add:1 }}'},
    ]
    update_trigger = card_update.append_trigger('webservice')
    update_trigger.identifier = 'update'

    card_delete = card_workflow.add_global_action('Delete', 'delete')
    card_delete.append_item('remove')
    delete_trigger = card_delete.append_trigger('webservice')
    delete_trigger.identifier = 'delete'
    card_workflow.store()

    # card definition and a single card
    carddef = CardDef()
    carddef.name = 'Data'
    carddef.fields = [StringField(id='0', label='string', varname='card_string')]
    carddef.digest_template = '{{ form_var_card_string }}'
    carddef.workflow = card_workflow
    carddef.store()

    carddata = carddef.data_class()()
    carddata.data = {'0': 'Text'}
    carddata.store()

    # workflow of the form, with the two external calls
    wf = Workflow(name='External actions')
    wf.add_status('Action')

    update_action = wf.add_global_action(
        'Update linked object data'
    ).append_item('external_workflow_global_action')
    update_action.slug = 'carddef:%s' % carddef.url_name
    update_action.trigger_id = 'action:update'

    delete_action = wf.add_global_action(
        'Delete external linked object', 'delete'
    ).append_item('external_workflow_global_action')
    delete_action.slug = 'carddef:%s' % carddef.url_name
    delete_action.trigger_id = 'action:delete'
    wf.store()

    # form with an item field using the card definition as data source
    formdef = FormDef()
    formdef.name = 'External action form'
    formdef.fields = [
        ItemField(
            id='0',
            label='Card',
            type='item',
            varname='card',
            data_source={'type': 'carddef:%s' % carddef.url_name},
        ),
        EmailField(id='1', label='Email', varname='email'),
    ]
    formdef.workflow = wf
    formdef.store()

    if uses_sql:
        assert two_pubs.loggederror_class.count() == 0
    assert carddef.data_class().count() == 1

    formdata = formdef.data_class()()
    formdata.data = {'0': '1', '1': 'foo@example.com'}
    formdata.store()
    formdata.just_created()
    formdata.perform_workflow()

    # first update: counter goes to '1'
    perform_items([update_action], formdata)
    if uses_sql:
        assert two_pubs.loggederror_class.count() == 0
    assert carddef.data_class().count() == 1
    card = carddef.data_class().select()[0]
    assert card.data['bo0'] == '1'

    # second update: counter goes to '2'
    perform_items([update_action], formdata)
    card = carddef.data_class().select()[0]
    assert card.data['bo0'] == '2'

    # delete the card
    perform_items([delete_action], formdata)
    if uses_sql:
        assert two_pubs.loggederror_class.count() == 0
    assert carddef.data_class().count() == 0

    # card is gone: the call is recorded as an error
    perform_items([delete_action], formdata)
    if uses_sql:
        assert two_pubs.loggederror_class.count() == 1
        logged_error = two_pubs.loggederror_class.select()[0]
        assert logged_error.summary == 'Could not find linked "Data" object by id %s' % carddata.id
        assert logged_error.exception_class == 'KeyError'
|
||
|
||
|
||
def test_call_external_workflow_with_parent_object(pub):
    """Call a card global action through the form's submission context.

    The card "update" action must not run when the form has no parent or
    when its parent is a form; it must run (backoffice field set to '1')
    once the submission context designates the card as origin.
    """
    FormDef.wipe()
    CardDef.wipe()

    # card workflow: a global action incrementing a counter stored in a
    # backoffice field, callable through the "update" webservice trigger
    card_workflow = Workflow(name='Carddef Workflow')
    card_workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(card_workflow)
    card_workflow.backoffice_fields_formdef.fields = [
        StringField(id='bo0', varname='bo', type='string', label='bo variable'),
    ]
    increment = card_workflow.add_global_action('Update')
    increment.append_item('set-backoffice-fields')
    set_counter = increment.items[0]
    set_counter.fields = [
        {'field_id': 'bo0', 'value': '{{ form_var_bo|default:"0"|add:1 }}'},
    ]
    webservice_trigger = increment.append_trigger('webservice')
    webservice_trigger.identifier = 'update'
    card_workflow.store()

    # card definition using that workflow
    carddef = CardDef()
    carddef.name = 'Data'
    carddef.fields = [StringField(id='0', label='string', varname='card_string')]
    carddef.workflow = card_workflow
    carddef.store()

    # one sample card
    carddata = carddef.data_class()()
    carddata.data = {'0': 'Text'}
    carddata.store()

    # form workflow holding the external call
    wf = Workflow(name='External actions')
    wf.add_status('Action')

    update_action = wf.add_global_action(
        'Update linked object data'
    ).append_item('external_workflow_global_action')
    update_action.slug = 'carddef:%s' % carddef.url_name
    update_action.trigger_id = 'action:update'
    wf.store()

    # form definition using that workflow
    formdef = FormDef()
    formdef.name = 'External action form'
    formdef.fields = [EmailField(id='1', label='Email', varname='email')]
    formdef.workflow = wf
    formdef.store()

    # one form
    formdata = formdef.data_class()()
    formdata.data = {'1': 'foo@example.com'}
    formdata.store()
    formdata.just_created()
    formdata.perform_workflow()

    def call_and_reload_card():
        # run the external action then read the card back from storage
        perform_items([update_action], formdata)
        return carddef.data_class().get(carddata.id)

    # no parent at all
    card = call_and_reload_card()
    assert 'bo0' not in card.data  # not called

    # parent is a form, not the card
    formdata.submission_context = {
        'orig_object_type': 'formdef',
        'orig_formdata_id': str(formdata.id),
        'orig_formdef_id': str(formdef.id),
    }
    formdata.store()
    card = call_and_reload_card()
    assert 'bo0' not in card.data  # not called

    # parent is the card
    formdata.submission_context = {
        'orig_object_type': 'carddef',
        'orig_formdata_id': str(carddata.id),
        'orig_formdef_id': str(carddef.id),
    }
    formdata.store()
    card = call_and_reload_card()
    assert card.data['bo0'] == '1'  # got called
|
||
|
||
|
||
def test_call_external_workflow_use_caller_variable(pub):
    """Check the called workflow can read variables of the calling form.

    The card global action copies '{{ caller_form_var_email }}' into a
    backoffice field; after the call the field must hold the email typed in
    the calling form.
    """
    FormDef.wipe()
    CardDef.wipe()

    # card workflow: global action storing a value taken from the caller
    card_workflow = Workflow(name='Carddef Workflow')
    card_workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(card_workflow)
    card_workflow.backoffice_fields_formdef.fields = [
        StringField(id='bo0', varname='bo', type='string', label='bo variable'),
    ]
    copy_email = card_workflow.add_global_action('Update')
    copy_email.append_item('set-backoffice-fields')
    set_field = copy_email.items[0]
    set_field.fields = [
        {'field_id': 'bo0', 'value': '{{ caller_form_var_email }}'},
    ]
    webservice_trigger = copy_email.append_trigger('webservice')
    webservice_trigger.identifier = 'update'
    card_workflow.store()

    # card definition using that workflow
    carddef = CardDef()
    carddef.name = 'Data'
    carddef.fields = [StringField(id='0', label='string', varname='card_string')]
    carddef.workflow = card_workflow
    carddef.store()

    # one sample card
    carddata = carddef.data_class()()
    carddata.data = {'0': 'Text'}
    carddata.store()

    # form workflow holding the external call
    wf = Workflow(name='External actions')
    wf.add_status('Action')

    update_action = wf.add_global_action(
        'Update linked object data'
    ).append_item('external_workflow_global_action')
    update_action.slug = 'carddef:%s' % carddef.url_name
    update_action.trigger_id = 'action:update'
    wf.store()

    # form definition using that workflow
    formdef = FormDef()
    formdef.name = 'External action form'
    formdef.fields = [EmailField(id='1', label='Email', varname='email')]
    formdef.workflow = wf
    formdef.store()

    # one form
    formdata = formdef.data_class()()
    formdata.data = {'1': 'foo@example.com'}
    formdata.store()
    formdata.just_created()
    formdata.perform_workflow()

    def call_and_reload_card():
        # run the external action then read the card back from storage
        perform_items([update_action], formdata)
        return carddef.data_class().get(carddata.id)

    # no parent: nothing to call
    card = call_and_reload_card()
    assert 'bo0' not in card.data  # not called

    # parent is the card
    formdata.submission_context = {
        'orig_object_type': 'carddef',
        'orig_formdata_id': str(carddata.id),
        'orig_formdef_id': str(carddef.id),
    }
    formdata.store()
    card = call_and_reload_card()
    assert card.data['bo0'] == 'foo@example.com'  # got called
|
||
|
||
|
||
def test_call_external_workflow_manual_targeting(two_pubs):
    """Check the 'manual' target mode of the external workflow action.

    Four cards exist when the action runs: three created up front and one
    created by the form workflow. The form also references card 3 through
    an item field and card 2 as parent. In manual mode none of those links
    may be used:

    * without a target id nothing is updated and nothing is logged;
    * with a target id template evaluating to '1' only card 1 is updated;
    * with a target id matching no card nothing changes and an error is
      logged.

    Skipped unless the publisher uses PostgreSQL.
    """
    if not two_pubs.is_using_postgresql():
        # pytest.skip() raises, so no explicit return is needed after it
        pytest.skip('this requires SQL')

    FormDef.wipe()
    CardDef.wipe()
    two_pubs.loggederror_class.wipe()

    # carddef workflow, with global action to increment a counter in its
    # backoffice fields.
    carddef_wf = Workflow(name='Carddef Workflow')
    carddef_wf.add_status(name='New')
    carddef_wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(carddef_wf)
    carddef_wf.backoffice_fields_formdef.fields = [
        StringField(id='bo0', varname='bo', type='string', label='bo variable'),
    ]
    global_action = carddef_wf.add_global_action('Update')
    global_action.append_item('set-backoffice-fields')
    setbo = global_action.items[0]
    setbo.fields = [
        {'field_id': 'bo0', 'value': '{{ form_var_bo|default:"0"|add:1 }}'},
    ]
    trigger = global_action.append_trigger('webservice')
    trigger.identifier = 'update'
    carddef_wf.store()

    # associated carddef
    carddef = CardDef()
    carddef.name = 'Data'
    carddef.fields = [
        StringField(id='0', label='string', varname='card_string'),
    ]
    carddef.workflow = carddef_wf
    carddef.store()
    carddef.data_class().wipe()

    # and sample carddatas
    for i in range(1, 4):
        carddata = carddef.data_class()()
        carddata.data = {'0': 'Text %s' % i}
        carddata.store()

    def counters():
        # 'bo0' value of cards 1 to 4, in id order
        return [carddef.data_class().get(card_id).data['bo0'] for card_id in range(1, 5)]

    # formdef workflow that will trigger the global action
    wf = Workflow(name='External actions')
    st1 = wf.add_status('Action')

    update_global_action = wf.add_global_action('Update linked object data')
    update_action = update_global_action.append_item('external_workflow_global_action')
    update_action.slug = 'carddef:%s' % carddef.url_name
    update_action.target_mode = 'manual'
    update_action.target_id = None  # not configured
    update_action.trigger_id = 'action:update'

    # and create carddata
    create_carddata = CreateCarddataWorkflowStatusItem()
    create_carddata.label = 'create linked card'
    create_carddata.formdef_slug = carddef.url_name
    create_carddata.varname = 'created_card'
    create_carddata.id = '_create_card'
    create_carddata.mappings = [
        Mapping(field_id='0', expression='{{ form_var_string }}'),
    ]
    create_carddata.parent = st1
    st1.items.append(create_carddata)

    wf.store()

    # associated formdef
    datasource = {'type': 'carddef:%s' % carddef.url_name}
    formdef = FormDef()
    formdef.name = 'External action form'
    formdef.fields = [
        ItemField(
            id='0', label='Card',
            type='item', varname='card',
            data_source=datasource),
        StringField(id='1', label='string', varname='string'),
    ]
    formdef.workflow = wf
    formdef.store()

    # and formdata
    formdata = formdef.data_class()()
    formdata.data = {
        '0': '3',  # set from datasource
        '1': '1',
    }
    # set parent
    formdata.submission_context = {
        'orig_object_type': 'carddef',
        'orig_formdata_id': '2',
        'orig_formdef_id': str(carddef.id),
    }
    formdata.store()
    formdata.just_created()
    formdata.perform_workflow()
    assert carddef.data_class().count() == 4
    assert counters() == [None, None, None, None]
    # linked carddata
    assert carddef.data_class().get(4).data['0'] == '1'

    # target not configured
    perform_items([update_action], formdata)
    assert counters() == [None, None, None, None]
    assert two_pubs.loggederror_class.count() == 0

    # configure target
    update_action.target_id = '{{ form_var_string }}'  # == '1'
    wf.store()
    perform_items([update_action], formdata)
    assert counters() == ['1', None, None, None]

    # target not found
    update_action.target_id = '42{{ form_var_string }}'  # == '421'
    wf.store()
    perform_items([update_action], formdata)
    assert counters() == ['1', None, None, None]
    assert two_pubs.loggederror_class.count() == 1
    logged_error = two_pubs.loggederror_class.select()[0]
    assert logged_error.summary == 'Could not find targeted "Data" object by id 421'
|
||
|
||
|
||
def test_edit_carddata_with_data_sourced_object(pub):
    """Edit a card selected through an item field of the form.

    The card has a "profession" item field fed by a formula data source.
    The form workflow maps the form's "new_profession" value onto it; the
    raw value, its display value and its structured value must all follow.
    Mapping '=None' must clear the three of them.
    """
    FormDef.wipe()
    CardDef.wipe()

    professions = [
        {'id': 'b', 'text': 'baker', 'extra': 'plop'},
        {'id': 'c', 'text': 'cook', 'extra': 'plop2'},
        {'id': 'l', 'text': 'lawyer', 'extra': 'plop3'},
    ]

    # card definition and one card, with profession "lawyer"
    carddef = CardDef()
    carddef.name = 'Person'
    carddef.fields = [
        StringField(id='0', label='First Name', varname='first_name'),
        StringField(id='1', label='Last Name', varname='last_name'),
        ItemField(
            id='2',
            label='Profession',
            type='item',
            varname='profession',
            data_source={'type': 'formula', 'value': repr(professions)},
        ),
    ]
    carddef.digest_template = '{{ form_var_first_name }} {{ form_var_last_name }}'
    carddef.store()

    carddata = carddef.data_class()()
    carddata.data = {'0': 'Foo', '1': 'Bar', '2': 'l'}
    profession_field = carddef.fields[2]
    carddata.data['2_display'] = profession_field.store_display_value(carddata.data, '2')
    carddata.data['2_structured'] = profession_field.store_structured_value(carddata.data, '2')
    carddata.store()

    # form workflow editing the card
    wf = Workflow(name='Card update')
    st1 = wf.add_status('Update card', 'st1')

    edit = EditCarddataWorkflowStatusItem()
    edit.id = 'edit'
    edit.formdef_slug = carddef.url_name
    edit.mappings = [
        Mapping(field_id='2', expression='{{ form_var_new_profession }}'),
    ]
    edit.parent = st1
    st1.items.append(edit)
    wf.store()

    # form definition: the person is picked from the cards
    formdef = FormDef()
    formdef.name = 'Persons'
    formdef.fields = [
        ItemField(
            id='0',
            label='Person',
            type='item',
            varname='person',
            data_source={'type': 'carddef:%s' % carddef.url_name},
        ),
        StringField(id='1', label='New profession', varname='new_profession'),
    ]
    formdef.workflow = wf
    formdef.store()

    def submit(new_profession):
        # create a form targeting card '1' and run its workflow
        formdata = formdef.data_class()()
        formdata.data = {'0': '1', '1': new_profession}
        formdata.store()
        formdata.just_created()
        formdata.perform_workflow()

    submit('c')
    data = carddef.data_class().select()[0]
    assert data.data['2'] == 'c'
    assert data.data['2_display'] == 'cook'
    assert data.data['2_structured'] == {'id': 'c', 'text': 'cook', 'extra': 'plop2'}

    submit('b')
    data = carddef.data_class().select()[0]
    assert data.data['2'] == 'b'
    assert data.data['2_display'] == 'baker'
    assert data.data['2_structured'] == {'id': 'b', 'text': 'baker', 'extra': 'plop'}

    # reset data
    # NOTE(review): `expression` is never used, the mapping is hard-coded to
    # '=None', so the '' and '""' cases are not actually exercised. Confirm
    # what those expressions are expected to store before wiring the loop
    # variable into the mapping.
    for expression in ('=None', '', '""'):
        edit.mappings = [
            Mapping(field_id='2', expression='=None'),
        ]
        wf.store()

        submit('b')

        carddata = carddef.data_class().select()[0]
        assert carddata.data['2'] is None
        assert carddata.data.get('2_display') is None
        assert carddata.data.get('2_structured') is None

        # restore initial data
        carddata.data = data.data
        carddata.store()
|
||
|
||
|
||
def test_edit_carddata_with_linked_object(pub):
    """Edit the card that was created earlier by the same form.

    The first status creates a card from the form values then jumps to the
    second status, which rewrites the card's "kids number" as the form
    value plus one. A single card must exist, holding '3' for a form
    submitted with '2'.
    """
    FormDef.wipe()
    CardDef.wipe()

    carddef = CardDef()
    carddef.name = 'Parent'
    carddef.fields = [
        StringField(id='0', label='First Name', varname='first_name'),
        StringField(id='1', label='Last Name', varname='last_name'),
        StringField(id='2', label='Kids number', varname='kids_number'),
    ]
    carddef.store()

    wf = Workflow(name='Card create and update')

    # first status: create the card, then move on
    create_status = wf.add_status('Create card', 'st1')

    create_card = CreateCarddataWorkflowStatusItem()
    create_card.formdef_slug = carddef.url_name
    create_card.mappings = [
        Mapping(field_id='0', expression='{{ form_var_first_name }}'),
        Mapping(field_id='1', expression='{{ form_var_last_name }}'),
        Mapping(field_id='2', expression='{{ form_var_kids_number|default:"0" }}'),
    ]
    create_card.parent = create_status
    create_status.items.append(create_card)

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = ['_submitter', '_receiver']
    jump.status = 'st2'
    jump.parent = create_status
    create_status.items.append(jump)

    # second status: edit the card
    update_status = wf.add_status('Update card', 'st2')

    edit_card = EditCarddataWorkflowStatusItem()
    edit_card.id = 'edit'
    edit_card.formdef_slug = carddef.url_name
    edit_card.mappings = [
        Mapping(field_id='2', expression='{{ form_var_kids_number|add:"1" }}'),
    ]
    edit_card.parent = update_status
    update_status.items.append(edit_card)
    wf.store()

    formdef = FormDef()
    formdef.name = 'Parents'
    formdef.fields = [
        StringField(id='0', label='First Name', varname='first_name'),
        StringField(id='1', label='Last Name', varname='last_name'),
        StringField(id='2', label='Number of kids', varname='kids_number'),
    ]
    formdef.workflow = wf
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'0': 'Parent', '1': 'Foo', '2': '2'}
    formdata.store()
    formdata.just_created()
    formdata.perform_workflow()

    cards = carddef.data_class()
    assert cards.count() == 1
    card_data = cards.select()[0]
    assert card_data.data['2'] == '3'
|
||
|
||
|
||
def test_edit_carddata_manual_targeting(two_pubs):
|
||
if not two_pubs.is_using_postgresql():
|
||
pytest.skip('this requires SQL')
|
||
return
|
||
|
||
FormDef.wipe()
|
||
CardDef.wipe()
|
||
two_pubs.loggederror_class.wipe()
|
||
|
||
# carddef
|
||
carddef = CardDef()
|
||
carddef.name = 'Parent'
|
||
carddef.fields = [
|
||
StringField(id='0', label='First Name', varname='first_name'),
|
||
StringField(id='1', label='Last Name', varname='last_name'),
|
||
StringField(id='2', label='Kids number', varname='kids_number'),
|
||
]
|
||
carddef.store()
|
||
carddef.data_class().wipe()
|
||
|
||
# and sample carddatas
|
||
for i in range(1, 4):
|
||
carddata = carddef.data_class()()
|
||
carddata.data = {
|
||
'0': 'First name %s' % i,
|
||
'1': 'Last name %s' % i,
|
||
'2': '0',
|
||
}
|
||
carddata.store()
|
||
|
||
# formdef workflow that will update carddata
|
||
wf = Workflow(name='Card create and update')
|
||
st1 = wf.add_status('Create card', 'st1')
|
||
# create linked carddata
|
||
edit = CreateCarddataWorkflowStatusItem()
|
||
edit.formdef_slug = carddef.url_name
|
||
edit.mappings = [
|
||
Mapping(field_id='0', expression='{{ form_var_first_name }}'),
|
||
Mapping(field_id='1', expression='{{ form_var_last_name }}'),
|
||
Mapping(field_id='2', expression='{{ form_var_kids_number|default:"0" }}'),
|
||
]
|
||
st1.items.append(edit)
|
||
edit.parent = st1
|
||
jump = JumpWorkflowStatusItem()
|
||
jump.id = '_jump'
|
||
jump.by = ['_submitter', '_receiver']
|
||
jump.status = 'st2'
|
||
st1.items.append(jump)
|
||
jump.parent = st1
|
||
|
||
st2 = wf.add_status('Update card', 'st2')
|
||
edit = EditCarddataWorkflowStatusItem()
|
||
edit.formdef_slug = carddef.url_name
|
||
edit.target_mode = 'manual' # not configured
|
||
edit.mappings = [
|
||
Mapping(field_id='2', expression='{{ form_var_kids_number|add:"1" }}'),
|
||
]
|
||
edit.id = 'edit'
|
||
st2.items.append(edit)
|
||
edit.parent = st2
|
||
wf.store()
|
||
|
||
# associated formdef
|
||
formdef = FormDef()
|
||
formdef.name = 'Parents'
|
||
datasource = {'type': 'carddef:%s' % carddef.url_name}
|
||
formdef.fields = [
|
||
StringField(id='0', label='First Name',
|
||
varname='first_name'),
|
||
StringField(id='1', label='Last Name',
|
||
varname='last_name'),
|
||
StringField(id='2', label='Number of kids',
|
||
varname='kids_number'),
|
||
ItemField(
|
||
id='3', label='Card',
|
||
type='item', varname='card',
|
||
data_source=datasource),
|
||
StringField(id='4', label='string', varname='string'),
|
||
]
|
||
formdef.workflow = wf
|
||
formdef.store()
|
||
|
||
# create formdatas
|
||
|
||
# target not configured
|
||
formdata = formdef.data_class()()
|
||
formdata.data = {
|
||
'0': 'Parent',
|
||
'1': 'Foo',
|
||
'2': '2',
|
||
'3': '3', # set from datasource
|
||
'4': '1',
|
||
}
|
||
# set parent
|
||
formdata.submission_context = {
|
||
'orig_object_type': 'carddef',
|
||
'orig_formdata_id': '2',
|
||
'orig_formdef_id': str(carddef.id),
|
||
}
|
||
formdata.store()
|
||
formdata.just_created()
|
||
formdata.perform_workflow()
|
||
|
||
assert carddef.data_class().count() == 4
|
||
assert carddef.data_class().get(1).data['2'] == '0'
|
||
assert carddef.data_class().get(2).data['2'] == '0'
|
||
assert carddef.data_class().get(3).data['2'] == '0'
|
||
assert carddef.data_class().get(4).data['2'] == '2'
|
||
assert two_pubs.loggederror_class.count() == 0
|
||
|
||
# configure target
|
||
edit.target_id = '{{ form_var_string }}' # == '1'
|
||
wf.store()
|
||
formdata = formdef.data_class()()
|
||
formdata.data = {
|
||
'0': 'Parent',
|
||
'1': 'Foo',
|
||
'2': '2',
|
||
'3': '3', # set from datasource
|
||
'4': '1',
|
||
}
|
||
# set parent
|
||
formdata.submission_context = {
|
||
'orig_object_type': 'carddef',
|
||
'orig_formdata_id': '2',
|
||
'orig_formdef_id': str(carddef.id),
|
||
}
|
||
formdata.store()
|
||
formdata.just_created()
|
||
formdata.perform_workflow()
|
||
assert carddef.data_class().count() == 5
|
||
assert carddef.data_class().get(1).data['2'] == '3' # 2 + 1
|
||
assert carddef.data_class().get(2).data['2'] == '0'
|
||
assert carddef.data_class().get(3).data['2'] == '0'
|
||
assert carddef.data_class().get(4).data['2'] == '2'
|
||
assert carddef.data_class().get(5).data['2'] == '2'
|
||
assert two_pubs.loggederror_class.count() == 0
|
||
|
||
# target not found
|
||
edit.target_id = '42{{ form_var_string }}' # == '421'
|
||
wf.store()
|
||
formdata = formdef.data_class()()
|
||
formdata.data = {
|
||
'0': 'Parent',
|
||
'1': 'Foo',
|
||
'2': '2',
|
||
'3': '3', # set from datasource
|
||
'4': '1',
|
||
}
|
||
# set parent
|
||
formdata.submission_context = {
|
||
'orig_object_type': 'carddef',
|
||
'orig_formdata_id': '2',
|
||
'orig_formdef_id': str(carddef.id),
|
||
}
|
||
formdata.store()
|
||
formdata.just_created()
|
||
formdata.perform_workflow()
|
||
assert carddef.data_class().count() == 6
|
||
assert carddef.data_class().get(1).data['2'] == '3' # not changed
|
||
assert carddef.data_class().get(2).data['2'] == '0'
|
||
assert carddef.data_class().get(3).data['2'] == '0'
|
||
assert carddef.data_class().get(4).data['2'] == '2'
|
||
assert carddef.data_class().get(5).data['2'] == '2'
|
||
assert carddef.data_class().get(6).data['2'] == '2'
|
||
assert two_pubs.loggederror_class.count() == 1
|
||
logged_error = two_pubs.loggederror_class.select()[0]
|
||
assert logged_error.summary == 'Could not find targeted "Parent" object by id 421'
|