983 lines
33 KiB
Python
983 lines
33 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import io
|
|
import xml.etree.ElementTree as ET
|
|
|
|
import pytest
|
|
from quixote.http_request import Upload
|
|
|
|
from wcs.carddef import CardDef
|
|
from wcs.fields import FileField, StringField
|
|
from wcs.formdef import FormDef
|
|
from wcs.mail_templates import MailTemplate
|
|
from wcs.qommon.form import UploadedFile
|
|
from wcs.qommon.misc import indent_xml as indent
|
|
from wcs.wf.attachment import AddAttachmentWorkflowStatusItem
|
|
from wcs.wf.backoffice_fields import SetBackofficeFieldsWorkflowStatusItem
|
|
from wcs.wf.create_formdata import CreateFormdataWorkflowStatusItem, Mapping
|
|
from wcs.wf.dispatch import DispatchWorkflowStatusItem
|
|
from wcs.wf.export_to_model import ExportToModel
|
|
from wcs.wf.external_workflow import ExternalWorkflowGlobalAction
|
|
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
|
|
from wcs.wf.jump import JumpWorkflowStatusItem
|
|
from wcs.wf.profile import UpdateUserProfileStatusItem
|
|
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
|
|
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
|
|
from wcs.wf.wscall import WebserviceCallStatusItem
|
|
from wcs.workflows import (
|
|
ChoiceWorkflowStatusItem,
|
|
CommentableWorkflowStatusItem,
|
|
DisplayMessageWorkflowStatusItem,
|
|
JumpOnSubmitWorkflowStatusItem,
|
|
SendmailWorkflowStatusItem,
|
|
SendSMSWorkflowStatusItem,
|
|
Workflow,
|
|
WorkflowBackofficeFieldsFormDef,
|
|
WorkflowCriticalityLevel,
|
|
WorkflowImportError,
|
|
WorkflowVariablesFieldsFormDef,
|
|
)
|
|
|
|
from .utilities import clean_temporary_pub, create_temporary_pub
|
|
|
|
|
|
@pytest.fixture
def pub(request):
    """Fixture handing each test the object built by ``create_temporary_pub``."""
    publisher = create_temporary_pub()
    return publisher
|
|
|
|
|
|
def teardown_module(module):
    """Module teardown: discard the temporary publisher via ``clean_temporary_pub``."""
    clean_temporary_pub()
|
|
|
|
|
|
def export_to_indented_xml(workflow, include_id=False):
    """Export *workflow* to an XML tree, run ``indent`` on it and return the tree.

    ``include_id`` is forwarded unchanged to ``workflow.export_to_xml``.
    """
    tree = workflow.export_to_xml(include_id=include_id)
    indent(tree)
    return tree
|
|
|
|
|
|
def assert_import_export_works(wf, include_id=False):
    """Round-trip *wf* through XML and assert both exports serialise identically.

    The workflow is exported, serialised to bytes, parsed back and imported;
    the indented exports of the original and of the re-imported workflow must
    then be byte-for-byte equal.  Returns the re-imported workflow.
    """
    serialized = ET.tostring(wf.export_to_xml(include_id))
    wf2 = Workflow.import_from_xml_tree(ET.fromstring(serialized), include_id)
    original_dump = ET.tostring(export_to_indented_xml(wf))
    reimported_dump = ET.tostring(export_to_indented_xml(wf2))
    assert original_dump == reimported_dump
    return wf2
|
|
|
|
|
|
def test_empty(pub):
    """A workflow carrying only a name survives the export/import round-trip."""
    workflow = Workflow(name='empty')
    assert_import_export_works(workflow)
|
|
|
|
|
|
def test_status(pub):
    """A workflow with two plain statuses survives the round-trip."""
    workflow = Workflow(name='status')
    for label, status_id in (('Status1', 'st1'), ('Status2', 'st2')):
        workflow.add_status(label, status_id)
    assert_import_export_works(workflow)
|
|
|
|
|
|
def test_status_actions(pub):
    """A status holding a commentable action survives the round-trip."""
    workflow = Workflow(name='status')
    first_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = ['_submitter', '_receiver']
    comment_action.parent = first_status
    first_status.items.append(comment_action)

    assert_import_export_works(workflow)
|
|
|
|
|
|
def test_status_colour_css_class(pub):
    """A status with an extra CSS class and a colour survives the round-trip."""
    workflow = Workflow(name='status')
    styled_status = workflow.add_status('Status1', 'st1')
    styled_status.extra_css_class = 'hello'
    styled_status.colour = 'FF0000'
    workflow.add_status('Status2', 'st2')
    assert_import_export_works(workflow)
|
|
|
|
|
|
def test_status_forced_endpoint(pub):
    """``forced_endpoint`` is True on the flagged status and False on the other."""
    workflow = Workflow(name='status')
    flagged_status = workflow.add_status('Status1', 'st1')
    flagged_status.forced_endpoint = True
    workflow.add_status('Status2', 'st2')

    reimported = assert_import_export_works(workflow)
    assert reimported.possible_status[0].forced_endpoint is True
    assert reimported.possible_status[1].forced_endpoint is False
|
|
|
|
|
|
def test_default_wf(pub):
    """The workflow returned by ``Workflow.get_default_workflow`` round-trips."""
    default_workflow = Workflow.get_default_workflow()
    assert_import_export_works(default_workflow)
|
|
|
|
|
|
def test_action_dispatch(pub):
    """Round-trip a dispatch action with a stored, an unknown and computed roles."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    role = pub.role_class()
    role.id = '5'
    role.name = 'Test Role'
    role.store()

    dispatch = DispatchWorkflowStatusItem()
    dispatch.id = '_x'
    dispatch.role_id = 5
    dispatch.role_key = 'plop'
    dispatch.parent = status
    status.items.append(dispatch)

    reimported = assert_import_export_works(workflow)

    # the role id was set as an integer above; after import it is the string '5'
    assert reimported.possible_status[0].items[0].role_id == '5'

    pub.cfg['sp'] = {'idp-manage-roles': True}
    # now roles are managed: cannot create them
    dispatch.role_id = 'unknown'
    with pytest.raises(WorkflowImportError, match=r'.*Unknown referenced role.*'):
        reimported = assert_import_export_works(workflow)

    # but allow computed roles
    for computed_role in ('=form_var_bar', 'Role {{ form_var_foo }}', 'Role [form_var_foo]'):
        dispatch.role_id = computed_role
        reimported = assert_import_export_works(workflow)
        assert reimported.possible_status[0].items[0].role_id == computed_role

    # non-ASCII computed role, first without then with ids in the export
    for with_ids in (False, True):
        dispatch.role_id = 'Rolé [form_var_foo]'
        reimported = assert_import_export_works(workflow, include_id=with_ids)
        assert reimported.possible_status[0].items[0].role_id == 'Rolé [form_var_foo]'
|
|
|
|
|
|
def test_status_actions_named_role(pub):
    """A commentable action restricted to ``'logged-users'`` survives the round-trip."""
    workflow = Workflow(name='status')
    first_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = ['logged-users']
    comment_action.parent = first_status
    first_status.items.append(comment_action)

    assert_import_export_works(workflow)
|
|
|
|
|
|
def test_status_actions_named_existing_role(pub):
    """An action referencing a stored role is exported with id and name, and re-imported."""
    role = pub.role_class()
    role.id = '2'
    role.name = 'Test Role named existing role'
    role.store()

    workflow = Workflow(name='status')
    first_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = [2]
    comment_action.parent = first_status
    first_status.items.append(comment_action)

    reimported = assert_import_export_works(workflow)
    item_with_id = b'<item role_id="2">Test Role named existing role</item>'
    assert item_with_id in ET.tostring(indent(workflow.export_to_xml()))
    assert reimported.possible_status[0].items[0].by == ['2']

    # check that it works even if the role_id is not set
    item_without_id = b'<item>Test Role named existing role</item>'
    full_export = ET.tostring(export_to_indented_xml(workflow))
    stripped_export = full_export.replace(item_with_id, item_without_id)
    from_stripped = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(stripped_export)))
    assert from_stripped.possible_status[0].items[0].by == ['2']
|
|
|
|
|
|
def test_status_actions_named_missing_role(pub):
    """Exercise role resolution on import when the exported id and name disagree."""
    for role_id, role_name in (('3', 'Test Role A'), ('4', 'Test Role B')):
        role = pub.role_class()
        role.id = role_id
        role.name = role_name
        role.store()

    workflow = Workflow(name='status')
    first_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = [3]
    comment_action.parent = first_status
    first_status.items.append(comment_action)

    assert_import_export_works(workflow)

    # check that role name has precedence over id
    original_export = ET.tostring(export_to_indented_xml(workflow))
    exported_item = b'<item role_id="3">Test Role A</item>'
    assert exported_item in original_export
    altered_export = original_export.replace(exported_item, b'<item role_id="4">Test Role A</item>')
    imported = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(altered_export)))
    assert imported.possible_status[0].items[0].by == ['3']

    # check that it creates a new role if there's no match on id and name
    altered_export = original_export.replace(exported_item, b'<item role_id="999">foobar</item>')
    roles_before = pub.role_class.count()
    imported = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(altered_export)))
    assert pub.role_class.count() == roles_before + 1

    # check that it doesn't fall back on the id if there's no match on the
    # name
    roles_before = pub.role_class.count()
    altered_export = original_export.replace(exported_item, b'<item role_id="3">Test Role C</item>')
    imported = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(altered_export)))
    assert imported.possible_status[0].items[0].by != ['3']
    assert pub.role_class.count() == roles_before + 1

    # on the other hand, check that it uses the id when include_id is True
    imported = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(altered_export)), include_id=True)
    assert imported.possible_status[0].items[0].by == ['3']
|
|
|
|
|
|
def test_display_form_action(pub):
    """Round-trip a form action holding two string fields, with and without ids."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    form_action = FormWorkflowStatusItem()
    form_action.id = '_x'
    form_action.formdef = WorkflowFormFieldsFormDef(item=form_action)
    for field_label in ('Test', 'Test2'):
        form_action.formdef.fields.append(StringField(label=field_label, type='string'))
    form_action.parent = status
    status.items.append(form_action)

    reimported = assert_import_export_works(workflow)
    # formdef.max_field_id is recalculated when importing a FormWorkflowStatusItem
    imported_formdef = reimported.possible_status[0].items[0].formdef
    assert imported_formdef.max_field_id == len(form_action.formdef.fields)

    # check action id is not lost when using include_id
    reimported = assert_import_export_works(workflow, include_id=True)
    assert reimported.possible_status[0].items[0].id == form_action.id
|
|
|
|
|
|
def test_export_to_model_action(pub):
    """Round-trip an export-to-model action, checking the attached model file."""
    workflow = Workflow(name='status')
    workflow.store()
    status = workflow.add_status('Status1', 'st1')

    model_action = ExportToModel()
    model_action.label = 'test'
    model_upload = Upload('/foo/bar', content_type='application/vnd.oasis.opendocument.text')
    file_content = b'''PK\x03\x04\x14\x00\x00\x08\x00\x00\'l\x8eG^\xc62\x0c\'\x00'''
    # buffer pre-filled with the payload and positioned at its start
    model_upload.fp = io.BytesIO(file_content)
    model_action.model_file = UploadedFile(pub.APP_DIR, None, model_upload)
    model_action.parent = status
    status.items.append(model_action)

    assert workflow.possible_status[0].items[0].model_file.base_filename == 'bar'
    reimported = assert_import_export_works(workflow)
    imported_model = reimported.possible_status[0].items[0].model_file
    assert imported_model.base_filename == 'bar'
    assert imported_model.get_file().read() == file_content

    # and test with an empty file
    model_action = ExportToModel()
    model_action.label = 'test'
    model_upload = Upload('/foo/bar', content_type='text/rtf')
    file_content = b''
    model_upload.fp = io.BytesIO(file_content)
    model_action.model_file = UploadedFile(pub.APP_DIR, None, model_upload)
    status.items = [model_action]
    model_action.parent = status
    reimported = assert_import_export_works(workflow)
    assert reimported.possible_status[0].items[0].model_file.get_file().read() == file_content

    # with ids on both sides, the model file name is identical after re-import
    reimported = assert_import_export_works(workflow, include_id=True)
    second_pass = assert_import_export_works(reimported, include_id=True)
    first_name = reimported.possible_status[0].items[0].model_file.filename
    assert first_name == second_pass.possible_status[0].items[0].model_file.filename

    # without ids, the re-imported model file gets a different name
    second_pass = assert_import_export_works(reimported, include_id=False)
    first_name = reimported.possible_status[0].items[0].model_file.filename
    assert first_name != second_pass.possible_status[0].items[0].model_file.filename
|
|
|
|
|
|
def test_export_roles(pub):
    """The workflow-level ``roles`` mapping is identical after the round-trip."""
    workflow = Workflow(name='roles')
    workflow.roles = {'foo': 'Bar'}
    reimported = assert_import_export_works(workflow)
    assert reimported.roles == workflow.roles
|
|
|
|
|
|
def test_jump_action(pub):
    """Round-trip a jump action and check the legacy plain-text condition format.

    Covers condition, trigger and timeout (numeric, then a ``=`` formula), then
    rewrites the exported ``<condition>`` element as bare text and checks it is
    imported as a ``python`` condition.
    """
    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')
    wf.add_status('Status2', 'st2')

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = ['_submitter', '_receiver']
    jump.condition = {'type': 'python', 'value': '"foo"'}
    jump.trigger = 'bar'
    jump.timeout = 1200
    jump.status = 'st2'
    st1.items.append(jump)
    jump.parent = st1

    wf2 = assert_import_export_works(wf)
    imported_jump = wf2.possible_status[0].items[0]
    assert imported_jump.condition == {'type': 'python', 'value': '"foo"'}
    assert imported_jump.trigger == 'bar'
    assert imported_jump.timeout == 1200

    # check with a formula as timeout
    jump.timeout = '=1200'
    wf2 = assert_import_export_works(wf)
    assert wf2.possible_status[0].items[0].timeout == '=1200'

    # legacy condition value
    # The original call passed the workflow object itself as the include_id
    # flag; an explicit True keeps the same (truthy) export while making the
    # intent readable.
    workflow_xml = wf.export_to_xml(include_id=True)
    condition = workflow_xml.findall('possible_status/*/items/item/condition')[0]
    # drop the structured children/attributes and keep only the bare expression
    condition.clear()
    condition.text = '"foo"'

    wf2 = Workflow.import_from_xml_tree(workflow_xml)
    assert wf2.possible_status[0].items[0].condition == {'type': 'python', 'value': '"foo"'}
|
|
|
|
|
|
def test_commentable_action(pub):
    """Round-trip a commentable action, checking ``button_label`` and ``required``."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = ['_submitter', '_receiver']
    comment_action.button_label = None
    comment_action.parent = status
    status.items.append(comment_action)

    reimported = assert_import_export_works(workflow)
    imported_action = reimported.possible_status[0].items[0]
    assert imported_action.button_label is None
    assert imported_action.required is False

    comment_action.required = True
    reimported = assert_import_export_works(workflow)
    assert reimported.possible_status[0].items[0].required is True

    # import legacy comment without required attribute
    required_tag = b'<required>True</required>'
    exported = ET.tostring(export_to_indented_xml(workflow))
    assert required_tag in exported
    exported = exported.replace(required_tag, b'')
    assert required_tag not in exported
    reimported = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(exported)))
    assert reimported.possible_status[0].items[0].required is False

    comment_action.button_label = 'button label'
    reimported = assert_import_export_works(workflow)
    assert reimported.possible_status[0].items[0].button_label == 'button label'
|
|
|
|
|
|
def test_variables_formdef(pub):
    """A workflow variables formdef with one string field survives the round-trip."""
    workflow = Workflow(name='variables')
    variables = WorkflowVariablesFieldsFormDef(workflow=workflow)
    workflow.variables_formdef = variables
    workflow.variables_formdef.fields.append(StringField(label='Test', type='string'))
    reimported = assert_import_export_works(workflow)
    assert reimported.variables_formdef.fields[0].label == 'Test'
|
|
|
|
|
|
def test_wscall_action(pub):
    """Round-trip a webservice call action and check each configured attribute."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    call = WebserviceCallStatusItem()
    call.id = '_wscall'
    call.url = 'http://test/'
    call.varname = 'varname'
    call.post = False
    call.request_signature_key = 'key'
    call.post_data = {'one': '1', 'two': '=2', 'good:name': 'ok'}
    call.qs_data = {'one': '2', 'two': '=3', 'good:name': 'ok'}
    call.parent = status
    status.items.append(call)

    reimported = assert_import_export_works(workflow)
    imported_call = reimported.possible_status[0].items[0]
    assert imported_call.url == 'http://test/'
    assert imported_call.varname == 'varname'
    assert imported_call.post is False
    assert imported_call.request_signature_key == 'key'
    assert imported_call.post_data == {'one': '1', 'two': '=2', 'good:name': 'ok'}
    assert imported_call.qs_data == {'one': '2', 'two': '=3', 'good:name': 'ok'}
|
|
|
|
|
|
def test_backoffice_info_text(pub):
    """``backoffice_info_text`` is kept on both the status and its action."""
    workflow = Workflow(name='info texts')
    status = workflow.add_status('Status1', 'st1')
    status.backoffice_info_text = '<p>Foo</p>'

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = ['_submitter', '_receiver']
    comment_action.backoffice_info_text = '<p>Bar</p>'
    comment_action.parent = status
    status.items.append(comment_action)

    reimported = assert_import_export_works(workflow)
    imported_status = reimported.possible_status[0]
    assert imported_status.backoffice_info_text == '<p>Foo</p>'
    assert imported_status.items[0].backoffice_info_text == '<p>Bar</p>'
|
|
|
|
|
|
def test_global_actions(pub):
    """Round-trip a global action with a journal comment and a role-restricted trigger."""
    role = pub.role_class()
    role.id = '5'
    role.name = 'Test Role'
    role.store()

    workflow = Workflow(name='global actions')
    action = workflow.add_global_action('Action', 'ac1')
    action.backoffice_info_text = '<p>Foo</p>'

    journal_entry = RegisterCommenterWorkflowStatusItem()
    journal_entry.id = '_add_to_journal'
    journal_entry.comment = 'HELLO WORLD'
    journal_entry.parent = action
    action.items.append(journal_entry)

    # the first trigger of the new global action is expected to be the manual one
    manual_trigger = action.triggers[0]
    assert manual_trigger.key == 'manual'
    manual_trigger.roles = [role.id]

    reimported = assert_import_export_works(workflow)
    assert reimported.global_actions[0].triggers[0].roles == [role.id]

    # same round-trip, this time keeping ids
    assert_import_export_works(workflow, True)
|
|
|
|
|
|
def test_register_comment_to(pub):
    """Check export/import of the ``to`` attribute of journal-comment actions."""
    role = pub.role_class()
    role.id = '5'
    role.name = 'Test Role'
    role.store()

    workflow = Workflow(name='global actions')
    status = workflow.add_status('Status1', 'st1')

    # first comment: no recipients set
    open_comment = RegisterCommenterWorkflowStatusItem()
    open_comment.id = '_add_to_journal1'
    open_comment.comment = 'HELLO WORLD'
    open_comment.parent = status
    status.items.append(open_comment)

    # second comment: addressed to the stored role
    targeted_comment = RegisterCommenterWorkflowStatusItem()
    targeted_comment.id = '_add_to_journal2'
    targeted_comment.comment = 'OLA MUNDO'
    targeted_comment.to = [role.id]
    targeted_comment.parent = status
    status.items.append(targeted_comment)

    assert workflow.possible_status[0].items[0].to is None
    assert workflow.possible_status[0].items[1].to == [role.id]

    # a <to> element is exported only for the action that has recipients
    xml_root = workflow.export_to_xml()
    first_item_tags = [x.tag for x in xml_root.findall('possible_status/status/items/item[1]/')]
    assert 'to' not in first_item_tags
    second_item_tags = [x.tag for x in xml_root.findall('possible_status/status/items/item[2]/')]
    assert 'to' in second_item_tags

    reimported = assert_import_export_works(workflow)
    assert reimported.possible_status[0].items[0].to == []
    assert reimported.possible_status[0].items[1].to == [role.id]
|
|
|
|
|
|
def test_backoffice_fields(pub):
    """A workflow with one backoffice string field round-trips with ids kept."""
    workflow = Workflow(name='bo fields')
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    backoffice_field = StringField(
        id='bo1', label='1st backoffice field', type='string', varname='backoffice_blah'
    )
    workflow.backoffice_fields_formdef.fields = [backoffice_field]
    assert_import_export_works(workflow, True)
|
|
|
|
|
|
def test_complex_dispatch_action(pub):
    """Round-trip an automatic dispatch with rules, then check a manual dispatch."""

    def store_role(name):
        # create and persist a role that only has a name
        new_role = pub.role_class()
        new_role.name = name
        new_role.store()
        return new_role

    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    pub.role_class.wipe()
    role1 = store_role('Test Role 1')
    role2 = store_role('Test Role 2')

    dispatch = DispatchWorkflowStatusItem()
    dispatch.id = '_dispatch'
    dispatch.role_key = '_receiver'
    dispatch.dispatch_type = 'automatic'
    dispatch.variable = 'plop'
    dispatch.rules = [{'value': 'a', 'role_id': role1.id}, {'value': 'b', 'role_id': role2.id}]
    dispatch.parent = status
    status.items.append(dispatch)

    reimported = assert_import_export_works(workflow)
    imported_dispatch = reimported.possible_status[0].items[0]
    assert imported_dispatch.variable == dispatch.variable
    assert imported_dispatch.rules == dispatch.rules
    assert imported_dispatch.dispatch_type == 'automatic'

    # recreate roles carrying the same names and import the earlier workflow again
    pub.role_class.wipe()
    role3 = store_role('Test Role 1')
    role4 = store_role('Test Role 2')

    export_with_ids = export_to_indented_xml(workflow, include_id=True)
    reimported = Workflow.import_from_xml_tree(export_with_ids)
    imported_dispatch = reimported.possible_status[0].items[0]
    assert imported_dispatch.variable == dispatch.variable
    assert imported_dispatch.rules == [
        {'value': 'a', 'role_id': role3.id},
        {'value': 'b', 'role_id': role4.id},
    ]
    assert imported_dispatch.dispatch_type == 'automatic'

    # check rules are not exported when dispatch type is not automatic
    dispatch.dispatch_type = 'manual'
    reimported = assert_import_export_works(workflow)
    imported_dispatch = reimported.possible_status[0].items[0]
    assert imported_dispatch.dispatch_type == 'manual'
    assert not imported_dispatch.rules
    export_with_ids = export_to_indented_xml(workflow, include_id=True)
    assert export_with_ids.find('possible_status/status/items/item/rules') is None
|
|
|
|
|
|
def test_display_message_action(pub):
    """Round-trip a display-message action and check its message and recipients."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    message_action = DisplayMessageWorkflowStatusItem()
    message_action.message = 'hey'
    message_action.to = ['_submitter', '1']
    message_action.parent = status
    status.items.append(message_action)

    reimported = assert_import_export_works(workflow)
    assert reimported.possible_status[0].items[0].message == message_action.message
    # every configured recipient must be present after import
    for recipient in message_action.to:
        assert recipient in reimported.possible_status[0].items[0].to

    # same round-trip, this time keeping ids
    assert_import_export_works(workflow, include_id=True)
|
|
|
|
|
|
def test_sendmail_other_destination(pub):
    """Mail recipients that are not role ids must not create roles on import."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    mail_action = SendmailWorkflowStatusItem()
    mail_action.to = ['_submitter']
    mail_action.parent = status
    status.items.append(mail_action)

    pub.role_class.wipe()
    reimported = assert_import_export_works(workflow)
    assert pub.role_class.count() == 0

    # formula, bracket and template expressions plus a literal address
    mail_action.to = [
        '_submitter',
        '=form_var_plop',
        '[form_var_plop]',
        '{{ form_var_plop }}',
        'foobar@localhost',
    ]
    reimported = assert_import_export_works(workflow)
    assert pub.role_class.count() == 0
    assert reimported.possible_status[0].items[0].to == mail_action.to
|
|
|
|
|
|
def test_sendmail_attachments(pub):
    """The ``attachments`` list of a mail action round-trips, filled or empty."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    mail_action = SendmailWorkflowStatusItem()
    mail_action.parent = status
    status.items.append(mail_action)

    mail_action.attachments = ['form_var_file_raw', 'form_fbo1']
    reimported = assert_import_export_works(workflow)
    assert reimported.possible_status[0].items[0].attachments == mail_action.attachments

    mail_action.attachments = []
    reimported = assert_import_export_works(workflow)
    assert reimported.possible_status[0].items[0].attachments == []
|
|
|
|
|
|
def test_sms(pub):
    """SMS recipients round-trip unchanged and do not create roles on import."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    sms_action = SendSMSWorkflowStatusItem()
    sms_action.to = ['0123456789', '']
    sms_action.body = 'hello'
    sms_action.parent = status
    status.items.append(sms_action)

    pub.role_class.wipe()
    reimported = assert_import_export_works(workflow)
    assert pub.role_class.count() == 0
    assert reimported.possible_status[0].items[0].to == sms_action.to
|
|
|
|
|
|
def test_criticality_level(pub):
    """Criticality levels round-trip; the first two names are checked explicitly."""
    workflow = Workflow(name='criticality level')
    green = WorkflowCriticalityLevel(name='green')
    yellow = WorkflowCriticalityLevel(name='yellow')
    red = WorkflowCriticalityLevel(name='red', colour='FF0000')
    workflow.criticality_levels = [green, yellow, red]

    reimported = assert_import_export_works(workflow)
    assert reimported.criticality_levels[0].name == 'green'
    assert reimported.criticality_levels[1].name == 'yellow'
|
|
|
|
|
|
def test_global_timeout_trigger(pub):
    """A timeout trigger keeps its id and anchor when imported with ids."""
    workflow = Workflow(name='global actions')
    action = workflow.add_global_action('Action', 'ac1')
    timeout_trigger = action.append_trigger('timeout')
    timeout_trigger.timeout = '2'
    timeout_trigger.anchor = 'creation'

    serialized = ET.tostring(workflow.export_to_xml(True))
    reimported = Workflow.import_from_xml_tree(ET.fromstring(serialized), True)
    imported_trigger = reimported.global_actions[0].triggers[-1]
    assert imported_trigger.id == timeout_trigger.id
    assert imported_trigger.anchor == timeout_trigger.anchor
|
|
|
|
|
|
def test_global_anchor_expression_trigger(pub):
    """A timeout trigger keeps its anchor, anchor expression and anchor template."""
    workflow = Workflow(name='global actions')
    action = workflow.add_global_action('Action', 'ac1')
    timeout_trigger = action.append_trigger('timeout')
    timeout_trigger.anchor_expression = 'False'
    timeout_trigger.anchor_template = '{{x}}'
    timeout_trigger.anchor = 'python'

    reimported = assert_import_export_works(workflow, include_id=True)
    imported_trigger = reimported.global_actions[0].triggers[-1]
    assert imported_trigger.id == timeout_trigger.id
    assert imported_trigger.anchor == timeout_trigger.anchor
    assert imported_trigger.anchor_expression == timeout_trigger.anchor_expression
    assert imported_trigger.anchor_template == timeout_trigger.anchor_template
|
|
|
|
|
|
def test_global_webservice_trigger(pub):
    """A webservice trigger keeps its id and identifier when imported with ids."""
    workflow = Workflow(name='global actions')
    action = workflow.add_global_action('Action', 'ac1')
    ws_trigger = action.append_trigger('webservice')
    ws_trigger.identifier = 'plop'

    reimported = assert_import_export_works(workflow, include_id=True)
    imported_trigger = reimported.global_actions[0].triggers[-1]
    assert imported_trigger.id == ws_trigger.id
    assert imported_trigger.identifier == ws_trigger.identifier
|
|
|
|
|
|
def test_profile_action(pub):
    """The field mapping of a user-profile update action survives the round-trip."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    profile_action = UpdateUserProfileStatusItem()
    profile_action.id = '_item'
    profile_action.fields = [{'field_id': '__email', 'value': '=form_var_foo'}]
    profile_action.parent = status
    status.items.append(profile_action)

    reimported = assert_import_export_works(workflow)
    imported_action = reimported.possible_status[0].items[0]
    assert imported_action.fields == [{'field_id': '__email', 'value': '=form_var_foo'}]
|
|
|
|
|
|
def test_attachment_action(pub):
    """The ``document_type`` mapping of an attachment action survives the round-trip."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    attachment_action = AddAttachmentWorkflowStatusItem()
    attachment_action.id = '_foo'
    attachment_action.document_type = {
        'id': '_audio',
        'label': 'Sound files',
        'mimetypes': ['audio/*'],
    }
    attachment_action.parent = status
    status.items.append(attachment_action)

    reimported = assert_import_export_works(workflow)
    imported_action = reimported.possible_status[0].items[0]
    assert imported_action.document_type == {
        'id': '_audio',
        'label': 'Sound files',
        'mimetypes': ['audio/*'],
    }
|
|
|
|
|
|
def test_set_backoffice_fields_action(pub):
    """A set-backoffice-fields action with a formula value survives the round-trip."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    set_fields_action = SetBackofficeFieldsWorkflowStatusItem()
    set_fields_action.id = '_item'
    set_fields_action.fields = [{'field_id': 'bo1', 'value': '=form_var_foo'}]
    set_fields_action.parent = status
    status.items.append(set_fields_action)

    reimported = assert_import_export_works(workflow)
    imported_action = reimported.possible_status[0].items[0]
    assert imported_action.fields == [{'field_id': 'bo1', 'value': '=form_var_foo'}]
|
|
|
|
|
|
def test_set_backoffice_fields_action_boolean(pub):
    """The string value ``'True'`` is still the string ``'True'`` after import."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    set_fields_action = SetBackofficeFieldsWorkflowStatusItem()
    set_fields_action.id = '_item'
    set_fields_action.fields = [{'field_id': 'bo1', 'value': 'True'}]
    set_fields_action.parent = status
    status.items.append(set_fields_action)

    reimported = assert_import_export_works(workflow)
    imported_action = reimported.possible_status[0].items[0]
    assert imported_action.fields == [{'field_id': 'bo1', 'value': 'True'}]
|
|
|
|
|
|
def test_action_condition(pub):
    """Round-trip an action with no condition, an empty one and a python one."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    mail_action = SendmailWorkflowStatusItem()
    mail_action.parent = status
    status.items.append(mail_action)

    reimported = assert_import_export_works(workflow)

    mail_action.condition = {}  # should not be created any longer
    exported_tree = workflow.export_to_xml(False)
    reimported = Workflow.import_from_xml_tree(exported_tree, False)
    # an empty condition dict is imported back as None
    assert reimported.possible_status[0].items[0].condition is None

    mail_action.condition = {'type': 'python', 'value': 'True'}
    reimported = assert_import_export_works(workflow)
|
|
|
|
|
|
def test_create_formdata(pub):
    """Round-trip (with ids) a workflow whose second status creates a formdata."""
    target_formdef = FormDef()
    target_formdef.name = 'target form'
    target_formdef.fields = [
        StringField(id='0', label='string', varname='foo_string'),
        FileField(id='1', label='file', type='file', varname='foo_file'),
    ]
    target_formdef.store()

    workflow = Workflow(name='create-formdata')

    new_status = workflow.add_status('New')
    resubmit_status = workflow.add_status('Resubmit')

    # manual jump from "New" to "Resubmit", offered to the submitter
    resubmit_choice = ChoiceWorkflowStatusItem()
    resubmit_choice.id = '_resubmit'
    resubmit_choice.label = 'Resubmit'
    resubmit_choice.by = ['_submitter']
    resubmit_choice.status = resubmit_status.id
    resubmit_choice.parent = new_status
    new_status.items.append(resubmit_choice)

    # formdata creation on the target form, with one mapping per target field
    create_formdata = CreateFormdataWorkflowStatusItem()
    create_formdata.id = '_create_formdata'
    create_formdata.varname = 'resubmitted'
    create_formdata.formdef_slug = target_formdef.url_name
    create_formdata.mappings = [
        Mapping(field_id='0', expression='=form_var_toto_string'),
        Mapping(field_id='1', expression='=form_var_toto_file_raw'),
    ]
    create_formdata.parent = resubmit_status
    resubmit_status.items.append(create_formdata)

    redirect = RedirectToUrlWorkflowStatusItem()
    redirect.id = '_redirect'
    redirect.url = '{{ form_links_resubmitted.form_url }}'
    redirect.parent = resubmit_status
    resubmit_status.items.append(redirect)

    # jump from "Resubmit" back to "New"
    back_jump = JumpOnSubmitWorkflowStatusItem()
    back_jump.id = '_jump'
    back_jump.status = new_status.id
    back_jump.parent = resubmit_status
    resubmit_status.items.append(back_jump)

    workflow.store()

    assert_import_export_works(workflow, include_id=True)
|
|
|
|
|
|
def test_external_workflow(pub):
    """Round-trip (with ids) a workflow holding an external-workflow action."""
    target_workflow = Workflow(name='External global action')
    delete_action = target_workflow.add_global_action('Delete', 'delete')
    trigger = delete_action.append_trigger('webservice')
    # NOTE(review): test_global_webservice_trigger sets ``identifier`` on the
    # same kind of trigger; confirm ``trigger_id`` is the intended attribute.
    trigger.trigger_id = 'Cleanup'
    target_workflow.store()

    target_formdef = FormDef()
    target_formdef.name = 'target form'
    target_formdef.workflow = target_workflow
    target_formdef.store()

    workflow = Workflow(name='External workflow call')
    new_status = workflow.add_status('New')
    call_status = workflow.add_status('Call external workflow')

    # manual jump from "New" to the status that calls the external workflow
    cleanup_choice = ChoiceWorkflowStatusItem()
    cleanup_choice.id = '_external'
    cleanup_choice.label = 'Cleanup'
    cleanup_choice.by = ['_submitter']
    cleanup_choice.status = call_status.id
    cleanup_choice.parent = new_status
    new_status.items.append(cleanup_choice)

    external_call = ExternalWorkflowGlobalAction()
    external_call.id = '_external_workflow'
    external_call.slug = 'formdef:%s' % target_formdef.url_name
    external_call.event = trigger.id

    external_call.parent = call_status
    call_status.items.append(external_call)

    workflow.store()
    assert_import_export_works(workflow, include_id=True)
|
|
|
|
|
|
def test_worklow_with_mail_template(pub):
    """Round-trip a mail action bound to a mail template, then import without it."""
    template = MailTemplate(name='test mail template')
    template.subject = 'test subject'
    template.body = 'test body'
    template.store()

    workflow = Workflow(name='test mail template')
    status = workflow.add_status('Status1')
    mail_action = SendmailWorkflowStatusItem()
    mail_action.to = ['_receiver']
    mail_action.mail_template = template.slug
    mail_action.parent = status
    status.items.append(mail_action)
    workflow.store()
    assert_import_export_works(workflow, include_id=True)

    # import with non existing mail template
    MailTemplate.wipe()
    serialized = ET.tostring(workflow.export_to_xml(include_id=True))
    with pytest.raises(WorkflowImportError, match='Unknown referenced mail template'):
        Workflow.import_from_xml_tree(ET.fromstring(serialized), include_id=True)
|
|
|
|
|
|
def test_unknown_data_source(pub):
    """Importing a workflow whose fields use an unknown data source must fail.

    Three workflows are built, each carrying a string field in a different
    place (form action, workflow variables, backoffice fields).  The data
    source type of those fields is switched between unknown and known values
    and the import is expected to fail or succeed accordingly.
    """
    wf1 = Workflow(name='status')
    st1 = wf1.add_status('Status1', 'st1')
    display_form = FormWorkflowStatusItem()
    display_form.id = '_x'
    display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
    display_form.formdef.fields = [StringField(label='Test', type='string', data_source={'type': 'foobar'})]
    display_form.parent = st1
    st1.items.append(display_form)

    wf2 = Workflow(name='variables')
    wf2.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=wf2)
    wf2.variables_formdef.fields = [StringField(label='Test', type='string', data_source={'type': 'foobar'})]

    wf3 = Workflow(name='bo fields')
    wf3.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf3)
    wf3.backoffice_fields_formdef.fields = [
        StringField(
            id='bo1',
            label='1st backoffice field',
            type='string',
            varname='backoffice_blah',
            data_source={'type': 'foobar'},
        )
    ]

    def use_data_source(source_type):
        # point the field of each workflow at the given data source type
        display_form.formdef.fields[0].data_source = {'type': source_type}
        wf2.variables_formdef.fields[0].data_source = {'type': source_type}
        wf3.backoffice_fields_formdef.fields[0].data_source = {'type': source_type}

    def check_import_succeeds():
        # every workflow must import without raising
        for wf in [wf1, wf2, wf3]:
            export = ET.tostring(export_to_indented_xml(wf))
            Workflow.import_from_xml(io.BytesIO(export))

    def check_import_fails():
        # every workflow must be rejected because of its data source
        for wf in [wf1, wf2, wf3]:
            export = ET.tostring(export_to_indented_xml(wf))
            with pytest.raises(WorkflowImportError, match='Unknown datasources'):
                Workflow.import_from_xml(io.BytesIO(export))

    check_import_fails()

    # carddef as datasource
    CardDef.wipe()
    carddef = CardDef()
    carddef.name = 'foo'
    carddef.fields = [StringField(id='1', label='Test', type='string', varname='foo')]
    carddef.store()

    use_data_source('carddef:foo')
    check_import_succeeds()

    use_data_source('carddef:unknown')
    check_import_fails()

    # carddef custom view as datasource
    pub.custom_view_class.wipe()
    custom_view = pub.custom_view_class()
    custom_view.title = 'card view'
    custom_view.formdef = carddef
    custom_view.columns = {'list': [{'id': 'id'}]}
    custom_view.filters = {}
    custom_view.visibility = 'datasource'
    custom_view.store()

    use_data_source('carddef:foo:card-view')
    check_import_succeeds()

    use_data_source('carddef:foo:unknown')
    check_import_fails()
|