# wcs/tests/test_workflows.py
#
# Listing metadata: 775 lines, 23 KiB, Python

import datetime
import pytest
import shutil
import time
from quixote import cleanup, get_response
from wcs.qommon.http_request import HTTPRequest
from qommon.form import *
from wcs.formdef import FormDef
from wcs import sessions
from wcs.fields import StringField, DateField
from wcs.roles import Role
from wcs.workflows import (Workflow, WorkflowStatusItem,
SendmailWorkflowStatusItem, SendSMSWorkflowStatusItem,
DisplayMessageWorkflowStatusItem,
AbortActionException)
from wcs.wf.anonymise import AnonymiseWorkflowStatusItem
from wcs.wf.dispatch import DispatchWorkflowStatusItem
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
from wcs.wf.jump import JumpWorkflowStatusItem, _apply_timeouts
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
from wcs.wf.remove import RemoveWorkflowStatusItem
from wcs.wf.roles import AddRoleWorkflowStatusItem, RemoveRoleWorkflowStatusItem
from wcs.wf.wscall import WebserviceCallStatusItem
from utilities import (create_temporary_pub, MockSubstitutionVariables, emails,
http_requests, clean_temporary_pub, sms_mocking)
def setup_module(module):
    """Call quixote's ``cleanup()`` before the tests of this module run."""
    cleanup()
def teardown_module(module):
    """Call ``clean_temporary_pub()`` once the tests of this module are done."""
    clean_temporary_pub()
def pytest_generate_tests(metafunc):
    """Parametrize tests requesting ``two_pubs`` with both storage modes.

    Tests that do not list ``two_pubs`` among their fixtures are left alone.
    The parameter is passed indirectly, i.e. handed to the fixture itself.
    """
    if 'two_pubs' not in metafunc.fixturenames:
        return
    storage_modes = ['pickle', 'sql']
    metafunc.parametrize('two_pubs', storage_modes, indirect=True)
@pytest.fixture
def pub(request):
    """Return a temporary publisher set to English with a request attached.

    The request is built for ``example.net`` with an empty script name, has
    no user, an empty response filter and a ``BasicSession`` with id 1.
    """
    publisher = create_temporary_pub()
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()

    environ = {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''}
    http_request = HTTPRequest(None, environ)
    http_request.response.filter = {}
    http_request.user = None
    publisher._set_request(http_request)
    # the session is attached only once the request is known to the publisher
    http_request.session = sessions.BasicSession(id=1)
    return publisher
@pytest.fixture
def two_pubs(request):
    """Return a temporary publisher in pickle or SQL mode, set to English.

    The mode comes from ``request.param`` (``'sql'`` turns SQL mode on, any
    other value leaves it off).  The attached request mirrors the one of the
    ``pub`` fixture: ``example.net``, no user, empty response filter and a
    ``BasicSession`` with id 1.
    """
    use_sql = (request.param == 'sql')
    publisher = create_temporary_pub(sql_mode=use_sql)
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()

    environ = {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''}
    http_request = HTTPRequest(None, environ)
    http_request.response.filter = {}
    http_request.user = None
    publisher._set_request(http_request)
    # the session is attached only once the request is known to the publisher
    http_request.session = sessions.BasicSession(id=1)
    return publisher
def test_jump_nothing(pub):
    """A jump item left with its default settings must report it can jump."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()

    form_class = formdef.data_class()
    formdata = form_class()

    jump = JumpWorkflowStatusItem()
    assert jump.must_jump(formdata) is True
def test_jump_datetime_condition(pub):
    """Check jump conditions comparing the current time to a fixed date.

    The condition "now is later than <date>" must hold for yesterday's date
    and must not hold for tomorrow's.
    """
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()
    form_class = formdef.data_class()
    formdata = form_class()
    item = JumpWorkflowStatusItem()

    # filled with the (year, month, day) part of a time tuple
    condition_template = 'datetime.datetime.now() > datetime.datetime(%s, %s, %s)'
    one_day = datetime.timedelta(days=1)

    yesterday = datetime.datetime.now() - one_day
    item.condition = condition_template % yesterday.timetuple()[:3]
    assert item.must_jump(formdata) is True

    tomorrow = datetime.datetime.now() + one_day
    item.condition = condition_template % tomorrow.timetuple()[:3]
    assert item.must_jump(formdata) is False
def test_jump_count_condition(pub):
    """Check a jump condition based on ``form_objects.count``.

    With no stored formdata the "fewer than 2" condition holds; after ten
    have been stored it must not hold anymore.
    """
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()
    # makes the formdef available to condition evaluation
    pub.substitutions.feed(formdef)

    count_condition = 'form_objects.count < 2'
    formdata = formdef.data_class()()
    item = JumpWorkflowStatusItem()
    item.condition = count_condition
    assert item.must_jump(formdata) is True

    for _ in range(10):
        formdata = formdef.data_class()()
        formdata.store()

    item.condition = count_condition
    assert item.must_jump(formdata) is False
def test_check_auth(pub):
    """Exercise ``WorkflowStatusItem.check_auth`` for various ``by`` values.

    Covers the untouched default, an empty list, ``logged-users``, a role
    given as ``role.id`` and as ``int(role.id)`` (before and after the user
    gets the role), ``_submitter`` and ``_receiver``.
    """
    user = pub.user_class(name='foo')
    user.store()
    role = Role(name='bar1')
    role.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()
    formdata = formdef.data_class()()

    status_item = WorkflowStatusItem()

    def is_allowed():
        return status_item.check_auth(formdata, user)

    # ``by`` left untouched
    assert is_allowed() is True

    # explicit empty list
    status_item.by = []
    assert is_allowed() is False

    status_item.by = ['logged-users']
    assert is_allowed() is True

    # the user does not hold the role yet
    status_item.by = [role.id]
    assert is_allowed() is False
    status_item.by = [int(role.id)]
    assert is_allowed() is False

    # once the role is given, both id forms must match
    user.roles = [role.id]
    status_item.by = [role.id]
    assert is_allowed() is True
    status_item.by = [int(role.id)]
    assert is_allowed() is True

    # submitter: depends on the formdata being attached to the user
    status_item.by = ['_submitter']
    assert is_allowed() is False
    formdata.user_id = user.id
    assert is_allowed() is True
    formdata.user_id = None

    # receiver function: set on the formdata first, then on the formdef only
    status_item.by = ['_receiver']
    assert is_allowed() is False
    formdata.workflow_roles = {'_receiver': user.id}
    assert is_allowed() is True

    formdef.workflow_roles = {'_receiver': user.id}
    formdata.workflow_roles = None
    assert is_allowed() is True
def test_dispatch(pub):
    """Check ``DispatchWorkflowStatusItem`` fills ``formdata.workflow_roles``.

    An unconfigured item must leave the roles empty; with a role key and a
    role id set, performing it must record that mapping on the formdata.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    dispatch = DispatchWorkflowStatusItem()

    # nothing configured on the item
    formdata = formdef.data_class()()
    dispatch.perform(formdata)
    assert not formdata.workflow_roles

    # configured item, fresh formdata
    formdata = formdef.data_class()()
    dispatch.role_key = '_receiver'
    dispatch.role_id = '1'
    dispatch.perform(formdata)
    assert formdata.workflow_roles == {'_receiver': '1'}
def test_roles(pub):
    """Check the add-role and remove-role workflow items on the form's user.

    Roles are always read back from storage (``pub.user_class.get``), not
    from the local ``user`` object.
    """
    user = pub.user_class()
    user.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()
    formdata = formdef.data_class()()
    formdata.user_id = user.id

    def stored_roles():
        return pub.user_class.get(user.id).roles

    # adding: without a role id nothing is added, with one it is
    item = AddRoleWorkflowStatusItem()
    item.perform(formdata)
    assert not stored_roles()
    item.role_id = '1'
    item.perform(formdata)
    assert stored_roles() == ['1']

    # removing: start again from a user without roles
    user.roles = None
    user.store()
    item = RemoveRoleWorkflowStatusItem()
    item.perform(formdata)
    assert not stored_roles()
    item.role_id = '1'
    item.perform(formdata)
    assert not stored_roles()

    # the only role is removed
    user.roles = ['1']
    user.store()
    item.perform(formdata)
    assert not stored_roles()

    # only the targeted role is removed, the other one stays
    user.roles = ['2', '1']
    user.store()
    item.perform(formdata)
    assert stored_roles() == ['2']
def test_anonymise(pub):
    """Check ``AnonymiseWorkflowStatusItem`` detaches the user from a formdata.

    After the item is performed, the stored formdata must have no user id
    and a truthy ``anonymised`` attribute.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.user_id = '1'
    formdata.store()

    def reload_formdata():
        return formdef.data_class().get(formdata.id)

    anonymise = AnonymiseWorkflowStatusItem()
    anonymise.perform(formdata)

    assert reload_formdata().user_id is None
    assert reload_formdata().anonymised
def test_remove(pub):
    """Check ``RemoveWorkflowStatusItem`` deletes the formdata it is run on.

    Two situations are covered:

    * default request: ``perform`` must return ``'http://example.net'``;
    * request flagged ``in_backoffice`` in its response filter: ``perform``
      must return ``'..'`` and a session message must be set.

    In both cases the formdata must be gone from storage afterwards.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    # default (non-backoffice) request
    formdata = formdef.data_class()()
    formdata.store()
    item = RemoveWorkflowStatusItem()
    assert formdef.data_class().has_key(formdata.id)
    assert item.perform(formdata) == 'http://example.net'
    assert not formdef.data_class().has_key(formdata.id)

    # request flagged as coming from the backoffice
    formdata = formdef.data_class()()
    formdata.store()
    item = RemoveWorkflowStatusItem()
    req = pub.get_request()
    req.response.filter['in_backoffice'] = True
    try:
        assert formdef.data_class().has_key(formdata.id)
        assert item.perform(formdata) == '..'
        assert not formdef.data_class().has_key(formdata.id)
    finally:
        # reset the filter even when an assertion above fails, so the
        # backoffice flag never outlives this section
        req.response.filter = {}
    assert req.session.message
def test_register_comment(pub):
    """Check how ``RegisterCommenterWorkflowStatusItem`` renders its comment.

    The rendered comment is read from the last display part of the last
    evolution entry.  Plain text is expected wrapped in ``<p>``, markup
    starting with ``<div>`` is kept as is, and bracketed names are replaced
    when known (values presumably come from ``MockSubstitutionVariables`` --
    TODO confirm) with HTML escaping applied.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    def last_rendered_part():
        return formdata.evolution[-1].display_parts()[-1]

    item = RegisterCommenterWorkflowStatusItem()

    # no comment configured at all
    item.perform(formdata)
    assert last_rendered_part() == ''

    # (configured comment, expected rendering), run in this order
    expectations = [
        ('Hello world', '<p>Hello world</p>'),
        ('<div>Hello world</div>', '<div>Hello world</div>'),
        ('[test]', '<p>[test]</p>'),
        ('[bar]', '<p>Foobar</p>'),
        ('[foo]', '<p>1 &lt; 3</p>'),
        ('<div>[foo]</div>', '<div>1 &lt; 3</div>'),
    ]
    for comment, rendering in expectations:
        item.comment = comment
        item.perform(formdata)
        assert last_rendered_part() == rendering
def test_email(pub):
    """Check ``SendmailWorkflowStatusItem`` with several kinds of recipients.

    Mails are only expected once recipient, subject and body are all set.
    Recipients covered: one role, two roles, the submitter (without and with
    a known address), a computed ``=email`` expression and a plain string.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    user = pub.user_class(name='foo')
    user.email = 'zorg@localhost'
    user.store()

    Role.wipe()
    role1 = Role(name='foo')
    role1.emails = ['foo@localhost']
    role1.store()
    role2 = Role(name='bar')
    role2.emails = ['bar@localhost', 'baz@localhost']
    role2.store()

    emails.empty()

    item = SendmailWorkflowStatusItem()

    def send():
        # perform the action, then run the jobs queued on the response
        item.perform(formdata)
        get_response().process_after_jobs()

    # incomplete item: nothing set
    send()
    assert emails.count() == 0

    # incomplete item: recipient only
    item.to = [role1.id]
    send()
    assert emails.count() == 0

    # incomplete item: recipient and subject, still no body
    item.subject = 'foobar'
    send()
    assert emails.count() == 0

    # complete item: a mail goes to the role address
    item.body = 'baz'
    send()
    assert emails.count() == 1
    assert emails.get('foobar')
    assert emails.get('foobar')['email_rcpt'] == ['foo@localhost']

    # two roles as recipients: a single mail with undisclosed recipients
    emails.empty()
    item.to = [role1.id, role2.id]
    send()
    assert emails.count() == 1
    assert emails.get('foobar')['to'] == 'Undisclosed recipients:;'
    assert emails.get('foobar')['email_rcpt'] == [
        'foo@localhost', 'bar@localhost', 'baz@localhost']

    # submitter as recipient while the formdata has no user attached
    emails.empty()
    item.to = ['_submitter']
    send()
    assert emails.count() == 0

    # submitter as recipient once the formdata is attached to the user
    emails.empty()
    formdata.user_id = user.id
    formdata.store()
    send()
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['zorg@localhost']

    # recipient computed from an expression
    emails.empty()
    item.to = ['=email']
    send()
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['sub@localhost']

    # recipient given as a bare string instead of a list
    emails.empty()
    item.to = 'xyz@localhost'
    send()
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['xyz@localhost']
def test_webservice_call(pub):
    """Exercise ``WebserviceCallStatusItem`` against the mocked HTTP layer.

    Covered, in order: default POST and its payload, GET, extra post data
    (alone and alongside the formdata payload), storing the response under a
    variable name, request signing, handling of 404 and 500 responses
    (abort, pass, jump to a status), non-JSON responses, and recording of
    errors in the formdata evolution.

    Sent requests are inspected through ``http_requests.get_last``.
    """
    # imported here so the test does not depend on ``json`` being leaked by
    # the star import at the top of the module
    import json

    pub.substitutions.feed(MockSubstitutionVariables())

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    # default settings: POST carrying a description of the formdata
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'POST'
    payload = json.loads(http_requests.get_last('body'))
    assert payload['url'] == 'http://example.net/baz/%s/' % formdata.id
    assert int(payload['display_id']) == 1

    # post disabled: plain GET
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = False
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'GET'

    # post disabled but extra data given: sent as POST, body holds only the
    # extra data; the entry with the invalid expression ('=1=3') is absent
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = False
    item.post_data = {'str': 'abcd', 'one': '=1',
                      'evalme': '=form_number', 'error': '=1=3'}
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'POST'
    payload = json.loads(http_requests.get_last('body'))
    assert payload == {'one': 1, 'str': 'abcd', 'evalme': formdata.id}

    # post left enabled with extra data: the extra data is nested under
    # 'extra', next to the formdata description
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post_data = {'str': 'abcd', 'one': '=1',
                      'evalme': '=form_number', 'error': '=1=3'}
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'POST'
    payload = json.loads(http_requests.get_last('body'))
    assert payload['extra'] == {'one': 1, 'str': 'abcd', 'evalme': formdata.id}
    assert payload['url'] == 'http://example.net/baz/%s/' % formdata.id
    assert int(payload['display_id']) == 1

    # a variable name stores status and decoded response in workflow data
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json'
    item.post = False
    item.varname = 'xxx'
    item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 200
    assert formdata.workflow_data.get('xxx_response') == {'foo': 'bar'}

    # a signature key adds a signature to the requested URL
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = False
    item.request_signature_key = 'xxx'
    item.perform(formdata)
    assert 'signature=' in http_requests.get_last('url')
    assert http_requests.get_last('method') == 'GET'

    # templated key: with '[empty]' the request ends up unsigned
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = False
    item.request_signature_key = '[empty]'
    item.perform(formdata)
    assert 'signature=' not in http_requests.get_last('url')

    # templated key: with '[bar]' the request is signed
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = False
    item.request_signature_key = '[bar]'
    item.perform(formdata)
    assert 'signature=' in http_requests.get_last('url')

    # 404 with default settings aborts the action
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404'
    item.post = False
    with pytest.raises(AbortActionException):
        item.perform(formdata)

    # 404 with ':pass' goes through without raising
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404'
    item.action_on_4xx = ':pass'
    item.post = False
    item.perform(formdata)

    # 500 with default settings aborts the action
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/500'
    item.post = False
    with pytest.raises(AbortActionException):
        item.perform(formdata)

    # 500 with ':pass' goes through without raising
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/500'
    item.action_on_5xx = ':pass'
    item.post = False
    item.perform(formdata)

    # 500 with a status id: aborts and moves the formdata to that status
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/500'
    item.action_on_5xx = '4'  # jump to status
    item.post = False
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.status == 'wf-4'

    # non-JSON response with default settings: no exception
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.post = False
    item.perform(formdata)

    # non-JSON response, variable name set and ':stop' on bad data: aborts
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.post = False
    item.varname = 'xxx'
    item.action_on_bad_data = ':stop'
    with pytest.raises(AbortActionException):
        item.perform(formdata)

    # error recording: the HTTP error is summarised in the evolution
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404'
    item.post = False
    item.record_errors = True
    item.action_on_4xx = ':stop'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.evolution[-1].parts[-1].summary == '404 whatever'

    # error recording: the decoding error is summarised in the evolution
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.post = False
    item.varname = 'xxx'
    item.action_on_bad_data = ':stop'
    item.record_errors = True
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.evolution[-1].parts[-1].summary == 'ValueError: No JSON object could be decoded\n'
def test_timeout(pub):
    """Check a jump with a timeout is applied by ``_apply_timeouts``.

    A formdata created in the first status must be found in status
    ``wf-st2`` once the test has slept and ``_apply_timeouts`` has run.
    """
    workflow = Workflow(name='timeout')
    first_status = workflow.add_status('Status1', 'st1')
    # target of the jump; only referenced through its id below
    workflow.add_status('Status2', 'st2')

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = ['_submitter', '_receiver']
    jump.timeout = 0.1
    jump.status = 'st2'
    jump.parent = first_status
    first_status.items.append(jump)

    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    assert formdef.get_workflow().id == workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    stored_id = formdata.id

    # sleep longer than the configured timeout value before applying
    time.sleep(0.3)
    _apply_timeouts(pub)

    reloaded = formdef.data_class().get(stored_id)
    assert reloaded.status == 'wf-st2'
def test_timeout_then_remove(two_pubs):
    """Check a timeout jump leading to a status that removes the formdata.

    The formdata must be listed in storage before ``_apply_timeouts`` runs
    and must not be listed anymore afterwards.  Runs against both
    publishers provided by ``two_pubs``.
    """
    workflow = Workflow(name='timeout-then-remove')
    first_status = workflow.add_status('Status1', 'st1')
    second_status = workflow.add_status('Status2', 'st2')

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = ['_submitter', '_receiver']
    jump.timeout = 0.1
    jump.status = 'st2'
    jump.parent = first_status
    first_status.items.append(jump)

    remove = RemoveWorkflowStatusItem()
    remove.parent = second_status
    second_status.items.append(remove)

    workflow.store()

    formdef = FormDef()
    # the name embeds id(two_pubs), so it differs from the plain 'baz'
    # used by other tests
    formdef.name = 'baz%s' % id(two_pubs)
    formdef.fields = []
    formdef.workflow_id = workflow.id
    assert formdef.get_workflow().id == workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    stored_id = formdata.id

    def stored_ids():
        # ids are compared as strings
        return [str(key) for key in formdef.data_class().keys()]

    assert str(stored_id) in stored_ids()

    time.sleep(0.2)
    _apply_timeouts(two_pubs)

    assert str(stored_id) not in stored_ids()
def test_sms(pub):
    """Check ``SendSMSWorkflowStatusItem`` and its handling of ``None``.

    Sent messages are read from ``sms_mocking.sms``.  ``None`` recipients
    must be dropped: alone they lead to no message at all, mixed with a
    real number only the real number is kept.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    item = SendSMSWorkflowStatusItem()
    item.to = ['000']
    item.body = 'XXX'

    # single valid recipient
    item.perform(formdata)
    last_sms = sms_mocking.sms[-1]
    assert last_sms['destinations'] == ['000']
    assert last_sms['text'] == 'XXX'

    # check None as recipient is not passed to the SMS backend
    sms_mocking.empty()
    item.to = [None]
    item.perform(formdata)
    assert len(sms_mocking.sms) == 0

    # None next to a valid recipient: only the valid one is kept
    item.to = ['000', None]
    item.perform(formdata)
    last_sms = sms_mocking.sms[-1]
    assert last_sms['destinations'] == ['000']
    assert last_sms['text'] == 'XXX'
def test_display_form(two_pubs):
    """Check a workflow form item with a string field and a date field.

    The form is filled and submitted twice: with the publisher language as
    set by the fixture and a date given as ``2015-05-12``, then with the
    language switched to ``fr`` and the date given as ``12/05/2015``.  The
    submitted date is checked through the formdata substitution variables.

    The language is put back to ``en`` in a ``finally`` clause, so a failing
    assertion in the French part cannot leave the publisher switched to
    French.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')

    display_form = FormWorkflowStatusItem()
    display_form.id = '_x'
    display_form.varname = 'xxx'
    display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
    display_form.formdef.fields.append(
        StringField(id='1', label='Test', type='string'))
    display_form.formdef.fields.append(
        DateField(id='2', label='Date', type='date', varname='date'))
    st1.items.append(display_form)
    display_form.parent = st1

    # one widget per field, in field order
    form = Form(action='#')
    display_form.fill_form(form, formdata, None)
    assert form.widgets[0].title == 'Test'
    assert form.widgets[1].title == 'Date'

    # submission with the language set by the fixture
    two_pubs.get_request().form = {'f1': 'Foobar', 'f2': '2015-05-12', 'submit': 'submit'}
    display_form.submit_form(form, formdata, None, None)
    assert formdata.get_substitution_variables()['xxx_var_date'] == '2015-05-12'

    # same submission in French, on a fresh formdata
    two_pubs.cfg['language'] = {'language': 'fr'}
    try:
        formdata = formdef.data_class()()
        formdata.just_created()
        formdata.store()
        form = Form(action='#')
        display_form.fill_form(form, formdata, None)
        two_pubs.get_request().form = {'f1': 'Foobar', 'f2': '12/05/2015', 'submit': 'submit'}
        display_form.submit_form(form, formdata, None, None)
        assert formdata.get_substitution_variables()['xxx_var_date'] == '12/05/2015'
        # the raw variable must hold the parsed date, whatever the language
        assert formdata.get_substitution_variables()['xxx_var_date_raw'] == \
            time.strptime('2015-05-12', '%Y-%m-%d')
    finally:
        two_pubs.cfg['language'] = {'language': 'en'}
def test_workflow_role_type_migration(pub):
    """Check integer role ids in ``by`` come back as strings after a reload."""
    workflow = Workflow(name='role migration')
    status = workflow.add_status('Status1', 'st1')

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = [1, 2]
    jump.parent = status
    status.items.append(jump)

    workflow.store()

    reloaded_workflow = Workflow.get(workflow.id)
    reloaded_jump = reloaded_workflow.possible_status[0].items[0]
    assert reloaded_jump.by == ['1', '2']
def test_workflow_display_message(pub):
    """Check ``DisplayMessageWorkflowStatusItem.get_message`` substitutions.

    A plain message is returned unchanged, ``[number]`` gives the formdata
    id, and other bracketed names are replaced (values presumably provided
    by ``MockSubstitutionVariables`` -- TODO confirm) with HTML escaping.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    workflow = Workflow(name='display message')
    status = workflow.add_status('Status1', 'st1')

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = []
    formdef.store()

    form_class = formdef.data_class()
    formdata = form_class()
    formdata.id = '1'

    display_message = DisplayMessageWorkflowStatusItem()
    display_message.parent = status

    # (configured message, expected output), run in this order
    expectations = [
        ('test', 'test'),
        ('[number]', str(formdata.id)),
        ('[bar]', 'Foobar'),
        # makes sure the string is correctly escaped for HTML
        ('[foo]', '1 &lt; 3'),
    ]
    for message, expected in expectations:
        display_message.message = message
        assert display_message.get_message(formdata) == expected
def test_workflow_roles(pub):
    """Check mails addressed to workflow functions reach the mapped roles.

    The mail item targets ``_receiver`` and a custom ``_other`` function;
    the formdef maps them to two roles.  A single mail is expected, sent to
    the addresses of both roles (order not checked).
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    user = pub.user_class(name='foo')
    user.email = 'zorg@localhost'
    user.store()

    Role.wipe()
    role1 = Role(name='foo')
    role1.emails = ['foo@localhost']
    role1.store()
    role2 = Role(name='bar')
    role2.emails = ['bar@localhost', 'baz@localhost']
    role2.store()

    workflow = Workflow(name='wf roles')
    status = workflow.add_status('Status1', 'st1')

    item = SendmailWorkflowStatusItem()
    item.to = ['_receiver', '_other']
    item.subject = 'Foobar'
    item.body = 'Hello'
    item.parent = status
    status.items.append(item)

    # declare the additional function on the workflow
    workflow.roles['_other'] = 'Other Function'
    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_roles = {'_receiver': role1.id, '_other': role2.id}
    formdef.workflow_id = workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    emails.empty()
    item.perform(formdata)
    get_response().process_after_jobs()

    assert emails.count() == 1
    assert emails.get('Foobar')
    expected_recipients = set(
        ['foo@localhost', 'bar@localhost', 'baz@localhost'])
    assert set(emails.get('Foobar')['email_rcpt']) == expected_recipients