328 lines
9.2 KiB
Python
328 lines
9.2 KiB
Python
import datetime
|
|
import shutil
|
|
|
|
from quixote import cleanup
|
|
from quixote.http_request import HTTPRequest
|
|
|
|
from wcs.formdef import FormDef
|
|
from wcs.qommon import sessions
|
|
from wcs.roles import Role
|
|
from wcs.workflows import WorkflowStatusItem, SendmailWorkflowStatusItem
|
|
from wcs.wf.anonymise import AnonymiseWorkflowStatusItem
|
|
from wcs.wf.dispatch import DispatchWorkflowStatusItem
|
|
from wcs.wf.jump import JumpWorkflowStatusItem
|
|
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
|
|
from wcs.wf.remove import RemoveWorkflowStatusItem
|
|
from wcs.wf.roles import AddRoleWorkflowStatusItem, RemoveRoleWorkflowStatusItem
|
|
|
|
from utilities import create_temporary_pub, MockSubstitutionVariables, emails
|
|
|
|
def setup_module(module):
    """Prepare the module-wide publisher and request shared by the tests.

    Calls quixote's ``cleanup()``, creates a temporary publisher and attaches
    to it a request for ``example.net`` with no user, an empty response
    filter and a session whose id is 1.  Both objects are published as the
    module globals ``pub`` and ``req``.
    """
    global pub, req

    cleanup()
    pub = create_temporary_pub()

    environ = {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''}
    req = HTTPRequest(None, environ)
    req.response.filter = {}
    req.user = None
    pub._set_request(req)
    # The session is attached only once the request is registered on the publisher.
    req.session = sessions.Session(id=1)
|
|
|
|
def teardown_module(module):
    """Remove the application directory of the module-wide publisher."""
    # ``pub`` is only read here, so no ``global`` declaration is required.
    app_dir = pub.APP_DIR
    shutil.rmtree(app_dir)
|
|
|
|
|
|
def test_jump_nothing():
    """A jump item on which nothing has been configured reports that it must jump."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()

    data_class = formdef.data_class()
    formdata = data_class()

    jump = JumpWorkflowStatusItem()
    assert jump.must_jump(formdata) is True
|
|
|
|
def test_jump_datetime_condition():
    """Check must_jump() against a condition comparing now() with a fixed date.

    The condition is true when the date is yesterday and false when it is
    tomorrow.
    """
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.store()

    data_class = formdef.data_class()
    formdata = data_class()
    jump = JumpWorkflowStatusItem()

    # Same expression for both cases; only the (year, month, day) changes.
    condition_template = 'datetime.datetime.now() > datetime.datetime(%s, %s, %s)'
    one_day = datetime.timedelta(days=1)

    yesterday = datetime.datetime.now() - one_day
    jump.condition = condition_template % (
        yesterday.year, yesterday.month, yesterday.day)
    assert jump.must_jump(formdata) is True

    tomorrow = datetime.datetime.now() + one_day
    jump.condition = condition_template % (
        tomorrow.year, tomorrow.month, tomorrow.day)
    assert jump.must_jump(formdata) is False
|
|
|
|
def test_check_auth():
    """Walk WorkflowStatusItem.check_auth() through the different ``by`` values.

    Covers: ``by`` left untouched, an empty list, 'logged-users', a role
    reference (both ``role.id`` and ``int(role.id)``), '_submitter' and
    '_receiver'.
    """
    user = pub.user_class(name='foo')
    user.store()

    role = Role(name='bar1')
    role.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    formdata = formdef.data_class()()

    status_item = WorkflowStatusItem()

    def authorized():
        return status_item.check_auth(formdata, user)

    # Nothing configured on the item.
    assert authorized() is True

    status_item.by = []
    assert authorized() is False

    status_item.by = ['logged-users']
    assert authorized() is True

    # The user does not hold the role yet: refused for both id forms.
    for role_ref in (role.id, int(role.id)):
        status_item.by = [role_ref]
        assert authorized() is False

    # Once the user holds the role, both id forms are accepted.
    user.roles = [role.id]
    for role_ref in (role.id, int(role.id)):
        status_item.by = [role_ref]
        assert authorized() is True

    # '_submitter' matches only while the form data belongs to the user.
    status_item.by = ['_submitter']
    assert authorized() is False
    formdata.user_id = user.id
    assert authorized() is True
    formdata.user_id = None

    # '_receiver' is looked up in the form data's workflow roles ...
    status_item.by = ['_receiver']
    assert authorized() is False
    formdata.workflow_roles = {'_receiver': user.id}
    assert authorized() is True
    # ... and is still granted when only the formdef carries the mapping.
    formdef.workflow_roles = {'_receiver': user.id}
    formdata.workflow_roles = None
    assert authorized() is True
|
|
|
|
def test_dispatch():
    """The dispatch item fills ``workflow_roles`` only once it has a key and a role."""
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    dispatcher = DispatchWorkflowStatusItem()

    # Unconfigured item: performing it leaves the workflow roles empty.
    untouched = formdef.data_class()()
    dispatcher.perform(untouched)
    assert not untouched.workflow_roles

    # Configured item: the role id is recorded under the role key.
    dispatched = formdef.data_class()()
    dispatcher.role_key = '_receiver'
    dispatcher.role_id = '1'
    dispatcher.perform(dispatched)
    assert dispatched.workflow_roles == {'_receiver': '1'}
|
|
|
|
def test_roles():
    """Exercise the add-role and remove-role items on the form data's user."""
    user = pub.user_class()
    user.store()

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    formdata = formdef.data_class()()
    formdata.user_id = user.id

    def stored_roles():
        # Fetch the user again through user_class.get() on every check.
        return pub.user_class.get(user.id).roles

    add_role = AddRoleWorkflowStatusItem()

    # No role configured: nothing is added.
    add_role.perform(formdata)
    assert not stored_roles()

    add_role.role_id = '1'
    add_role.perform(formdata)
    assert stored_roles() == ['1']

    user.roles = None
    user.store()
    remove_role = RemoveRoleWorkflowStatusItem()

    # No role configured and no roles on the user.
    remove_role.perform(formdata)
    assert not stored_roles()

    # Role configured, but the user still holds no role.
    remove_role.role_id = '1'
    remove_role.perform(formdata)
    assert not stored_roles()

    # The only role held is the one being removed.
    user.roles = ['1']
    user.store()
    remove_role.perform(formdata)
    assert not stored_roles()

    # Other roles are kept.
    user.roles = ['2', '1']
    user.store()
    remove_role.perform(formdata)
    assert stored_roles() == ['2']
|
|
|
|
def test_anonymise():
    """After the anonymise item runs, the stored form data has no user and is flagged."""
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.user_id = '1'
    formdata.store()

    def reloaded():
        # Look the form data up again instead of trusting the in-memory object.
        return formdef.data_class().get(formdata.id)

    anonymiser = AnonymiseWorkflowStatusItem()
    anonymiser.perform(formdata)

    assert reloaded().user_id is None
    assert reloaded().anonymised
|
|
|
|
def test_remove():
    """Check the remove item deletes the form data and returns a redirect target.

    Two situations are covered: the default request, where ``perform()``
    returns 'http://example.net', and a request whose response filter has
    ``in_backoffice`` set, where it returns '..'.  After the backoffice run
    the session must carry a message.
    """
    formdef = FormDef()
    formdef.name = 'baz'
    formdef.store()

    formdata = formdef.data_class()()
    formdata.store()

    item = RemoveWorkflowStatusItem()
    assert formdef.data_class().has_key(formdata.id)
    assert item.perform(formdata) == 'http://example.net'
    assert not formdef.data_class().has_key(formdata.id)

    formdata = formdef.data_class()()
    formdata.store()

    item = RemoveWorkflowStatusItem()
    req.response.filter['in_backoffice'] = True
    try:
        assert formdef.data_class().has_key(formdata.id)
        assert item.perform(formdata) == '..'
        assert not formdef.data_class().has_key(formdata.id)
    finally:
        # ``req`` is shared by the whole module: always drop the backoffice
        # flag, even when an assertion above fails, so that later tests do
        # not run with a polluted response filter.
        req.response.filter = {}
    assert req.session.message
|
|
|
|
def test_register_comment():
    """Check what the register-comment item appends to the form data's evolution.

    Each case sets ``comment`` on the item, performs it and compares the last
    display part of the last evolution entry with the expected markup.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    commenter = RegisterCommenterWorkflowStatusItem()

    # No comment configured at all: the recorded part is empty.
    commenter.perform(formdata)
    assert formdata.evolution[-1].display_parts()[-1] == ''

    # (comment, expected last display part); the values behind [bar] and [foo]
    # presumably come from MockSubstitutionVariables - verify in utilities.
    # NOTE(review): the two [foo] expectations contain a raw '<'; confirm
    # whether the renderer is expected to emit it unescaped or as '&lt;'.
    cases = [
        ('Hello world', '<p>Hello world</p>'),
        ('<div>Hello world</div>', '<div>Hello world</div>'),
        ('[test]', '<p>[test]</p>'),
        ('[bar]', '<p>Foobar</p>'),
        ('[foo]', '<p>1 < 3</p>'),
        ('<div>[foo]</div>', '<div>1 < 3</div>'),
    ]
    for comment, expected in cases:
        commenter.comment = comment
        commenter.perform(formdata)
        assert formdata.evolution[-1].display_parts()[-1] == expected
|
|
|
|
def test_email():
    """Check when the sendmail item sends a message and who receives it.

    Incomplete items (missing recipient, subject or body) must send nothing;
    complete ones are checked for the recipients found in the captured mail
    keyword arguments.
    """
    pub.substitutions.feed(MockSubstitutionVariables())

    formdef = FormDef()
    formdef.name = 'baz'
    formdef.fields = []
    formdef.store()

    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    user = pub.user_class(name='foo')
    user.email = 'zorg@localhost'
    user.store()

    def make_role(name, addresses):
        new_role = Role(name=name)
        new_role.emails = addresses
        new_role.store()
        return new_role

    def mail_kwargs():
        # Keyword arguments recorded for the mail whose subject is 'foobar'.
        return emails.get('foobar')['kwargs']

    Role.wipe()
    role1 = make_role('foo', ['foo@localhost'])
    role2 = make_role('bar', ['bar@localhost', 'baz@localhost'])

    emails.empty()

    # Incomplete item: nothing is configured yet.
    mailer = SendmailWorkflowStatusItem()
    mailer.perform(formdata)
    assert emails.count() == 0

    # Recipient only: still neither subject nor body.
    mailer.to = [role1.id]
    mailer.perform(formdata)
    assert emails.count() == 0

    # Recipient and subject: the body is still missing.
    mailer.subject = 'foobar'
    mailer.perform(formdata)
    assert emails.count() == 0

    # Complete item: the mail goes out.
    mailer.body = 'baz'
    mailer.perform(formdata)
    assert emails.count() == 1
    assert emails.get('foobar')
    assert mail_kwargs()['email_rcpt'] == ['foo@localhost']

    # Two roles as recipients: a single mail, addresses carried in bcc.
    emails.empty()
    mailer.to = [role1.id, role2.id]
    mailer.perform(formdata)
    assert emails.count() == 1
    assert mail_kwargs()['email_rcpt'] is None
    assert mail_kwargs()['bcc'] == [
        'foo@localhost', 'bar@localhost', 'baz@localhost']

    # Submitter as recipient while the form data has no user: nothing sent.
    emails.empty()
    mailer.to = ['_submitter']
    mailer.perform(formdata)
    assert emails.count() == 0

    # Submitter as recipient once the form data belongs to a user with an email.
    emails.empty()
    formdata.user_id = user.id
    formdata.store()
    mailer.perform(formdata)
    assert emails.count() == 1
    assert mail_kwargs()['email_rcpt'] == ['zorg@localhost']

    # Computed recipient ('=' prefix).
    emails.empty()
    mailer.to = ['=email']
    mailer.perform(formdata)
    assert emails.count() == 1
    assert mail_kwargs()['email_rcpt'] == ['sub@localhost']

    # A plain string instead of a list as recipient.
    emails.empty()
    mailer.to = 'xyz@localhost'
    mailer.perform(formdata)
    assert emails.count() == 1
    assert mail_kwargs()['email_rcpt'] == ['xyz@localhost']
|