1604 lines
52 KiB
Python
1604 lines
52 KiB
Python
import datetime
|
|
import pytest
|
|
import shutil
|
|
import time
|
|
import urllib2
|
|
import urlparse
|
|
|
|
import mock
|
|
|
|
from quixote import cleanup, get_response
|
|
from wcs.qommon.http_request import HTTPRequest
|
|
from qommon.form import *
|
|
|
|
from wcs.formdef import FormDef
|
|
from wcs import sessions
|
|
from wcs.fields import StringField, DateField, MapField, FileField
|
|
from wcs.roles import Role
|
|
from wcs.workflows import (Workflow, WorkflowStatusItem,
|
|
SendmailWorkflowStatusItem, SendSMSWorkflowStatusItem,
|
|
DisplayMessageWorkflowStatusItem,
|
|
AbortActionException, WorkflowCriticalityLevel,
|
|
AttachmentEvolutionPart)
|
|
from wcs.wf.anonymise import AnonymiseWorkflowStatusItem
|
|
from wcs.wf.criticality import ModifyCriticalityWorkflowStatusItem, MODE_INC, MODE_DEC, MODE_SET
|
|
from wcs.wf.dispatch import DispatchWorkflowStatusItem
|
|
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
|
|
from wcs.wf.jump import JumpWorkflowStatusItem, _apply_timeouts
|
|
from wcs.wf.profile import UpdateUserProfileStatusItem
|
|
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
|
|
from wcs.wf.remove import RemoveWorkflowStatusItem
|
|
from wcs.wf.roles import AddRoleWorkflowStatusItem, RemoveRoleWorkflowStatusItem
|
|
from wcs.wf.wscall import WebserviceCallStatusItem
|
|
from wcs.wf.export_to_model import transform_to_pdf
|
|
from wcs.wf.geolocate import GeolocateWorkflowStatusItem
|
|
|
|
from utilities import (create_temporary_pub, MockSubstitutionVariables, emails,
|
|
http_requests, clean_temporary_pub, sms_mocking)
|
|
|
|
def setup_module(module):
    """Module-level pytest hook: call quixote's ``cleanup()`` before the tests."""
    cleanup()
|
|
|
|
def teardown_module(module):
    """Module-level pytest hook: call ``clean_temporary_pub()`` after the tests."""
    clean_temporary_pub()
|
|
|
|
def pytest_generate_tests(metafunc):
    """Parametrize tests that request the ``two_pubs`` fixture.

    Such tests are generated once with ``'pickle'`` and once with ``'sql'``;
    the value is passed to the fixture itself (``indirect=True``), which
    reads it back through ``request.param``.
    """
    if 'two_pubs' not in metafunc.fixturenames:
        return
    backends = ['pickle', 'sql']
    metafunc.parametrize('two_pubs', backends, indirect=True)
|
|
|
|
@pytest.fixture
def pub(request):
    """Return a temporary publisher configured for English.

    A fake HTTP request for ``example.net`` (no user, empty response filter,
    a ``BasicSession`` with id 1) is attached to the publisher before it is
    handed to the test.
    """
    publisher = create_temporary_pub()
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()

    environ = {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''}
    http_request = HTTPRequest(None, environ)
    http_request.response.filter = {}
    http_request.user = None
    publisher._set_request(http_request)
    http_request.session = sessions.BasicSession(id=1)
    publisher.set_config(http_request)
    return publisher
|
|
|
|
@pytest.fixture
def two_pubs(request):
    """Return a temporary publisher, in SQL mode when ``request.param`` is 'sql'.

    Apart from the ``sql_mode`` flag the publisher is prepared like the
    plain ``pub`` fixture: English language, a fake request for
    ``example.net`` with no user, and a ``BasicSession`` with id 1.
    """
    use_sql = (request.param == 'sql')
    publisher = create_temporary_pub(sql_mode=use_sql)
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()

    environ = {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''}
    http_request = HTTPRequest(None, environ)
    http_request.response.filter = {}
    http_request.user = None
    publisher._set_request(http_request)
    http_request.session = sessions.BasicSession(id=1)
    publisher.set_config(http_request)
    return publisher
|
|
|
|
def test_get_json_export_dict(pub):
    """Check the content of ``Workflow.get_json_export_dict()`` for a
    two-status workflow whose second status is a forced endpoint."""
    workflow = Workflow(name='wf')
    first_status = workflow.add_status('Status1', 'st1')
    second_status = workflow.add_status('Status2', 'st2')
    second_status.forced_endpoint = True

    timed_jump = JumpWorkflowStatusItem()
    timed_jump.id = '_jump'
    timed_jump.by = ['_submitter', '_receiver']
    timed_jump.timeout = 0.1
    timed_jump.status = 'st2'
    first_status.items.append(timed_jump)
    timed_jump.parent = first_status

    workflow.roles['_other'] = 'Other Function'
    export = workflow.get_json_export_dict()
    assert set(export.keys()) >= {'statuses', 'name', 'functions'}

    assert export['name'] == 'wf'
    exported_statuses = export['statuses']
    assert len(exported_statuses) == 2
    assert set(entry['id'] for entry in exported_statuses) == {'st1', 'st2'}
    mandatory_keys = {'id', 'name', 'forced_endpoint'}
    assert all(set(entry.keys()) >= mandatory_keys
               for entry in exported_statuses)

    # (id, name, value expected for both 'forced_endpoint' and 'endpoint')
    expectations = [
        ('st1', 'Status1', False),
        ('st2', 'Status2', True),
    ]
    for entry, (ident, label, ends) in zip(exported_statuses, expectations):
        assert entry['id'] == ident
        assert entry['name'] == label
        assert entry['forced_endpoint'] is ends
        assert entry['endpoint'] is ends
|
|
|
|
def test_variable_compute(pub):
    """Exercise ``compute()`` with plain strings, EZT templates and
    ``=``-prefixed Python expressions, including failing ones."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    text_field = StringField(id='1', label='Test', type='string', varname='foo')
    formdef.fields = [text_field]
    formdef.store()

    filled = formdef.data_class()()
    filled.data = {'1': 'hello'}
    filled.store()
    pub.substitutions.feed(filled)

    action = JumpWorkflowStatusItem()

    # a plain string comes back unchanged
    assert action.compute('blah') == 'blah'

    # an EZT template has its variable substituted...
    assert action.compute('[form_var_foo]') == 'hello'
    # ...unless EZT processing is turned off
    assert action.compute('[form_var_foo]', do_ezt=False) == '[form_var_foo]'
    # a broken EZT template is returned as-is, or raises when asked to
    assert action.compute('[end]', raises=False) == '[end]'
    with pytest.raises(Exception):
        action.compute('[end]', raises=True)

    # a Python expression is evaluated
    assert action.compute('=form_var_foo') == 'hello'
    # a failing Python expression is returned as-is, or raises when asked to
    assert action.compute('=1/0', raises=False) == '=1/0'
    with pytest.raises(Exception):
        action.compute('=1/0', raises=True)
|
|
|
|
def test_jump_nothing(pub):
    """A jump left unconfigured reports that it must jump."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()

    blank_formdata = formdef.data_class()()
    jump = JumpWorkflowStatusItem()
    assert jump.must_jump(blank_formdata) is True
|
|
|
|
def test_jump_datetime_condition(pub):
    """A condition comparing ``now()`` to a date is true for yesterday and
    false for tomorrow."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()
    formdata = formdef.data_class()()
    jump = JumpWorkflowStatusItem()

    condition_template = 'datetime.datetime.now() > datetime.datetime(%s, %s, %s)'
    one_day = datetime.timedelta(days=1)

    # the first three items of the time tuple are year, month, day
    yesterday = datetime.datetime.now() - one_day
    jump.condition = condition_template % yesterday.timetuple()[:3]
    assert jump.must_jump(formdata) is True

    tomorrow = datetime.datetime.now() + one_day
    jump.condition = condition_template % tomorrow.timetuple()[:3]
    assert jump.must_jump(formdata) is False
|
|
|
|
def test_jump_date_conditions(pub):
    """Jump conditions built on the ``utils`` date helpers hold for a form
    whose date field is 2015-01-04."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    date_field = DateField(id='2', label='Date', type='date', varname='date')
    formdef.fields = [date_field]
    formdef.store()

    # create/store/get, to make sure the date format is acceptable
    data_class = formdef.data_class()
    formdata = data_class()
    formdata.data = {'2': DateField().convert_value_from_str('2015-01-04')}
    formdata.store()
    formdata = formdef.data_class().get(formdata.id)

    pub.substitutions.feed(formdata)

    satisfied_conditions = [
        'utils.make_date(form_var_date) == utils.make_date("2015-01-04")',
        'utils.time_delta(form_var_date, "2015-01-04").days == 0',
        'utils.time_delta(utils.today(), "2015-01-04").days > 0',
        'utils.time_delta(datetime.datetime.now(), "2015-01-04").days > 0',
        'utils.time_delta(utils.time.localtime(), "2015-01-04").days > 0',
    ]
    for expression in satisfied_conditions:
        # a fresh item for every condition
        jump = JumpWorkflowStatusItem()
        jump.condition = expression
        assert jump.must_jump(formdata) is True
|
|
|
|
|
|
def test_jump_count_condition(pub):
    """``form_objects.count < 2`` is true with no stored form and false once
    ten forms have been stored."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()
    pub.substitutions.feed(formdef)
    formdef.data_class().wipe()

    count_condition = 'form_objects.count < 2'

    formdata = formdef.data_class()()
    jump = JumpWorkflowStatusItem()
    jump.condition = count_condition
    assert jump.must_jump(formdata) is True

    for _ in range(10):
        formdata = formdef.data_class()()
        formdata.store()

    jump.condition = count_condition
    assert jump.must_jump(formdata) is False
|
|
|
|
def test_check_auth(pub):
    """Walk ``WorkflowStatusItem.check_auth()`` through the different kinds
    of ``by`` entries: logged users, role ids, submitter and receiver."""
    user = pub.user_class(name='foo')
    user.store()

    role = Role(name='bar1')
    role.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    formdata = formdef.data_class()()

    action = WorkflowStatusItem()

    def granted():
        return action.check_auth(formdata, user)

    # untouched ``by`` attribute
    assert granted() is True

    action.by = []
    assert granted() is False

    action.by = ['logged-users']
    assert granted() is True

    # role referenced by its id, then by the integer form of that id;
    # the user does not hold the role yet
    for by in ([role.id], [int(role.id)]):
        action.by = by
        assert granted() is False

    # same checks once the user holds the role
    user.roles = [role.id]
    for by in ([role.id], [int(role.id)]):
        action.by = by
        assert granted() is True

    action.by = ['_submitter']
    assert granted() is False
    formdata.user_id = user.id
    assert granted() is True
    formdata.user_id = None

    action.by = ['_receiver']
    assert granted() is False
    # function mapping set on the form data...
    formdata.workflow_roles = {'_receiver': user.id}
    assert granted() is True
    # ...or on the form definition
    formdef.workflow_roles = {'_receiver': user.id}
    formdata.workflow_roles = None
    assert granted() is True
|
|
|
|
def test_dispatch(pub):
    """A dispatch action does nothing until configured, then maps the
    configured function to the configured role."""
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    Role.wipe()
    target_role = Role(name='xxx')
    target_role.store()

    dispatch = DispatchWorkflowStatusItem()

    # unconfigured action: no workflow role is set
    formdata = formdef.data_class()()
    dispatch.perform(formdata)
    assert not formdata.workflow_roles

    formdata = formdef.data_class()()
    dispatch.role_key = '_receiver'
    dispatch.role_id = target_role.id
    dispatch.perform(formdata)
    assert formdata.workflow_roles == {'_receiver': target_role.id}
|
|
|
|
def test_dispatch_auto(pub):
    """Automatic dispatch picks the role of the rule whose value matches the
    configured variable, and sets nothing when no rule matches."""
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='1', label='Test', type='string', varname='foo'),
    ]
    formdef.store()

    dispatch = DispatchWorkflowStatusItem()
    dispatch.role_key = '_receiver'
    dispatch.dispatch_type = 'automatic'

    formdata = formdef.data_class()()
    pub.substitutions.feed(formdata)
    # no variable and no rules yet
    dispatch.perform(formdata)
    assert not formdata.workflow_roles

    Role.wipe()
    first_role = Role('xxx1')
    first_role.store()
    second_role = Role('xxx2')
    second_role.store()

    dispatch.variable = 'form_var_foo'
    dispatch.rules = [
        {'role_id': first_role.id, 'value': 'foo'},
        {'role_id': second_role.id, 'value': 'bar'},
    ]

    # rules are set but the form has no data
    dispatch.perform(formdata)
    assert not formdata.workflow_roles

    # no match
    formdata.data = {'1': 'XXX'}
    dispatch.perform(formdata)
    assert not formdata.workflow_roles

    # match
    formdata.data = {'1': 'foo'}
    dispatch.perform(formdata)
    assert formdata.workflow_roles == {'_receiver': first_role.id}

    # other match
    formdata.data = {'1': 'bar'}
    dispatch.perform(formdata)
    assert formdata.workflow_roles == {'_receiver': second_role.id}
|
|
|
|
def test_roles(pub):
    """The add-role and remove-role actions update the roles stored on the
    user attached to the form."""
    user = pub.user_class()
    user.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    formdata = formdef.data_class()()
    formdata.user_id = user.id

    def stored_roles():
        # always read the user back from storage
        return pub.user_class.get(user.id).roles

    add_role = AddRoleWorkflowStatusItem()

    # no role configured on the action
    add_role.perform(formdata)
    assert not stored_roles()

    add_role.role_id = '1'
    add_role.perform(formdata)
    assert stored_roles() == ['1']

    user.roles = None
    user.store()
    remove_role = RemoveRoleWorkflowStatusItem()

    # no role configured on the action
    remove_role.perform(formdata)
    assert not stored_roles()

    # removing a role from a user that has none
    remove_role.role_id = '1'
    remove_role.perform(formdata)
    assert not stored_roles()

    # removing the only role
    user.roles = ['1']
    user.store()
    remove_role.perform(formdata)
    assert not stored_roles()

    # removing one role out of two
    user.roles = ['2', '1']
    user.store()
    remove_role.perform(formdata)
    assert stored_roles() == ['2']
|
|
|
|
def test_roles_idp(pub):
    """With ``idp-manage-user-attributes`` on, role changes are followed by
    an after-job HTTP call to the identity provider roles API."""
    pub.cfg['sp'] = {'idp-manage-user-attributes': True}
    pub.cfg['idp'] = {'xxx': {'metadata_url': 'http://idp.example.net/idp/saml2/metadata'}}
    pub.write_cfg()
    user = pub.user_class()
    user.name_identifiers = ['xxx']
    user.store()

    role = Role(name='bar1')
    role.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    formdata = formdef.data_class()()
    formdata.user_id = user.id

    def stored_roles():
        # always read the user back from storage
        return pub.user_class.get(user.id).roles

    membership_url = 'http://idp.example.net/api/roles/bar1/members/xxx/'

    add_role = AddRoleWorkflowStatusItem()

    # no role configured: nothing stored, nothing posted
    add_role.perform(formdata)
    assert not stored_roles()
    with mock.patch('wcs.wf.roles.http_post_request') as post_mock:
        post_mock.return_value = (None, 201, '', None)
        get_response().process_after_jobs()
        assert post_mock.call_count == 0

    add_role.role_id = role.id
    add_role.perform(formdata)
    assert stored_roles() == [role.id]
    with mock.patch('wcs.wf.roles.http_post_request') as post_mock:
        post_mock.return_value = (None, 201, '', None)
        get_response().process_after_jobs()
        assert post_mock.call_count == 1
        assert post_mock.call_args[0][0].startswith(membership_url)

    user.roles = None
    user.store()
    remove_role = RemoveRoleWorkflowStatusItem()

    # no role configured: nothing stored, nothing deleted
    remove_role.perform(formdata)
    assert not stored_roles()
    with mock.patch('wcs.wf.roles.http_delete_request') as delete_mock:
        delete_mock.return_value = (None, 200, '', None)
        get_response().process_after_jobs()
        assert delete_mock.call_count == 0

    remove_role.role_id = role.id
    user.roles = [role.id]
    user.store()
    remove_role.perform(formdata)
    assert not stored_roles()
    with mock.patch('wcs.wf.roles.http_delete_request') as delete_mock:
        delete_mock.return_value = (None, 200, '', None)
        get_response().process_after_jobs()
        assert delete_mock.call_count == 1
        assert delete_mock.call_args[0][0].startswith(membership_url)
|
|
|
|
def test_anonymise(pub):
    """After the anonymise action the stored form has no user, no submission
    context, and is flagged as anonymised."""
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.user_id = '1'
    formdata.submission_context = {'foo': 'bar'}
    formdata.store()

    def reloaded():
        # fetch a fresh copy from storage for every check
        return formdef.data_class().get(formdata.id)

    anonymise = AnonymiseWorkflowStatusItem()
    anonymise.perform(formdata)
    assert reloaded().user_id is None
    assert reloaded().anonymised
    assert reloaded().submission_context is None
|
|
|
|
def test_remove(pub):
    """The remove action deletes the form; it returns the site URL normally
    and ``'..'`` plus a session message when ``in_backoffice`` is set."""
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    def is_stored(data):
        return formdef.data_class().has_key(data.id)

    # without the backoffice flag
    formdata = formdef.data_class()()
    formdata.store()

    remove = RemoveWorkflowStatusItem()
    assert is_stored(formdata)
    assert remove.perform(formdata) == 'http://example.net'
    assert not is_stored(formdata)

    # with the backoffice flag set on the response filter
    formdata = formdef.data_class()()
    formdata.store()

    remove = RemoveWorkflowStatusItem()
    current_request = pub.get_request()
    current_request.response.filter['in_backoffice'] = True
    assert is_stored(formdata)
    assert remove.perform(formdata) == '..'
    assert not is_stored(formdata)
    current_request.response.filter = {}
    assert current_request.session.message
|
|
|
|
def test_register_comment(pub):
    """The register-comment action records the rendered comment as the last
    display part of the last evolution."""
    pub.substitutions.feed(MockSubstitutionVariables())

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    commenter = RegisterCommenterWorkflowStatusItem()

    def last_part():
        return formdata.evolution[-1].display_parts()[-1]

    # no comment configured
    commenter.perform(formdata)
    assert last_part() == ''

    # (configured comment, expected rendering)
    cases = [
        ('Hello world', '<p>Hello world</p>'),
        ('<div>Hello world</div>', '<div>Hello world</div>'),
        ('[test]', '<p>[test]</p>'),
        ('[bar]', '<p>Foobar</p>'),
        ('[foo]', '<p>1 < 3</p>'),
        ('<div>[foo]</div>', '<div>1 < 3</div>'),
    ]
    for comment, rendering in cases:
        commenter.comment = comment
        commenter.perform(formdata)
        assert last_part() == rendering
|
|
|
|
def test_email(pub):
    """Check when the send-mail action emits a message and to which
    recipients, for roles, submitter, computed and literal addresses."""
    pub.substitutions.feed(MockSubstitutionVariables())

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    user = pub.user_class(name='foo')
    user.email = 'zorg@localhost'
    user.store()

    Role.wipe()
    single_address_role = Role(name='foo')
    single_address_role.emails = ['foo@localhost']
    single_address_role.store()

    two_address_role = Role(name='bar')
    two_address_role.emails = ['bar@localhost', 'baz@localhost']
    two_address_role.store()

    emails.empty()
    mailer = SendmailWorkflowStatusItem()

    def deliver():
        # run the action, then the jobs queued on the response
        mailer.perform(formdata)
        get_response().process_after_jobs()

    # send using an uncompleted element: nothing goes out
    deliver()
    assert emails.count() == 0

    # recipient only, no subject nor body
    mailer.to = [single_address_role.id]
    deliver()
    assert emails.count() == 0

    # subject but no body
    mailer.subject = 'foobar'
    deliver()
    assert emails.count() == 0

    # send for real
    mailer.body = 'baz'
    deliver()
    assert emails.count() == 1
    assert emails.get('foobar')
    assert emails.get('foobar')['email_rcpt'] == ['foo@localhost']

    # two recipients
    emails.empty()
    mailer.to = [single_address_role.id, two_address_role.id]
    deliver()
    assert emails.count() == 1
    assert emails.get('foobar')['to'] == 'Undisclosed recipients:;'
    assert emails.get('foobar')['email_rcpt'] == [
        'foo@localhost', 'bar@localhost', 'baz@localhost']

    # submitter as recipient, no known email address
    emails.empty()
    mailer.to = ['_submitter']
    deliver()
    assert emails.count() == 0

    # submitter as recipient, known email address
    emails.empty()
    formdata.user_id = user.id
    formdata.store()
    deliver()
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['zorg@localhost']

    # computed recipient
    emails.empty()
    mailer.to = ['=email']
    deliver()
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['sub@localhost']

    # string as recipient
    emails.empty()
    mailer.to = 'xyz@localhost'
    deliver()
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['xyz@localhost']

    # invalid recipient
    emails.empty()
    mailer.to = ['=foobar']
    deliver()
    assert emails.count() == 0

    # empty recipient
    emails.empty()
    mailer.to = ['=None']
    deliver()
    assert emails.count() == 0
|
|
|
|
def test_webservice_call(pub):
    """Exercise ``WebserviceCallStatusItem.perform()`` against the mocked
    HTTP layer: request method and payload, response recording, signature,
    error handling options, attachments and query-string data."""
    # Imported here so the test does not depend on ``json`` leaking in
    # through a wildcard import at the top of the module.
    import json

    pub.substitutions.feed(MockSubstitutionVariables())

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    # default settings: the form is POSTed as JSON
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'POST'
    payload = json.loads(http_requests.get_last('body'))
    assert payload['url'] == 'http://example.net/baz/%s/' % formdata.id
    assert payload['display_id'] == formdata.get_display_id()

    # post disabled and no extra data: plain GET
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = False
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'GET'

    # post disabled but extra data given: only that data is POSTed, and the
    # entry whose expression fails ('error') is left out
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = False
    item.post_data = {'str': 'abcd', 'one': '=1',
                      'evalme': '=form_number', 'error': '=1=3'}
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'POST'
    payload = json.loads(http_requests.get_last('body'))
    assert payload == {'one': 1, 'str': 'abcd', 'evalme': formdata.get_display_id()}

    # post enabled with extra data: the data lands under the 'extra' key
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post_data = {'str': 'abcd', 'one': '=1', 'decimal': '=Decimal(2)',
                      'evalme': '=form_number', 'error': '=1=3'}
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'POST'
    payload = json.loads(http_requests.get_last('body'))
    assert payload['extra'] == {'one': 1, 'str': 'abcd',
                                'decimal': '2', 'evalme': formdata.get_display_id()}
    assert payload['url'] == 'http://example.net/baz/%s/' % formdata.id
    assert payload['display_id'] == formdata.get_display_id()

    # a varname makes status, response and time available in workflow_data
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json'
    item.post = False
    item.varname = 'xxx'
    item.perform(formdata)
    assert formdata.workflow_data['xxx_status'] == 200
    assert formdata.workflow_data['xxx_response'] == {'foo': 'bar'}
    assert formdata.workflow_data.get('xxx_time')
    formdata.workflow_data = None

    # a signature key adds a signature to the URL
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = False
    item.request_signature_key = 'xxx'
    item.perform(formdata)
    assert 'signature=' in http_requests.get_last('url')
    assert http_requests.get_last('method') == 'GET'

    # signature key given as the '[empty]' template: no signature
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = False
    item.request_signature_key = '[empty]'
    item.perform(formdata)
    assert 'signature=' not in http_requests.get_last('url')

    # signature key given as the '[bar]' template: signature present
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = False
    item.request_signature_key = '[bar]'
    item.perform(formdata)
    assert 'signature=' in http_requests.get_last('url')

    # 204 answer: status and time recorded, no response body
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/204'
    item.post = False
    item.varname = 'xxx'
    item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 204
    assert formdata.workflow_data.get('xxx_time')
    assert 'xxx_response' not in formdata.workflow_data
    assert 'xxx_error_response' not in formdata.workflow_data
    formdata.workflow_data = None

    # 404 answer aborts the action; status is still recorded
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404'
    item.post = False
    item.varname = 'xxx'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 404
    assert formdata.workflow_data.get('xxx_time')
    assert 'xxx_error_response' not in formdata.workflow_data
    formdata.workflow_data = None

    # 404 answer with a JSON body: the body is kept as error response
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404-json'
    item.post = False
    item.varname = 'xxx'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 404
    assert formdata.workflow_data.get('xxx_error_response') == {'err': 1}
    assert formdata.workflow_data.get('xxx_time')
    assert 'xxx_response' not in formdata.workflow_data
    formdata.workflow_data = None

    # 404 answer with action_on_4xx=':pass': no abort
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404'
    item.action_on_4xx = ':pass'
    item.post = False
    item.perform(formdata)

    # 500 answer aborts the action
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/500'
    item.post = False
    with pytest.raises(AbortActionException):
        item.perform(formdata)

    # 500 answer with action_on_5xx=':pass': no abort
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/500'
    item.action_on_5xx = ':pass'
    item.post = False
    item.perform(formdata)

    # 500 answer with a status id as action: abort and jump to that status
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/500'
    item.action_on_5xx = '4'  # jump to status
    item.post = False
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.status == 'wf-4'

    # non-JSON (XML) answer without varname: no abort
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.post = False
    item.perform(formdata)

    # non-JSON answer with action_on_bad_data=':stop': abort
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.post = False
    item.varname = 'xxx'
    item.action_on_bad_data = ':stop'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 200
    assert 'xxx_response' not in formdata.workflow_data
    assert 'xxx_error_response' not in formdata.workflow_data
    formdata.workflow_data = None

    # record_errors stores a summary of the HTTP error in the evolution
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404'
    item.post = False
    item.record_errors = True
    item.action_on_4xx = ':stop'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.evolution[-1].parts[-1].summary == '404 whatever'

    # record_errors also stores the JSON decoding error
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.post = False
    item.varname = 'xxx'
    item.action_on_bad_data = ':stop'
    item.record_errors = True
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.evolution[-1].parts[-1].summary == 'ValueError: No JSON object could be decoded\n'
    assert formdata.workflow_data.get('xxx_status') == 200
    assert formdata.workflow_data.get('xxx_time')
    assert 'xxx_error_response' not in formdata.workflow_data
    formdata.workflow_data = None

    # check storing response as attachment
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.post = False
    item.varname = 'xxx'
    item.response_type = 'attachment'
    item.record_errors = True
    item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 200
    assert formdata.workflow_data.get('xxx_content_type') == 'text/xml'
    attachment = formdata.evolution[-1].parts[-1]
    assert isinstance(attachment, AttachmentEvolutionPart)
    assert attachment.base_filename == 'xxx.xml'
    assert attachment.content_type == 'text/xml'
    attachment.fp.seek(0)
    assert attachment.fp.read(5) == '<?xml'
    formdata.workflow_data = None

    # recorded error rendering: empty outside the backoffice, detailed inside
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/400-json'
    item.post = False
    item.record_errors = True
    item.action_on_4xx = ':stop'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    rendered = formdata.evolution[-1].parts[-1].view()
    assert not rendered  # empty if not in backoffice
    req = HTTPRequest(None, {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': '/backoffice/'})
    pub._set_request(req)
    rendered = formdata.evolution[-1].parts[-1].view()
    assert 'Error during webservice call' in str(rendered)
    assert 'Error Code: 1' in str(rendered)
    assert 'Error Description: :(' in str(rendered)

    # the action label, when set, is quoted in the error message
    item.label = 'do that'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    rendered = formdata.evolution[-1].parts[-1].view()
    assert 'Error during webservice call "do that"' in str(rendered)

    # query-string data is appended to the parameters already in the URL;
    # the entry whose expression fails ('error') is left out
    item = WebserviceCallStatusItem()
    item.method = 'GET'
    item.post = False
    item.url = 'http://remote.example.net?in_url=1'
    item.qs_data = {
        'str': 'abcd',
        'one': '=1',
        'evalme': '=form_number',
        'error': '=1=3',
        'in_url': '2',
    }
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('method') == 'GET'
    qs = urlparse.parse_qs(http_requests.get_last('url').split('?')[1])
    assert set(qs.keys()) == set(['in_url', 'str', 'one', 'evalme'])
    assert qs['in_url'] == ['1', '2']
    assert qs['one'] == ['1']
    assert qs['evalme'] == [formdata.get_display_id()]
    assert qs['str'] == ['abcd']
|
|
|
|
|
|
def test_timeout(pub):
    """A form sitting in a status with a 0.1s timeout jump is moved to the
    target status once ``_apply_timeouts()`` runs after the delay."""
    workflow = Workflow(name='timeout')
    waiting = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    timed_jump = JumpWorkflowStatusItem()
    timed_jump.id = '_jump'
    timed_jump.by = ['_submitter', '_receiver']
    timed_jump.timeout = 0.1
    timed_jump.status = 'st2'
    waiting.items.append(timed_jump)
    timed_jump.parent = waiting

    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    assert formdef.get_workflow().id == workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    stored_id = formdata.id

    # wait longer than the jump timeout
    time.sleep(0.3)
    _apply_timeouts(pub)

    reloaded = formdef.data_class().get(stored_id)
    assert reloaded.status == 'wf-st2'
|
|
|
|
def test_timeout_then_remove(two_pubs):
    """A timeout jump into a status holding a remove action leaves the form
    absent from storage after ``_apply_timeouts()``."""
    workflow = Workflow(name='timeout-then-remove')
    waiting = workflow.add_status('Status1', 'st1')
    removal = workflow.add_status('Status2', 'st2')

    timed_jump = JumpWorkflowStatusItem()
    timed_jump.id = '_jump'
    timed_jump.by = ['_submitter', '_receiver']
    timed_jump.timeout = 0.1
    timed_jump.status = 'st2'
    waiting.items.append(timed_jump)
    timed_jump.parent = waiting

    remove_action = RemoveWorkflowStatusItem()
    removal.items.append(remove_action)
    remove_action.parent = removal

    workflow.store()

    formdef = FormDef()
    # the name is derived from the publisher object's id()
    formdef.name = 'baz%s' % id(two_pubs)
    formdef.fields = []
    formdef.workflow_id = workflow.id
    assert formdef.get_workflow().id == workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    stored_id = formdata.id

    def stored_ids():
        # ids are compared as strings
        return [str(key) for key in formdef.data_class().keys()]

    assert str(stored_id) in stored_ids()

    # wait longer than the jump timeout
    time.sleep(0.2)
    _apply_timeouts(two_pubs)

    assert str(stored_id) not in stored_ids()
|
|
|
|
def test_sms(pub):
    """The send-SMS action passes the body and the recipients to the mocked
    backend, leaving ``None`` recipients out."""
    pub.cfg['sms'] = {'mode': 'xxx'}
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    sender = SendSMSWorkflowStatusItem()
    sender.to = ['000']
    sender.body = 'XXX'
    sender.perform(formdata)
    last_sms = sms_mocking.sms[-1]
    assert last_sms['destinations'] == ['000']
    assert last_sms['text'] == 'XXX'
    sms_mocking.empty()

    # check None as recipient is not passed to the SMS backend
    sender.to = [None]
    sender.perform(formdata)
    assert len(sms_mocking.sms) == 0

    # a None mixed with a real number: only the number is kept
    sender.to = ['000', None]
    sender.perform(formdata)
    last_sms = sms_mocking.sms[-1]
    assert last_sms['destinations'] == ['000']
    assert last_sms['text'] == 'XXX'
|
|
|
|
|
|
def test_display_form(two_pubs):
    """Check a workflow form action fills, submits and exposes its fields.

    The action holds a string field and a date field (varname ``date``).
    After submission the date must be available as the ``xxx_var_date``
    substitution variable, formatted according to the configured language,
    and as ``xxx_var_date_raw`` as a time tuple.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')

    display_form = FormWorkflowStatusItem()
    display_form.id = '_x'
    display_form.varname = 'xxx'
    display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
    display_form.formdef.fields.append(
        StringField(id='1', label='Test', type='string'))
    display_form.formdef.fields.append(
        DateField(id='2', label='Date', type='date', varname='date'))
    st1.items.append(display_form)
    display_form.parent = st1

    form = Form(action='#')
    display_form.fill_form(form, formdata, None)
    assert form.widgets[0].title == 'Test'
    assert form.widgets[1].title == 'Date'

    two_pubs.get_request().form = {'f1': 'Foobar', 'f2': '2015-05-12', 'submit': 'submit'}
    display_form.submit_form(form, formdata, None, None)

    assert formdata.get_substitution_variables()['xxx_var_date'] == '2015-05-12'

    # Same scenario with the site language switched to French.  The language
    # is restored in a ``finally`` clause so that a failing assertion cannot
    # leave the publisher configured in French for the tests that follow.
    two_pubs.cfg['language'] = {'language': 'fr'}
    try:
        formdata = formdef.data_class()()
        formdata.just_created()
        formdata.store()

        form = Form(action='#')
        display_form.fill_form(form, formdata, None)
        two_pubs.get_request().form = {'f1': 'Foobar', 'f2': '12/05/2015', 'submit': 'submit'}
        display_form.submit_form(form, formdata, None, None)
        assert formdata.get_substitution_variables()['xxx_var_date'] == '12/05/2015'

        assert formdata.get_substitution_variables()['xxx_var_date_raw'] == \
            time.strptime('2015-05-12', '%Y-%m-%d')
    finally:
        two_pubs.cfg['language'] = {'language': 'en'}
|
|
|
|
def test_workflow_role_type_migration(pub):
    """Integer role ids stored in a jump's ``by`` come back as strings on reload."""
    wf = Workflow(name='role migration')
    status = wf.add_status('Status1', 'st1')

    jump_item = JumpWorkflowStatusItem()
    jump_item.id = '_jump'
    jump_item.by = [1, 2]  # deliberately integers, not strings
    status.items.append(jump_item)
    jump_item.parent = status

    wf.store()

    reloaded = Workflow.get(wf.id)
    reloaded_jump = reloaded.possible_status[0].items[0]
    assert reloaded_jump.by == ['1', '2']
|
|
|
|
def test_workflow_display_message(pub):
    """Check variable substitution in the text of a display-message action."""
    pub.substitutions.feed(MockSubstitutionVariables())

    wf = Workflow(name='display message')
    status = wf.add_status('Status1', 'st1')

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = []
    formdef.store()
    formdata = formdef.data_class()()
    formdata.id = '1'

    message_item = DisplayMessageWorkflowStatusItem()
    message_item.parent = status

    # (message template, expected rendering) pairs, checked in this order
    expectations = [
        ('test', 'test'),
        ('[number]', str(formdata.id)),
        ('[bar]', 'Foobar'),
        # makes sure the string is correctly escaped for HTML
        # NOTE(review): the expected value below contains a raw '<'; if HTML
        # escaping is really what is being checked this should presumably be
        # an entity-escaped string -- confirm against MockSubstitutionVariables.
        ('[foo]', '1 < 3'),
    ]
    for template, rendered in expectations:
        message_item.message = template
        assert message_item.get_message(formdata) == rendered
|
|
|
|
def test_workflow_display_message_to(pub):
    """Check the ``to`` attribute of display-message actions filters viewers.

    The same message is checked both through ``get_message()`` on the action
    and through ``get_workflow_messages()`` on the formdata, while varying
    the request user, its roles and the formdata submitter.
    """
    wf = Workflow(name='display message to')
    status = wf.add_status('Status1', 'st1')

    allowed_role = Role(name='foorole')
    allowed_role.store()
    unrelated_role = Role(name='no-one-role')
    unrelated_role.store()
    viewer = pub.user_class(name='baruser')
    viewer.roles = []
    viewer.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.url_name = 'foobar'
    formdef._workflow = wf

    formdata = formdef.data_class()()
    formdata.status = 'wf-st1'

    first_message = DisplayMessageWorkflowStatusItem()
    first_message.parent = status
    status.items.append(first_message)

    def assert_visible(text):
        assert first_message.get_message(formdata) == text
        assert formdata.get_workflow_messages() == [text]

    def assert_hidden():
        assert first_message.get_message(formdata) == ''
        assert formdata.get_workflow_messages() == []

    # no restriction
    first_message.message = 'all'
    first_message.to = None
    assert_visible('all')

    # restricted to a role, no user attached to the request yet
    first_message.message = 'to-role'
    first_message.to = [allowed_role.id]
    assert_hidden()

    # request user without the role, then with it
    pub._request.user = viewer
    first_message.message = 'to-role'
    first_message.to = [allowed_role.id]
    assert_hidden()
    viewer.roles = [allowed_role.id]
    assert_visible('to-role')

    # restricted to the submitter
    viewer.roles = []
    first_message.message = 'to-submitter'
    first_message.to = ['_submitter']
    assert_hidden()
    formdata.user_id = viewer.id
    assert_visible('to-submitter')

    # restricted to a role or the submitter: either one is enough
    first_message.message = 'to-role-or-submitter'
    first_message.to = [allowed_role.id, '_submitter']
    assert_visible('to-role-or-submitter')
    formdata.user_id = None
    assert_hidden()
    viewer.roles = [allowed_role.id]
    assert_visible('to-role-or-submitter')
    formdata.user_id = viewer.id
    assert_visible('to-role-or-submitter')

    # restricted to a role the user does not have
    first_message.to = [unrelated_role.id]
    assert_hidden()

    # two messages in the same status, each with its own restriction
    first_message.message = 'd1'
    second_message = DisplayMessageWorkflowStatusItem()
    second_message.parent = status
    status.items.append(second_message)
    second_message.message = 'd2'
    second_message.to = [allowed_role.id, '_submitter']
    assert formdata.get_workflow_messages() == ['d2']
    viewer.roles = [allowed_role.id, unrelated_role.id]
    assert 'd1' in formdata.get_workflow_messages()
    assert 'd2' in formdata.get_workflow_messages()
|
|
|
|
def test_workflow_roles(pub):
    """Check workflow functions are resolved to the roles mapped on the formdef.

    A mail sent to the ``_receiver`` and ``_other`` functions must reach the
    addresses of both mapped roles, and the functions must be exposed as
    ``form_role_<slug>_name`` substitution variables (dashes in the slug
    becoming underscores).
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    user = pub.user_class(name='foo')
    user.email = 'zorg@localhost'
    user.store()

    Role.wipe()
    receiver_role = Role(name='foo')
    receiver_role.emails = ['foo@localhost']
    receiver_role.store()

    other_role = Role(name='bar')
    other_role.emails = ['bar@localhost', 'baz@localhost']
    other_role.store()

    wf = Workflow(name='wf roles')
    status = wf.add_status('Status1', 'st1')
    mail_action = SendmailWorkflowStatusItem()
    mail_action.to = ['_receiver', '_other']
    mail_action.subject = 'Foobar'
    mail_action.body = 'Hello'
    status.items.append(mail_action)
    mail_action.parent = status
    wf.roles['_other'] = 'Other Function'
    wf.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_roles = {'_receiver': receiver_role.id, '_other': other_role.id}
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    emails.empty()
    mail_action.perform(formdata)
    get_response().process_after_jobs()
    assert emails.count() == 1
    sent_mail = emails.get('Foobar')
    assert sent_mail
    expected_recipients = set(['foo@localhost', 'bar@localhost', 'baz@localhost'])
    assert set(sent_mail['email_rcpt']) == expected_recipients

    # a function whose key contains dashes
    wf.roles['_slug-with-dash'] = 'Dashed Function'
    wf.store()
    formdef.workflow_roles['_slug-with-dash'] = receiver_role.id
    formdef.store()
    variables = formdata.get_substitution_variables()
    assert variables.get('form_role_other_name') == 'bar'
    assert variables.get('form_role_slug_with_dash_name') == 'foo'
|
|
|
|
def test_criticality(pub):
    """Check the modify-criticality action in its default, INC, DEC and SET modes.

    The workflow declares three levels (green, yellow, red); the level name
    reported by the formdata is checked after every ``perform()``.
    """
    FormDef.wipe()

    wf = Workflow(name='criticality')
    wf.criticality_levels = [
        WorkflowCriticalityLevel(name='green'),
        WorkflowCriticalityLevel(name='yellow'),
        WorkflowCriticalityLevel(name='red'),
    ]
    wf.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.workflow_id = wf.id
    formdef.store()

    action = ModifyCriticalityWorkflowStatusItem()

    def perform_and_expect(target, level_names):
        # run the action once per expected level name, checking after each run
        for level_name in level_names:
            action.perform(target)
            assert target.get_criticality_level_object().name == level_name

    # action left with its default settings, on a fresh formdata
    formdata = formdef.data_class()()
    perform_and_expect(formdata, ['yellow'])

    # increments stop at the last level
    formdata = formdef.data_class()()
    action.mode = MODE_INC
    perform_and_expect(formdata, ['yellow', 'red', 'red'])

    # decrements stop at the first level
    action.mode = MODE_DEC
    perform_and_expect(formdata, ['yellow', 'green', 'green'])

    # absolute values select a level by index
    action.mode = MODE_SET
    action.absolute_value = 2
    perform_and_expect(formdata, ['red'])
    action.absolute_value = 0
    perform_and_expect(formdata, ['green'])
|
|
|
|
def test_geolocate_address(pub):
    """Check the geolocate action in ``address_string`` mode.

    ``wcs.wf.geolocate.http_get_page`` is patched so no real request is made.
    Besides the nominal case, an invalid template, a 500 reply and an empty
    result set must all leave ``formdata.geolocations`` as an empty dict.
    """
    patch_target = 'wcs.wf.geolocate.http_get_page'
    one_hit = [{'lat': '48.8337085', 'lon': '2.3233693'}]

    def geocoder_reply(status_code, results):
        # same 4-tuple shape as the original test: (None, status, body, None)
        return (None, status_code, json.dumps(results), None)

    formdef = FormDef()
    formdef.geolocations = {'base': 'bla'}
    formdef.name = 'baz'
    formdef.fields = [
        StringField(id='1', label='String', type='string', varname='string'),
    ]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'1': '169 rue du chateau'}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    geolocate = GeolocateWorkflowStatusItem()
    geolocate.method = 'address_string'
    geolocate.address_string = '[form_var_string], paris, france'

    with mock.patch(patch_target) as http_get_page:
        http_get_page.return_value = geocoder_reply(200, one_hit)
        geolocate.perform(formdata)
        requested_url = http_get_page.call_args[0][0]
        assert urllib2.quote('169 rue du chateau, paris') in requested_url
        position = formdata.geolocations['base']
        assert int(position['lat']) == 48
        assert int(position['lon']) == 2

    # check for invalid ezt
    geolocate.address_string = '[if-any], paris, france'
    formdata.geolocations = None
    geolocate.perform(formdata)
    assert formdata.geolocations == {}

    # check for nominatim server error
    formdata.geolocations = None
    with mock.patch(patch_target) as http_get_page:
        http_get_page.return_value = geocoder_reply(500, one_hit)
        geolocate.perform(formdata)
        assert formdata.geolocations == {}

    # check for nominatim returning an empty result set
    formdata.geolocations = None
    with mock.patch(patch_target) as http_get_page:
        http_get_page.return_value = geocoder_reply(200, [])
        geolocate.perform(formdata)
        assert formdata.geolocations == {}
|
|
|
|
def test_geolocate_image(pub):
    """Check the geolocate action in ``photo_variable`` mode.

    A JPEG fixture is uploaded in a file field and the action must fill the
    ``base`` geolocation from it.  A failing expression, a value of the wrong
    type and a non-image upload must each leave ``formdata.geolocations`` as
    an empty dict.
    """
    def read_fixture(filename):
        # Fixtures are binary files (JPEG, ODT): read them in binary mode and
        # close the handle right away instead of leaking it.
        path = os.path.join(os.path.dirname(__file__), filename)
        with open(path, 'rb') as fd:
            return fd.read()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.geolocations = {'base': 'bla'}
    formdef.fields = [
        FileField(id='3', label='File', type='file', varname='file'),
    ]
    formdef.store()

    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    upload.receive([read_fixture('image-with-gps-data.jpeg')])

    formdata = formdef.data_class()()
    formdata.data = {'3': upload}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    item = GeolocateWorkflowStatusItem()
    item.method = 'photo_variable'

    item.photo_variable = '=form_var_file_raw'
    item.perform(formdata)
    assert int(formdata.geolocations['base']['lat']) == -1
    assert int(formdata.geolocations['base']['lon']) == 6

    # invalid expression
    formdata.geolocations = None
    item.photo_variable = '=1/0'
    item.perform(formdata)
    assert formdata.geolocations == {}

    # invalid type
    formdata.geolocations = None
    item.photo_variable = '="bla"'
    item.perform(formdata)
    assert formdata.geolocations == {}

    # invalid photo
    # NOTE(review): photo_variable is still '="bla"' at this point, so this
    # case presumably repeats the "invalid type" path rather than feeding the
    # non-image upload to the action -- confirm whether it should be reset to
    # '=form_var_file_raw' first.
    upload = PicklableUpload('test.jpeg', 'image/jpeg')
    upload.receive([read_fixture('template.odt')])
    formdata.data = {'3': upload}
    formdata.geolocations = None
    item.perform(formdata)
    assert formdata.geolocations == {}
|
|
|
|
def test_geolocate_map(pub):
    """Check the geolocate action in ``map_variable`` mode.

    The position comes from a map field value; an expression that cannot be
    used must leave ``formdata.geolocations`` as an empty dict.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.geolocations = {'base': 'bla'}
    formdef.fields = [
        MapField(id='2', label='Map', type='map', varname='map'),
    ]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'2': '48.8337085;2.3233693'}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    geolocate = GeolocateWorkflowStatusItem()
    geolocate.method = 'map_variable'
    geolocate.map_variable = '=form_var_map'

    geolocate.perform(formdata)
    position = formdata.geolocations['base']
    assert int(position['lat']) == 48
    assert int(position['lon']) == 2

    # invalid data
    formdata.geolocations = None
    geolocate.map_variable = '=form_var'
    geolocate.perform(formdata)
    assert formdata.geolocations == {}
|
|
|
|
def test_geolocate_overwrite(pub):
    """Check the ``overwrite`` attribute of the geolocate action.

    With the attribute left untouched a second ``perform()`` replaces the
    stored position; once it is set to False the stored position is kept
    even though the map field value changed.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.geolocations = {'base': 'bla'}
    formdef.fields = [
        MapField(id='2', label='Map', type='map', varname='map'),
    ]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'2': '48.8337085;2.3233693'}
    formdata.just_created()
    formdata.store()
    pub.substitutions.feed(formdata)

    geolocate = GeolocateWorkflowStatusItem()
    geolocate.method = 'map_variable'
    geolocate.map_variable = '=form_var_map'

    def assert_position(lat, lon):
        # only the integer part of each coordinate is compared
        position = formdata.geolocations['base']
        assert int(position['lat']) == lat
        assert int(position['lon']) == lon

    geolocate.perform(formdata)
    assert_position(48, 2)

    # new longitude, position gets replaced
    formdata.data = {'2': '48.8337085;3.3233693'}
    geolocate.perform(formdata)
    assert_position(48, 3)

    # new longitude again but overwriting is disabled: previous position stays
    formdata.data = {'2': '48.8337085;4.3233693'}
    geolocate.overwrite = False
    geolocate.perform(formdata)
    assert_position(48, 3)
|
|
|
|
@pytest.mark.skipif(transform_to_pdf is None, reason='libreoffice not found')
def test_transform_to_pdf():
    """Convert the ODT fixture and check the result starts with a PDF header.

    Skipped when ``transform_to_pdf`` is None.
    """
    template_path = os.path.join(os.path.dirname(__file__), 'template.odt')
    # ODT files are binary: open in binary mode, and use a context manager so
    # the input handle is closed even when an assertion fails.
    with open(template_path, 'rb') as instream:
        outstream = transform_to_pdf(instream)
        assert outstream is not False
        assert outstream.read(10).startswith('%PDF-')
|
|
|
|
def test_global_timeouts(pub):
    """Check timeout triggers of global workflow actions.

    A global action holding a modify_criticality item is attached to a
    timeout trigger.  The trigger anchor is switched between 'creation',
    '1st-arrival', 'latest-arrival' and 'python'; after each call to
    ``pub.apply_global_action_timeouts()`` the criticality level name of the
    stored formdata tells whether the action was run.

    The in-memory formdata is never modified by the action, so storing it
    again puts the stored copy back to its initial (green) level.
    """
    day = 86400

    FormDef.wipe()
    Workflow.wipe()

    wf = Workflow(name='global-timeouts')
    wf.possible_status = Workflow.get_default_workflow().possible_status[:]
    wf.criticality_levels = [
        WorkflowCriticalityLevel(name='green'),
        WorkflowCriticalityLevel(name='yellow'),
        WorkflowCriticalityLevel(name='red'),
    ]
    global_action = wf.add_global_action('Timeout Test')
    global_action.append_item('modify_criticality')
    trigger = global_action.append_trigger('timeout')
    trigger.anchor = 'creation'
    wf.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = wf.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    def days_ago(count):
        return time.localtime(time.time() - count * day)

    def apply_and_expect(level_name):
        # the level is always read from a freshly loaded copy of the formdata
        pub.apply_global_action_timeouts()
        stored = formdef.data_class().get(formdata.id)
        assert stored.get_criticality_level_object().name == level_name

    # delay isn't set yet, no crash
    apply_and_expect('green')

    # delay didn't expire yet, no change
    trigger.timeout = '2'
    wf.store()

    apply_and_expect('green')

    formdata.receipt_time = days_ago(3)
    formdata.store()
    apply_and_expect('yellow')

    # make sure it's not triggered a second time
    apply_and_expect('yellow')

    # change id so it's triggered again
    trigger.id = 'XXX1'
    wf.store()
    apply_and_expect('red')
    apply_and_expect('red')

    # reset formdata to initial state
    formdata.store()

    trigger.anchor = '1st-arrival'
    trigger.anchor_status_first = None
    wf.store()

    apply_and_expect('green')

    formdata.evolution[-1].time = days_ago(3)
    formdata.store()
    apply_and_expect('yellow')

    formdata.store()  # reset
    trigger.anchor = 'latest-arrival'
    trigger.anchor_status_latest = None
    wf.store()

    formdata.evolution[-1].time = time.localtime()
    formdata.store()
    formdata.jump_status('new')
    formdata.evolution[-1].time = days_ago(7)
    formdata.jump_status('accepted')
    formdata.jump_status('new')
    formdata.evolution[-1].time = days_ago(1)

    apply_and_expect('green')

    formdata.evolution[-1].time = days_ago(4)
    formdata.store()
    apply_and_expect('yellow')
    formdata.store()

    # limit trigger to formdata with "accepted" status
    trigger.anchor_status_latest = 'wf-accepted'
    wf.store()
    apply_and_expect('green')
    formdata.store()

    # limit trigger to formdata with "new" status
    trigger.anchor_status_latest = 'wf-new'
    wf.store()
    apply_and_expect('yellow')
    formdata.store()

    # use python expression as anchor
    # timestamp
    trigger.anchor = 'python'
    trigger.anchor_expression = repr(time.time())
    wf.store()
    apply_and_expect('green')

    trigger.anchor = 'python'
    trigger.anchor_expression = repr(time.time() - 10 * day)
    wf.store()
    apply_and_expect('yellow')
    formdata.store()

    # datetime object
    ten_days_ago = datetime.datetime.now() - datetime.timedelta(days=10)
    trigger.anchor = 'python'
    trigger.anchor_expression = (
        'datetime.datetime(%s, %s, %s, %s, %s)' % ten_days_ago.timetuple()[:5])
    wf.store()
    apply_and_expect('yellow')
    formdata.store()

    # string object
    ten_days_ago = datetime.datetime.now() - datetime.timedelta(days=10)
    trigger.anchor = 'python'
    trigger.anchor_expression = '"%04d-%02d-%02d"' % ten_days_ago.timetuple()[:3]
    wf.store()
    apply_and_expect('yellow')
    formdata.store()

    # invalid variable
    trigger.anchor = 'python'
    trigger.anchor_expression = 'Ellipsis'
    wf.store()
    apply_and_expect('green')
    formdata.store()

    # invalid expression
    trigger.anchor = 'python'
    trigger.anchor_expression = 'XXX'
    wf.store()
    apply_and_expect('green')
    formdata.store()
|
|
|
|
def test_profile(pub):
|
|
user = pub.user_class()
|
|
user.store()
|
|
|
|
formdef = FormDef()
|
|
formdef.name = 'baz'
|
|
formdef.fields = [
|
|
StringField(id='1', label='Test', type='string', varname='foo'),
|
|
]
|
|
formdef.store()
|
|
|
|
formdata = formdef.data_class()()
|
|
formdata.user_id = user.id
|
|
formdata.data = {'1': 'bar@localhost'}
|
|
|
|
item = UpdateUserProfileStatusItem()
|
|
item.fields = [{'field_id': '__email', 'value': '=form_var_foo'}]
|
|
item.perform(formdata)
|
|
assert pub.user_class.get(user.id).email == 'bar@localhost'
|
|
|
|
formdata.data = {'1': 'Plop'}
|
|
item.fields = [{'field_id': '__name', 'value': '=form_var_foo'}]
|
|
item.perform(formdata)
|
|
assert pub.user_class.get(user.id).name == 'Plop'
|
|
|
|
from wcs.admin.settings import UserFieldsFormDef
|
|
formdef = UserFieldsFormDef(pub)
|
|
formdef.fields = [StringField(id='3', label='test', type='string', varname='plop')]
|
|
formdef.store()
|
|
|
|
item.fields = [{'field_id': 'plop', 'value': '=form_var_foo'}]
|
|
item.perform(formdata)
|
|
assert pub.user_class.get(user.id).form_data == {'3': 'Plop'}
|