# Python source file: 775 lines, 23 KiB.
import datetime
|
|
import pytest
|
|
import shutil
|
|
import time
|
|
|
|
from quixote import cleanup, get_response
|
|
from wcs.qommon.http_request import HTTPRequest
|
|
from qommon.form import *
|
|
|
|
from wcs.formdef import FormDef
|
|
from wcs import sessions
|
|
from wcs.fields import StringField, DateField
|
|
from wcs.roles import Role
|
|
from wcs.workflows import (Workflow, WorkflowStatusItem,
|
|
SendmailWorkflowStatusItem, SendSMSWorkflowStatusItem,
|
|
DisplayMessageWorkflowStatusItem,
|
|
AbortActionException)
|
|
from wcs.wf.anonymise import AnonymiseWorkflowStatusItem
|
|
from wcs.wf.dispatch import DispatchWorkflowStatusItem
|
|
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
|
|
from wcs.wf.jump import JumpWorkflowStatusItem, _apply_timeouts
|
|
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
|
|
from wcs.wf.remove import RemoveWorkflowStatusItem
|
|
from wcs.wf.roles import AddRoleWorkflowStatusItem, RemoveRoleWorkflowStatusItem
|
|
from wcs.wf.wscall import WebserviceCallStatusItem
|
|
|
|
from utilities import (create_temporary_pub, MockSubstitutionVariables, emails,
|
|
http_requests, clean_temporary_pub, sms_mocking)
|
|
|
|
def setup_module(module):
    """Pytest module-setup hook: call quixote's ``cleanup()`` before the tests.

    ``module`` is supplied by pytest and is not used here.
    """
    cleanup()
|
|
|
|
def teardown_module(module):
    """Pytest module-teardown hook: dispose of the temporary publisher.

    ``module`` is supplied by pytest and is not used here.
    """
    clean_temporary_pub()
|
|
|
|
def pytest_generate_tests(metafunc):
    """Run every test that requests ``two_pubs`` once per storage backend.

    The parameter values ('pickle' and 'sql') are passed indirectly, i.e.
    they reach the ``two_pubs`` fixture as ``request.param``.
    """
    if 'two_pubs' not in metafunc.fixturenames:
        # nothing to parametrize for tests that do not use the fixture
        return
    backends = ['pickle', 'sql']
    metafunc.parametrize('two_pubs', backends, indirect=True)
|
|
|
|
@pytest.fixture
def pub(request):
    """Return a temporary publisher set to English with a request attached.

    The request targets ``example.net`` with an empty script name, carries
    no user, an empty response filter and a ``BasicSession`` with id 1.
    """
    publisher = create_temporary_pub()
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()

    environ = {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''}
    http_request = HTTPRequest(None, environ)
    http_request.response.filter = {}
    http_request.user = None
    publisher._set_request(http_request)
    # the session is attached once the request is registered on the publisher
    http_request.session = sessions.BasicSession(id=1)
    return publisher
|
|
|
|
@pytest.fixture
def two_pubs(request):
    """Return a temporary publisher for the backend named by ``request.param``.

    ``request.param`` is 'pickle' or 'sql' (see ``pytest_generate_tests``);
    SQL mode is enabled only for 'sql'.  The rest of the setup is the same
    as for the ``pub`` fixture: English language, a request on
    ``example.net`` without user, and a ``BasicSession`` with id 1.
    """
    use_sql = (request.param == 'sql')
    publisher = create_temporary_pub(sql_mode=use_sql)
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()

    environ = {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''}
    http_request = HTTPRequest(None, environ)
    http_request.response.filter = {}
    http_request.user = None
    publisher._set_request(http_request)
    # the session is attached once the request is registered on the publisher
    http_request.session = sessions.BasicSession(id=1)

    return publisher
|
|
|
|
def test_jump_nothing(pub):
    """A jump item left unconfigured must report that it has to jump."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()

    formdata = formdef.data_class()()
    jump = JumpWorkflowStatusItem()
    assert jump.must_jump(formdata) is True
|
|
|
|
def test_jump_datetime_condition(pub):
    """Check jump conditions that compare the current time with a date.

    "now" is later than yesterday's date (jump) but not later than
    tomorrow's date (no jump).
    """
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()

    formdata = formdef.data_class()()
    jump = JumpWorkflowStatusItem()
    # the three placeholders receive (year, month, day)
    template = 'datetime.datetime.now() > datetime.datetime(%s, %s, %s)'
    one_day = datetime.timedelta(days=1)

    yesterday = datetime.datetime.now() - one_day
    jump.condition = template % yesterday.timetuple()[:3]
    assert jump.must_jump(formdata) is True

    tomorrow = datetime.datetime.now() + one_day
    jump.condition = template % tomorrow.timetuple()[:3]
    assert jump.must_jump(formdata) is False
|
|
|
|
def test_jump_count_condition(pub):
    """Check a jump condition based on ``form_objects.count``.

    The condition holds while nothing is stored and stops holding once ten
    formdata have been stored.
    """
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()
    # feed the formdef so the condition can reference ``form_objects``
    pub.substitutions.feed(formdef)

    condition = 'form_objects.count < 2'
    formdata = formdef.data_class()()
    jump = JumpWorkflowStatusItem()
    jump.condition = condition
    assert jump.must_jump(formdata) is True

    for _ in range(10):
        formdata = formdef.data_class()()
        formdata.store()

    jump.condition = condition
    assert jump.must_jump(formdata) is False
|
|
|
|
def test_check_auth(pub):
    """Exercise ``WorkflowStatusItem.check_auth`` for each kind of ``by`` entry.

    Covers: ``by`` left untouched, an empty list, 'logged-users', a role id
    (both as stored and as an int), '_submitter' and '_receiver'.
    """
    user = pub.user_class(name='foo')
    user.store()

    role = Role(name='bar1')
    role.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    formdata = formdef.data_class()()

    action = WorkflowStatusItem()
    # ``by`` not set at all: access granted
    assert action.check_auth(formdata, user) is True

    # empty list: access denied
    action.by = []
    assert action.check_auth(formdata, user) is False

    action.by = ['logged-users']
    assert action.check_auth(formdata, user) is True

    # role reference while the user does not hold the role
    for role_ref in (role.id, int(role.id)):
        action.by = [role_ref]
        assert action.check_auth(formdata, user) is False

    # same references once the user holds the role
    user.roles = [role.id]
    for role_ref in (role.id, int(role.id)):
        action.by = [role_ref]
        assert action.check_auth(formdata, user) is True

    # submitter: granted only when the formdata belongs to the user
    action.by = ['_submitter']
    assert action.check_auth(formdata, user) is False
    formdata.user_id = user.id
    assert action.check_auth(formdata, user) is True
    formdata.user_id = None

    # receiver function, defined first on the formdata then on the formdef
    action.by = ['_receiver']
    assert action.check_auth(formdata, user) is False
    formdata.workflow_roles = {'_receiver': user.id}
    assert action.check_auth(formdata, user) is True
    formdef.workflow_roles = {'_receiver': user.id}
    formdata.workflow_roles = None
    assert action.check_auth(formdata, user) is True
|
|
|
|
def test_dispatch(pub):
    """Check that the dispatch item fills ``formdata.workflow_roles``.

    An unconfigured item leaves the roles empty; once ``role_key`` and
    ``role_id`` are set, the mapping is recorded on the formdata.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    dispatch = DispatchWorkflowStatusItem()

    # unconfigured item
    formdata = formdef.data_class()()
    dispatch.perform(formdata)
    assert not formdata.workflow_roles

    # configured item, applied to a fresh formdata
    formdata = formdef.data_class()()
    dispatch.role_key = '_receiver'
    dispatch.role_id = '1'
    dispatch.perform(formdata)
    assert formdata.workflow_roles == {'_receiver': '1'}
|
|
|
|
def test_roles(pub):
    """Check the add-role and remove-role items against the stored user."""
    user = pub.user_class()
    user.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    formdata = formdef.data_class()()
    formdata.user_id = user.id

    def stored_roles():
        # always reload the user, the items work on the stored object
        return pub.user_class.get(user.id).roles

    add_role = AddRoleWorkflowStatusItem()

    # no role configured: nothing is added
    add_role.perform(formdata)
    assert not stored_roles()

    add_role.role_id = '1'
    add_role.perform(formdata)
    assert stored_roles() == ['1']

    user.roles = None
    user.store()
    remove_role = RemoveRoleWorkflowStatusItem()

    # no role configured, user without roles
    remove_role.perform(formdata)
    assert not stored_roles()

    # role configured, user still without roles
    remove_role.role_id = '1'
    remove_role.perform(formdata)
    assert not stored_roles()

    # the only role of the user gets removed
    user.roles = ['1']
    user.store()
    remove_role.perform(formdata)
    assert not stored_roles()

    # other roles are left untouched
    user.roles = ['2', '1']
    user.store()
    remove_role.perform(formdata)
    assert stored_roles() == ['2']
|
|
|
|
def test_anonymise(pub):
    """The anonymise item clears ``user_id`` and sets ``anonymised`` in storage."""
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.user_id = '1'
    formdata.store()

    anonymise = AnonymiseWorkflowStatusItem()
    anonymise.perform(formdata)

    # both checks reload the formdata from storage
    assert formdef.data_class().get(formdata.id).user_id is None
    assert formdef.data_class().get(formdata.id).anonymised
|
|
|
|
def test_remove(pub):
    """Check the remove item outside and inside the backoffice.

    Without the backoffice flag ``perform`` returns 'http://example.net';
    with ``in_backoffice`` set on the response filter it returns '..' and a
    message is left on the session.  In both cases the formdata is gone.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    # outside the backoffice
    formdata = formdef.data_class()()
    formdata.store()

    remover = RemoveWorkflowStatusItem()
    assert formdef.data_class().has_key(formdata.id)
    assert remover.perform(formdata) == 'http://example.net'
    assert not formdef.data_class().has_key(formdata.id)

    # inside the backoffice
    formdata = formdef.data_class()()
    formdata.store()

    remover = RemoveWorkflowStatusItem()
    request = pub.get_request()
    request.response.filter['in_backoffice'] = True
    assert formdef.data_class().has_key(formdata.id)
    assert remover.perform(formdata) == '..'
    assert not formdef.data_class().has_key(formdata.id)
    # drop the backoffice flag again before looking at the session
    request.response.filter = {}
    assert request.session.message
|
|
|
|
def test_register_comment(pub):
    """Check how the register-comment item renders its comment.

    Each case sets ``item.comment``, performs the item and compares the last
    display part of the last evolution with the expected markup.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    item = RegisterCommenterWorkflowStatusItem()

    # no comment configured: an empty part is displayed
    item.perform(formdata)
    assert formdata.evolution[-1].display_parts()[-1] == ''

    # (comment template, expected rendering)
    # NOTE(review): the two '[foo]' expectations contain a raw '<'; if the
    # renderer HTML-escapes substituted values they should read '&lt;' --
    # confirm against MockSubstitutionVariables and the item implementation.
    cases = [
        ('Hello world', '<p>Hello world</p>'),
        ('<div>Hello world</div>', '<div>Hello world</div>'),
        ('[test]', '<p>[test]</p>'),
        ('[bar]', '<p>Foobar</p>'),
        ('[foo]', '<p>1 < 3</p>'),
        ('<div>[foo]</div>', '<div>1 < 3</div>'),
    ]
    for comment, rendered in cases:
        item.comment = comment
        item.perform(formdata)
        assert formdata.evolution[-1].display_parts()[-1] == rendered
|
|
|
|
def test_email(pub):
    """Check the sendmail item: incomplete settings and recipient kinds.

    Nothing is sent until recipient, subject and body are all set.  The
    recipient kinds covered are a role, two roles, the submitter (without
    and with a known address), a computed value and a plain string.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    user = pub.user_class(name='foo')
    user.email = 'zorg@localhost'
    user.store()

    Role.wipe()
    role1 = Role(name='foo')
    role1.emails = ['foo@localhost']
    role1.store()

    role2 = Role(name='bar')
    role2.emails = ['bar@localhost', 'baz@localhost']
    role2.store()

    emails.empty()
    item = SendmailWorkflowStatusItem()

    def perform_and_flush():
        # run the item, then the after-jobs queued on the response
        item.perform(formdata)
        get_response().process_after_jobs()

    # incomplete item: no recipient, no subject, no body
    perform_and_flush()
    assert emails.count() == 0

    # recipient only
    item.to = [role1.id]
    perform_and_flush()
    assert emails.count() == 0

    # recipient and subject but still no body
    item.subject = 'foobar'
    perform_and_flush()
    assert emails.count() == 0

    # complete item: the email goes to the role address
    item.body = 'baz'
    perform_and_flush()
    assert emails.count() == 1
    assert emails.get('foobar')
    assert emails.get('foobar')['email_rcpt'] == ['foo@localhost']

    # two roles as recipients: a single email, recipients undisclosed
    emails.empty()
    item.to = [role1.id, role2.id]
    perform_and_flush()
    assert emails.count() == 1
    assert emails.get('foobar')['to'] == 'Undisclosed recipients:;'
    assert emails.get('foobar')['email_rcpt'] == [
        'foo@localhost', 'bar@localhost', 'baz@localhost']

    # submitter as recipient while the formdata has no user
    emails.empty()
    item.to = ['_submitter']
    perform_and_flush()
    assert emails.count() == 0

    # submitter as recipient once the formdata is tied to the user
    emails.empty()
    formdata.user_id = user.id
    formdata.store()
    perform_and_flush()
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['zorg@localhost']

    # computed recipient
    emails.empty()
    item.to = ['=email']
    perform_and_flush()
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['sub@localhost']

    # plain string (not a list) as recipient
    emails.empty()
    item.to = 'xyz@localhost'
    perform_and_flush()
    assert emails.count() == 1
    assert emails.get('foobar')['email_rcpt'] == ['xyz@localhost']
|
|
|
|
def test_webservice_call(pub):
    """Exercise the webservice call item against the mocked HTTP layer.

    Covers the default POST of the formdata, plain GET, extra ``post_data``,
    storing the response under ``varname``, request signature, handling of
    4xx/5xx responses and of non-JSON bodies, and error recording.
    """
    # imported here so the test does not depend on a wildcard import to
    # bring ``json`` into the module namespace
    import json

    pub.substitutions.feed(MockSubstitutionVariables())

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    # default settings: the formdata is POSTed as JSON
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'POST'
    payload = json.loads(http_requests.get_last('body'))
    assert payload['url'] == 'http://example.net/baz/%s/' % formdata.id
    assert int(payload['display_id']) == 1

    # post disabled: plain GET
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = False
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'GET'

    # post disabled but post_data given: only the evaluated post_data is
    # POSTed; the entry whose expression is invalid ('=1=3') is left out
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = False
    item.post_data = {'str': 'abcd', 'one': '=1',
                      'evalme': '=form_number', 'error': '=1=3'}
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'POST'
    payload = json.loads(http_requests.get_last('body'))
    assert payload == {'one': 1, 'str': 'abcd', 'evalme': formdata.id}

    # post left at its default plus post_data: the evaluated post_data is
    # added to the formdata payload under the 'extra' key
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post_data = {'str': 'abcd', 'one': '=1',
                      'evalme': '=form_number', 'error': '=1=3'}
    pub.substitutions.feed(formdata)
    item.perform(formdata)
    assert http_requests.get_last('url') == 'http://remote.example.net'
    assert http_requests.get_last('method') == 'POST'
    payload = json.loads(http_requests.get_last('body'))
    assert payload['extra'] == {'one': 1, 'str': 'abcd', 'evalme': formdata.id}
    assert payload['url'] == 'http://example.net/baz/%s/' % formdata.id
    assert int(payload['display_id']) == 1

    # varname set: status and decoded response are stored in workflow_data
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/json'
    item.post = False
    item.varname = 'xxx'
    item.perform(formdata)
    assert formdata.workflow_data.get('xxx_status') == 200
    assert formdata.workflow_data.get('xxx_response') == {'foo': 'bar'}

    # literal signature key: the URL gets signed
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = False
    item.request_signature_key = 'xxx'
    item.perform(formdata)
    assert 'signature=' in http_requests.get_last('url')
    assert http_requests.get_last('method') == 'GET'

    # '[empty]' as signature key: no signature is added
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = False
    item.request_signature_key = '[empty]'
    item.perform(formdata)
    assert 'signature=' not in http_requests.get_last('url')

    # '[bar]' as signature key: the URL gets signed
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net'
    item.post = False
    item.request_signature_key = '[bar]'
    item.perform(formdata)
    assert 'signature=' in http_requests.get_last('url')

    # 404 with default settings aborts the action
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404'
    item.post = False
    with pytest.raises(AbortActionException):
        item.perform(formdata)

    # 404 with ':pass' goes through
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404'
    item.action_on_4xx = ':pass'
    item.post = False
    item.perform(formdata)

    # 500 with default settings aborts the action
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/500'
    item.post = False
    with pytest.raises(AbortActionException):
        item.perform(formdata)

    # 500 with ':pass' goes through
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/500'
    item.action_on_5xx = ':pass'
    item.post = False
    item.perform(formdata)

    # 500 with a status id: the action aborts and the formdata jumps there
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/500'
    item.action_on_5xx = '4'  # jump to status
    item.post = False
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.status == 'wf-4'

    # non-JSON response without varname goes through
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.post = False
    item.perform(formdata)

    # non-JSON response with varname and ':stop' on bad data aborts
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.post = False
    item.varname = 'xxx'
    item.action_on_bad_data = ':stop'
    with pytest.raises(AbortActionException):
        item.perform(formdata)

    # recorded error for a 404
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/404'
    item.post = False
    item.record_errors = True
    item.action_on_4xx = ':stop'
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.evolution[-1].parts[-1].summary == '404 whatever'

    # recorded error for a body that cannot be decoded as JSON
    item = WebserviceCallStatusItem()
    item.url = 'http://remote.example.net/xml'
    item.post = False
    item.varname = 'xxx'
    item.action_on_bad_data = ':stop'
    item.record_errors = True
    with pytest.raises(AbortActionException):
        item.perform(formdata)
    assert formdata.evolution[-1].parts[-1].summary == 'ValueError: No JSON object could be decoded\n'
|
|
|
|
|
|
def test_timeout(pub):
    """A jump with a timeout moves the formdata to the target status.

    The jump from 'st1' to 'st2' has a 0.1 timeout; after sleeping 0.3
    seconds ``_apply_timeouts`` must have moved the stored formdata.
    """
    workflow = Workflow(name='timeout')
    source_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = ['_submitter', '_receiver']
    jump.timeout = 0.1
    jump.status = 'st2'
    source_status.items.append(jump)
    jump.parent = source_status

    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    assert formdef.get_workflow().id == workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    formdata_id = formdata.id

    # wait longer than the jump timeout before applying timeouts
    time.sleep(0.3)
    _apply_timeouts(pub)

    reloaded = formdef.data_class().get(formdata_id)
    assert reloaded.status == 'wf-st2'
|
|
|
|
def test_timeout_then_remove(two_pubs):
    """A timeout jump into a status holding a remove item deletes the formdata.

    Runs once per storage backend through the ``two_pubs`` fixture.
    """
    workflow = Workflow(name='timeout-then-remove')
    source_status = workflow.add_status('Status1', 'st1')
    target_status = workflow.add_status('Status2', 'st2')

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = ['_submitter', '_receiver']
    jump.timeout = 0.1
    jump.status = 'st2'
    source_status.items.append(jump)
    jump.parent = source_status

    remove = RemoveWorkflowStatusItem()
    target_status.items.append(remove)
    remove.parent = target_status

    workflow.store()

    formdef = FormDef()
    # the name embeds id(two_pubs), so it varies with the publisher object
    formdef.name = 'baz%s' % id(two_pubs)
    formdef.fields = []
    formdef.workflow_id = workflow.id
    assert formdef.get_workflow().id == workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()
    formdata_id = formdata.id

    # ids are compared as strings on both sides
    assert str(formdata_id) in [str(x) for x in formdef.data_class().keys()]

    # wait longer than the jump timeout before applying timeouts
    time.sleep(0.2)
    _apply_timeouts(two_pubs)

    assert str(formdata_id) not in [str(x) for x in formdef.data_class().keys()]
|
|
|
|
def test_sms(pub):
    """Check the send-SMS item, in particular ``None`` among the recipients."""
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    sender = SendSMSWorkflowStatusItem()
    sender.to = ['000']
    sender.body = 'XXX'
    # regular case: one destination, one text
    sender.perform(formdata)
    assert sms_mocking.sms[-1]['destinations'] == ['000']
    assert sms_mocking.sms[-1]['text'] == 'XXX'
    sms_mocking.empty()

    # check None as recipient is not passed to the SMS backend
    sender.to = [None]
    sender.perform(formdata)
    assert len(sms_mocking.sms) == 0

    # None mixed with a real number: only the number is kept
    sender.to = ['000', None]
    sender.perform(formdata)
    assert sms_mocking.sms[-1]['destinations'] == ['000']
    assert sms_mocking.sms[-1]['text'] == 'XXX'
|
|
|
|
|
|
def test_display_form(two_pubs):
    """Check the workflow form item: widgets, submission and date variables.

    The form holds a string field and a date field.  After submission the
    date is exposed as ``xxx_var_date`` formatted for the configured
    language ('en', then 'fr'), and as ``xxx_var_date_raw`` as a time
    structure.  Runs once per storage backend through ``two_pubs``.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')

    display_form = FormWorkflowStatusItem()
    display_form.id = '_x'
    display_form.varname = 'xxx'
    display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
    display_form.formdef.fields.append(
        StringField(id='1', label='Test', type='string'))
    display_form.formdef.fields.append(
        DateField(id='2', label='Date', type='date', varname='date'))
    st1.items.append(display_form)
    display_form.parent = st1

    form = Form(action='#')
    display_form.fill_form(form, formdata, None)
    assert form.widgets[0].title == 'Test'
    assert form.widgets[1].title == 'Date'

    two_pubs.get_request().form = {'f1': 'Foobar', 'f2': '2015-05-12', 'submit': 'submit'}
    display_form.submit_form(form, formdata, None, None)

    assert formdata.get_substitution_variables()['xxx_var_date'] == '2015-05-12'

    two_pubs.cfg['language'] = {'language': 'fr'}
    try:
        formdata = formdef.data_class()()
        formdata.just_created()
        formdata.store()

        form = Form(action='#')
        display_form.fill_form(form, formdata, None)
        two_pubs.get_request().form = {'f1': 'Foobar', 'f2': '12/05/2015', 'submit': 'submit'}
        display_form.submit_form(form, formdata, None, None)
        assert formdata.get_substitution_variables()['xxx_var_date'] == '12/05/2015'

        assert formdata.get_substitution_variables()['xxx_var_date_raw'] == \
            time.strptime('2015-05-12', '%Y-%m-%d')
    finally:
        # always switch back to English, even when an assertion above
        # fails, so the French setting is not left on the publisher
        two_pubs.cfg['language'] = {'language': 'en'}
|
|
|
|
def test_workflow_role_type_migration(pub):
    """Integer role ids in ``by`` come back as strings after store and reload."""
    workflow = Workflow(name='role migration')
    status = workflow.add_status('Status1', 'st1')

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = [1, 2]  # integers on purpose
    status.items.append(jump)
    jump.parent = status

    workflow.store()

    reloaded_workflow = Workflow.get(workflow.id)
    reloaded_jump = reloaded_workflow.possible_status[0].items[0]
    assert reloaded_jump.by == ['1', '2']
|
|
|
|
def test_workflow_display_message(pub):
    """Check how the display-message item renders its message.

    Covers plain text, the '[number]' and '[bar]' substitutions, and HTML
    escaping of a substituted value containing '<'.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    workflow = Workflow(name='display message')
    st1 = workflow.add_status('Status1', 'st1')

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = []
    formdef.store()
    formdata = formdef.data_class()()
    formdata.id = '1'

    display_message = DisplayMessageWorkflowStatusItem()
    display_message.parent = st1

    display_message.message = 'test'
    assert display_message.get_message(formdata) == 'test'

    display_message.message = '[number]'
    assert display_message.get_message(formdata) == str(formdata.id)

    display_message.message = '[bar]'
    assert display_message.get_message(formdata) == 'Foobar'

    # makes sure the string is correctly escaped for HTML: the expected
    # value must carry the '&lt;' entity, a raw '<' would only prove that
    # no escaping took place
    display_message.message = '[foo]'
    assert display_message.get_message(formdata) == '1 &lt; 3'
|
|
|
|
def test_workflow_roles(pub):
    """An email sent to workflow functions reaches the roles mapped on the formdef.

    '_receiver' and the custom '_other' function are mapped to two roles;
    a single email must go to the addresses of both roles.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    user = pub.user_class(name='foo')
    user.email = 'zorg@localhost'
    user.store()

    Role.wipe()
    role1 = Role(name='foo')
    role1.emails = ['foo@localhost']
    role1.store()

    role2 = Role(name='bar')
    role2.emails = ['bar@localhost', 'baz@localhost']
    role2.store()

    workflow = Workflow(name='wf roles')
    status = workflow.add_status('Status1', 'st1')
    mail_item = SendmailWorkflowStatusItem()
    mail_item.to = ['_receiver', '_other']
    mail_item.subject = 'Foobar'
    mail_item.body = 'Hello'
    status.items.append(mail_item)
    mail_item.parent = status
    # declare the extra function on the workflow
    workflow.roles['_other'] = 'Other Function'
    workflow.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.workflow_roles = {'_receiver': role1.id, '_other': role2.id}
    formdef.workflow_id = workflow.id
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    emails.empty()
    mail_item.perform(formdata)
    get_response().process_after_jobs()
    assert emails.count() == 1
    assert emails.get('Foobar')
    # recipients are compared as sets, their order is not checked
    expected_recipients = set(
        ['foo@localhost', 'bar@localhost', 'baz@localhost'])
    assert set(emails.get('Foobar')['email_rcpt']) == expected_recipients
|