wcs/tests/test_workflow_import.py

688 lines
23 KiB
Python

# -*- coding: utf-8 -*-
import pytest
import sys
import shutil
import xml.etree.ElementTree as ET
from django.utils.six import BytesIO
from quixote import cleanup
from wcs import publisher
from wcs.workflows import (Workflow, CommentableWorkflowStatusItem,
WorkflowCriticalityLevel, WorkflowBackofficeFieldsFormDef,
SendmailWorkflowStatusItem, SendSMSWorkflowStatusItem,
WorkflowImportError)
from wcs.wf.wscall import WebserviceCallStatusItem
from wcs.wf.dispatch import DispatchWorkflowStatusItem
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
from wcs.wf.profile import UpdateUserProfileStatusItem
from wcs.wf.backoffice_fields import SetBackofficeFieldsWorkflowStatusItem
from wcs.roles import Role
from wcs.fields import StringField
from wcs.qommon.misc import indent_xml as indent
from utilities import create_temporary_pub, clean_temporary_pub
@pytest.fixture
def pub(request):
    """Fixture returning whatever ``create_temporary_pub()`` builds."""
    temporary_pub = create_temporary_pub()
    return temporary_pub
def teardown_module(module):
    """Module-level teardown hook; delegates to ``clean_temporary_pub()``."""
    clean_temporary_pub()
def export_to_indented_xml(workflow, include_id=False):
    """Export ``workflow`` to an XML tree and pass it through ``indent``.

    ``include_id`` is forwarded to ``workflow.export_to_xml``. The exported
    tree object is what gets returned.
    """
    tree = workflow.export_to_xml(include_id=include_id)
    # The return value of indent() is deliberately not used here.
    indent(tree)
    return tree
def assert_import_export_works(wf, include_id=False):
    """Round-trip ``wf`` through XML and assert both workflows export alike.

    The workflow is exported (honouring ``include_id``), serialised, parsed
    again and imported with the same ``include_id`` flag. The comparison
    itself uses ``export_to_indented_xml`` with its default arguments.

    Returns the re-imported workflow.
    """
    serialised = ET.tostring(wf.export_to_xml(include_id))
    reimported = Workflow.import_from_xml_tree(ET.fromstring(serialised), include_id)

    original_dump = ET.tostring(export_to_indented_xml(wf))
    reimported_dump = ET.tostring(export_to_indented_xml(reimported))
    assert original_dump == reimported_dump
    return reimported
def test_empty(pub):
    """Round-trip a workflow that has only a name."""
    assert_import_export_works(Workflow(name='empty'))
def test_status(pub):
    """Round-trip a workflow holding two statuses."""
    workflow = Workflow(name='status')
    for label, status_id in (('Status1', 'st1'), ('Status2', 'st2')):
        workflow.add_status(label, status_id)
    assert_import_export_works(workflow)
def test_status_actions(pub):
    """Round-trip a workflow whose first status carries a commentable action."""
    workflow = Workflow(name='status')
    first_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = ['_submitter', '_receiver']
    first_status.items.append(comment_action)
    comment_action.parent = first_status

    assert_import_export_works(workflow)
def test_status_colour_css_class(pub):
    """Round-trip a status that has an extra CSS class and a colour set."""
    workflow = Workflow(name='status')
    styled_status = workflow.add_status('Status1', 'st1')
    styled_status.extra_css_class = 'hello'
    styled_status.colour = 'FF0000'
    workflow.add_status('Status2', 'st2')
    assert_import_export_works(workflow)
def test_status_forced_endpoint(pub):
    """``forced_endpoint`` is True on the flagged status and False on the other."""
    workflow = Workflow(name='status')
    endpoint_status = workflow.add_status('Status1', 'st1')
    endpoint_status.forced_endpoint = True
    workflow.add_status('Status2', 'st2')

    imported_statuses = assert_import_export_works(workflow).possible_status
    assert imported_statuses[0].forced_endpoint is True
    assert imported_statuses[1].forced_endpoint is False
def test_default_wf(pub):
    """Round-trip the workflow returned by ``Workflow.get_default_workflow()``."""
    default_workflow = Workflow.get_default_workflow()
    assert_import_export_works(default_workflow)
def test_action_dispatch(pub):
    """Check how a dispatch action's ``role_id`` survives export/import.

    Covers: a stored role referenced by id, an unknown role while
    ``idp-manage-roles`` is enabled (import must raise
    ``WorkflowImportError``), computed/templated role references, and a
    non-ASCII templated reference with and without ids.
    """
    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')

    role = Role()
    role.id = '5'
    role.name = 'Test Role'
    role.store()

    # DispatchWorkflowStatusItem is already imported at module level.
    dispatch = DispatchWorkflowStatusItem()
    dispatch.id = '_x'
    dispatch.role_id = 5
    dispatch.role_key = 'plop'
    st1.items.append(dispatch)
    dispatch.parent = st1

    wf2 = assert_import_export_works(wf)
    # the integer role id set above comes back as a string after import
    assert wf2.possible_status[0].items[0].role_id == '5'

    pub.cfg['sp'] = {'idp-manage-roles': True}
    # now roles are managed: cannot create them
    dispatch.role_id = 'unknown'
    with pytest.raises(WorkflowImportError, match=r'.*Unknown referenced role.*'):
        assert_import_export_works(wf)

    # but allow computed roles
    computed_role_ids = (
        '=form_var_bar',
        'Role {{ form_var_foo }}',
        'Role [form_var_foo]',
    )
    for computed_role_id in computed_role_ids:
        dispatch.role_id = computed_role_id
        wf2 = assert_import_export_works(wf)
        assert wf2.possible_status[0].items[0].role_id == computed_role_id

    # same check with a non-ASCII character, both without and with ids
    for include_id in (False, True):
        dispatch.role_id = 'Rolé [form_var_foo]'
        wf2 = assert_import_export_works(wf, include_id=include_id)
        assert wf2.possible_status[0].items[0].role_id == 'Rolé [form_var_foo]'
def test_status_actions_named_role(pub):
    """Round-trip a commentable action whose ``by`` list is ``['logged-users']``."""
    workflow = Workflow(name='status')
    first_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = ['logged-users']
    first_status.items.append(comment_action)
    comment_action.parent = first_status

    assert_import_export_works(workflow)
def test_status_actions_named_existing_role(pub):
    """A role referenced by an action is exported with its id and name.

    Also checks that importing still resolves to role '2' when the
    ``role_id`` attribute is stripped from the exported XML.
    """
    existing_role = Role()
    existing_role.id = '2'
    existing_role.name = 'Test Role named existing role'
    existing_role.store()

    workflow = Workflow(name='status')
    first_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = [2]
    first_status.items.append(comment_action)
    comment_action.parent = first_status

    item_with_id = b'<item role_id="2">Test Role named existing role</item>'
    item_without_id = b'<item>Test Role named existing role</item>'

    reimported = assert_import_export_works(workflow)
    assert item_with_id in ET.tostring(indent(workflow.export_to_xml()))
    assert reimported.possible_status[0].items[0].by == ['2']

    # check that it works even if the role_id is not set
    exported = ET.tostring(export_to_indented_xml(workflow))
    exported_without_id = exported.replace(item_with_id, item_without_id)
    imported_by_name = Workflow.import_from_xml_tree(
        ET.parse(BytesIO(exported_without_id)))
    assert imported_by_name.possible_status[0].items[0].by == ['2']
def test_status_actions_named_missing_role(pub):
    """Check role resolution on import when the id and name disagree or are unknown."""
    for role_id, role_name in (('3', 'Test Role A'), ('4', 'Test Role B')):
        role = Role()
        role.id = role_id
        role.name = role_name
        role.store()

    workflow = Workflow(name='status')
    first_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = [3]
    first_status.items.append(comment_action)
    comment_action.parent = first_status

    assert_import_export_works(workflow)

    role_a_item = b'<item role_id="3">Test Role A</item>'
    exported = ET.tostring(export_to_indented_xml(workflow))
    assert role_a_item in exported

    # check that role name has precedence over id
    altered = exported.replace(role_a_item, b'<item role_id="4">Test Role A</item>')
    imported = Workflow.import_from_xml_tree(ET.parse(BytesIO(altered)))
    assert imported.possible_status[0].items[0].by == ['3']

    # check that it creates a new role if there's no match on id and name
    altered = exported.replace(role_a_item, b'<item role_id="999">foobar</item>')
    roles_before = Role.count()
    imported = Workflow.import_from_xml_tree(ET.parse(BytesIO(altered)))
    assert Role.count() == roles_before + 1

    # check that it doesn't fall back on the id if there's no match on the
    # name
    roles_before = Role.count()
    altered = exported.replace(role_a_item, b'<item role_id="3">Test Role C</item>')
    imported = Workflow.import_from_xml_tree(ET.parse(BytesIO(altered)))
    assert imported.possible_status[0].items[0].by != ['3']
    assert Role.count() == roles_before + 1

    # on the other hand, check that it uses the id when include_id is True
    imported = Workflow.import_from_xml_tree(
        ET.parse(BytesIO(altered)), include_id=True)
    assert imported.possible_status[0].items[0].by == ['3']
def test_display_form_action(pub):
    """Round-trip a form action holding two string fields."""
    from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef

    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    form_action = FormWorkflowStatusItem()
    form_action.id = '_x'
    form_action.formdef = WorkflowFormFieldsFormDef(item=form_action)
    for field_label in ('Test', 'Test2'):
        form_action.formdef.fields.append(StringField(label=field_label, type='string'))
    status.items.append(form_action)
    form_action.parent = status

    reimported = assert_import_export_works(workflow)
    # formdef.max_field_id is recalculated when importing a FormWorkflowStatusItem
    imported_formdef = reimported.possible_status[0].items[0].formdef
    assert imported_formdef.max_field_id == len(form_action.formdef.fields)
def test_export_to_model_action(pub):
    """Round-trip an ``ExportToModel`` action and its attached model file.

    Exercises a non-empty file, an empty file, and how the stored file name
    compares between imports done with and without ids.
    """
    from quixote.http_request import Upload
    from wcs.qommon.form import UploadedFile
    from wcs.workflows import ExportToModel

    def build_export_action(content_type, content):
        # Build an ExportToModel action whose model file holds ``content``.
        action = ExportToModel()
        action.label = 'test'
        upload = Upload('/foo/bar', content_type=content_type)
        upload.fp = BytesIO()
        upload.fp.write(content)
        upload.fp.seek(0)
        action.model_file = UploadedFile(pub.APP_DIR, None, upload)
        return action

    def first_model_file(workflow):
        return workflow.possible_status[0].items[0].model_file

    workflow = Workflow(name='status')
    workflow.store()
    status = workflow.add_status('Status1', 'st1')

    odt_content = b'''PK\x03\x04\x14\x00\x00\x08\x00\x00\'l\x8eG^\xc62\x0c\'\x00'''
    odt_action = build_export_action('application/vnd.oasis.opendocument.text', odt_content)
    status.items.append(odt_action)
    odt_action.parent = status

    assert first_model_file(workflow).base_filename == 'bar'
    reimported = assert_import_export_works(workflow)
    assert first_model_file(reimported).base_filename == 'bar'
    assert first_model_file(reimported).get_file().read() == odt_content

    # and test with an empty file
    empty_content = b''
    rtf_action = build_export_action('text/rtf', empty_content)
    status.items = [rtf_action]
    rtf_action.parent = status

    reimported = assert_import_export_works(workflow)
    assert first_model_file(reimported).get_file().read() == empty_content

    reimported = assert_import_export_works(workflow, include_id=True)
    with_ids = assert_import_export_works(reimported, include_id=True)
    assert first_model_file(reimported).filename == first_model_file(with_ids).filename

    without_ids = assert_import_export_works(reimported, include_id=False)
    assert first_model_file(reimported).filename != first_model_file(without_ids).filename
def test_export_roles(pub):
    """The workflow-level ``roles`` mapping is equal after a round trip."""
    workflow = Workflow(name='roles')
    workflow.roles = {'foo': 'Bar'}
    reimported = assert_import_export_works(workflow)
    assert reimported.roles == workflow.roles
def test_jump_action(pub):
    """Round-trip a jump action: condition, trigger and timeout are kept.

    Also checks a formula used as timeout and the legacy XML layout where
    the ``<condition>`` element holds only text.
    """
    from wcs.wf.jump import JumpWorkflowStatusItem

    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')
    wf.add_status('Status2', 'st2')

    jump = JumpWorkflowStatusItem()
    jump.id = '_jump'
    jump.by = ['_submitter', '_receiver']
    jump.condition = {'type': 'python', 'value': '"foo"'}
    jump.trigger = 'bar'
    jump.timeout = 1200
    jump.status = 'st2'
    st1.items.append(jump)
    jump.parent = st1

    imported_jump = assert_import_export_works(wf).possible_status[0].items[0]
    assert imported_jump.condition == {'type': 'python', 'value': '"foo"'}
    assert imported_jump.trigger == 'bar'
    assert imported_jump.timeout == 1200

    # check with a formula as timeout
    jump.timeout = '=1200'
    imported_jump = assert_import_export_works(wf).possible_status[0].items[0]
    assert imported_jump.timeout == '=1200'

    # legacy condition value
    # The workflow used to be passed as the positional ``include_id``
    # argument here; export without ids, matching the import below.
    workflow_xml = wf.export_to_xml()
    condition = workflow_xml.findall('possible_status/*/items/item/condition')[0]
    condition.clear()
    condition.text = '"foo"'
    legacy_jump = Workflow.import_from_xml_tree(workflow_xml).possible_status[0].items[0]
    assert legacy_jump.condition == {'type': 'python', 'value': '"foo"'}
def test_commentable_action(pub):
    """Check ``button_label`` and ``required`` on a commentable action after import."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = ['_submitter', '_receiver']
    comment_action.button_label = None
    status.items.append(comment_action)
    comment_action.parent = status

    imported_action = assert_import_export_works(workflow).possible_status[0].items[0]
    assert imported_action.button_label is None
    assert imported_action.required is False

    comment_action.required = True
    imported_action = assert_import_export_works(workflow).possible_status[0].items[0]
    assert imported_action.required is True

    # import legacy comment without required attribute
    required_tag = b'<required>True</required>'
    exported = ET.tostring(export_to_indented_xml(workflow))
    assert required_tag in exported
    exported = exported.replace(required_tag, b'')
    assert required_tag not in exported
    legacy_workflow = Workflow.import_from_xml_tree(ET.parse(BytesIO(exported)))
    assert legacy_workflow.possible_status[0].items[0].required is False

    comment_action.button_label = 'button label'
    imported_action = assert_import_export_works(workflow).possible_status[0].items[0]
    assert imported_action.button_label == 'button label'
def test_variables_formdef(pub):
    """The variables formdef keeps its field label across a round trip."""
    from wcs.workflows import WorkflowVariablesFieldsFormDef

    workflow = Workflow(name='variables')
    workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
    variable_field = StringField(label='Test', type='string')
    workflow.variables_formdef.fields.append(variable_field)

    reimported = assert_import_export_works(workflow)
    assert reimported.variables_formdef.fields[0].label == 'Test'
def test_wscall_action(pub):
    """Round-trip a webservice call action and check each configured attribute."""
    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')

    wscall = WebserviceCallStatusItem()
    wscall.id = '_wscall'
    wscall.url = 'http://test/'
    wscall.varname = 'varname'
    wscall.post = False
    wscall.request_signature_key = 'key'
    wscall.post_data = {'one': '1', 'two': '=2', 'good:name': 'ok'}
    wscall.qs_data = {'one': '2', 'two': '=3', 'good:name': 'ok'}
    st1.items.append(wscall)
    wscall.parent = st1

    wf2 = assert_import_export_works(wf)
    wscall2 = wf2.possible_status[0].items[0]
    assert wscall2.url == 'http://test/'
    assert wscall2.varname == 'varname'
    # identity check, consistent with the other boolean assertions in this file
    assert wscall2.post is False
    assert wscall2.request_signature_key == 'key'
    assert wscall2.post_data == {'one': '1', 'two': '=2', 'good:name': 'ok'}
    assert wscall2.qs_data == {'one': '2', 'two': '=3', 'good:name': 'ok'}
def test_backoffice_info_text(pub):
    """``backoffice_info_text`` is kept on both the status and its action."""
    workflow = Workflow(name='info texts')
    status = workflow.add_status('Status1', 'st1')
    status.backoffice_info_text = '<p>Foo</p>'

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = ['_submitter', '_receiver']
    comment_action.backoffice_info_text = '<p>Bar</p>'
    status.items.append(comment_action)
    comment_action.parent = status

    imported_status = assert_import_export_works(workflow).possible_status[0]
    assert imported_status.backoffice_info_text == '<p>Foo</p>'
    assert imported_status.items[0].backoffice_info_text == '<p>Bar</p>'
def test_global_actions(pub):
    """Round-trip a global action with a journal comment and a role-restricted trigger."""
    role = Role()
    role.id = '5'
    role.name = 'Test Role'
    role.store()

    workflow = Workflow(name='global actions')
    global_action = workflow.add_global_action('Action', 'ac1')
    global_action.backoffice_info_text = '<p>Foo</p>'

    journal_entry = RegisterCommenterWorkflowStatusItem()
    journal_entry.id = '_add_to_journal'
    journal_entry.comment = 'HELLO WORLD'
    global_action.items.append(journal_entry)
    journal_entry.parent = global_action

    # The first trigger of a new global action is expected to be the manual one.
    manual_trigger = global_action.triggers[0]
    assert manual_trigger.key == 'manual'
    manual_trigger.roles = [role.id]

    reimported = assert_import_export_works(workflow)
    assert reimported.global_actions[0].triggers[0].roles == [role.id]

    # and once more with ids included
    assert_import_export_works(workflow, True)
def test_backoffice_fields(pub):
    """Round-trip (with ids) a workflow that defines one backoffice field."""
    workflow = Workflow(name='bo fields')
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    backoffice_field = StringField(
        id='bo1', label='1st backoffice field',
        type='string', varname='backoffice_blah')
    workflow.backoffice_fields_formdef.fields = [backoffice_field]
    assert_import_export_works(workflow, True)
def test_complex_dispatch_action(pub):
    """Round-trip an automatic dispatch action and its value-to-role rules.

    After the roles are wiped and recreated with the same names, an import
    of the id-including export is expected to reference the new roles.
    Rules must not be exported once the dispatch type is 'manual'.
    """
    def store_role(name):
        # Create and store a role that only has a name.
        new_role = Role()
        new_role.name = name
        new_role.store()
        return new_role

    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    Role.wipe()
    role1 = store_role('Test Role 1')
    role2 = store_role('Test Role 2')

    dispatch = DispatchWorkflowStatusItem()
    dispatch.id = '_dispatch'
    dispatch.role_key = '_receiver'
    dispatch.dispatch_type = 'automatic'
    dispatch.variable = 'plop'
    dispatch.rules = [
        {'value': 'a', 'role_id': role1.id},
        {'value': 'b', 'role_id': role2.id},
    ]
    status.items.append(dispatch)
    dispatch.parent = status

    imported_dispatch = assert_import_export_works(workflow).possible_status[0].items[0]
    assert imported_dispatch.variable == dispatch.variable
    assert imported_dispatch.rules == dispatch.rules
    assert imported_dispatch.dispatch_type == 'automatic'

    Role.wipe()
    role3 = store_role('Test Role 1')
    role4 = store_role('Test Role 2')
    # NOTE(review): role1/role2 were stored before the Role.wipe() just
    # above; confirm what remove_self() is expected to do at this point.
    role1.remove_self()
    role2.remove_self()

    tree_with_ids = export_to_indented_xml(workflow, include_id=True)
    imported_dispatch = Workflow.import_from_xml_tree(tree_with_ids).possible_status[0].items[0]
    assert imported_dispatch.variable == dispatch.variable
    assert imported_dispatch.rules == [
        {'value': 'a', 'role_id': role3.id},
        {'value': 'b', 'role_id': role4.id},
    ]
    assert imported_dispatch.dispatch_type == 'automatic'

    # check rules are not exported when dispatch type is not automatic
    dispatch.dispatch_type = 'manual'
    imported_dispatch = assert_import_export_works(workflow).possible_status[0].items[0]
    assert imported_dispatch.dispatch_type == 'manual'
    assert not imported_dispatch.rules
    tree_with_ids = export_to_indented_xml(workflow, include_id=True)
    assert tree_with_ids.find('possible_status/status/items/item/rules') is None
def test_display_message_action(pub):
    """Round-trip a display-message action; message and recipients are kept."""
    from wcs.workflows import DisplayMessageWorkflowStatusItem

    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    message_action = DisplayMessageWorkflowStatusItem()
    message_action.message = 'hey'
    message_action.to = ['_submitter', '1']
    status.items.append(message_action)
    message_action.parent = status

    imported_action = assert_import_export_works(workflow).possible_status[0].items[0]
    assert imported_action.message == message_action.message
    for recipient in message_action.to:
        assert recipient in imported_action.to

    # and once more with ids included
    assert_import_export_works(workflow, include_id=True)
def test_sendmail_other_destination(pub):
    """Non-role mail recipients are kept and leave the role count at zero."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    mail_action = SendmailWorkflowStatusItem()
    mail_action.to = ['_submitter']
    status.items.append(mail_action)
    mail_action.parent = status

    Role.wipe()
    assert_import_export_works(workflow)
    assert Role.count() == 0

    mail_action.to = [
        '_submitter',
        '=form_var_plop',
        '[form_var_plop]',
        '{{ form_var_plop }}',
        'foobar@localhost',
    ]
    reimported = assert_import_export_works(workflow)
    assert Role.count() == 0
    assert reimported.possible_status[0].items[0].to == mail_action.to
def test_sendmail_attachments(pub):
    """Mail attachments survive a round trip, both populated and empty."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    mail_action = SendmailWorkflowStatusItem()
    status.items.append(mail_action)
    mail_action.parent = status

    mail_action.attachments = ['form_var_file_raw', 'form_fbo1']
    reimported = assert_import_export_works(workflow)
    assert reimported.possible_status[0].items[0].attachments == mail_action.attachments

    mail_action.attachments = []
    reimported = assert_import_export_works(workflow)
    assert reimported.possible_status[0].items[0].attachments == []
def test_sms(pub):
    """SMS recipients (including an empty string) are kept; no role is created."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    sms_action = SendSMSWorkflowStatusItem()
    sms_action.to = ['0123456789', '']
    sms_action.body = 'hello'
    status.items.append(sms_action)
    sms_action.parent = status

    Role.wipe()
    reimported = assert_import_export_works(workflow)
    assert Role.count() == 0
    assert reimported.possible_status[0].items[0].to == sms_action.to
def test_criticality_level(pub):
    """Criticality levels keep their order and names across a round trip."""
    workflow = Workflow(name='criticality level')
    green = WorkflowCriticalityLevel(name='green')
    yellow = WorkflowCriticalityLevel(name='yellow')
    red = WorkflowCriticalityLevel(name='red', colour='FF0000')
    workflow.criticality_levels = [green, yellow, red]

    imported_levels = assert_import_export_works(workflow).criticality_levels
    assert imported_levels[0].name == 'green'
    assert imported_levels[1].name == 'yellow'
def test_global_timeout_trigger(pub):
    """A timeout trigger keeps its id and anchor when round-tripped with ids."""
    workflow = Workflow(name='global actions')
    global_action = workflow.add_global_action('Action', 'ac1')
    timeout_trigger = global_action.append_trigger('timeout')
    timeout_trigger.timeout = '2'
    timeout_trigger.anchor = 'creation'

    reimported = assert_import_export_works(workflow, include_id=True)
    imported_trigger = reimported.global_actions[0].triggers[-1]
    assert imported_trigger.id == timeout_trigger.id
    assert imported_trigger.anchor == timeout_trigger.anchor
def test_global_webservice_trigger(pub):
    """A webservice trigger keeps its id and identifier when round-tripped with ids."""
    workflow = Workflow(name='global actions')
    global_action = workflow.add_global_action('Action', 'ac1')
    ws_trigger = global_action.append_trigger('webservice')
    ws_trigger.identifier = 'plop'

    reimported = assert_import_export_works(workflow, include_id=True)
    imported_trigger = reimported.global_actions[0].triggers[-1]
    assert imported_trigger.id == ws_trigger.id
    assert imported_trigger.identifier == ws_trigger.identifier
def test_profile_action(pub):
    """The ``fields`` list of a profile-update action is kept across a round trip."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    profile_action = UpdateUserProfileStatusItem()
    profile_action.id = '_item'
    profile_action.fields = [{'field_id': '__email', 'value': '=form_var_foo'}]
    status.items.append(profile_action)
    profile_action.parent = status

    imported_action = assert_import_export_works(workflow).possible_status[0].items[0]
    assert imported_action.fields == [{'field_id': '__email', 'value': '=form_var_foo'}]
def test_set_backoffice_fields_action(pub):
    """The ``fields`` list of a set-backoffice-fields action is kept across a round trip."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    set_fields_action = SetBackofficeFieldsWorkflowStatusItem()
    set_fields_action.id = '_item'
    set_fields_action.fields = [{'field_id': 'bo1', 'value': '=form_var_foo'}]
    status.items.append(set_fields_action)
    set_fields_action.parent = status

    imported_action = assert_import_export_works(workflow).possible_status[0].items[0]
    assert imported_action.fields == [{'field_id': 'bo1', 'value': '=form_var_foo'}]
def test_action_condition(pub):
    """An empty condition dict is imported as None; a real condition round-trips."""
    workflow = Workflow(name='status')
    status = workflow.add_status('Status1', 'st1')

    mail_action = SendmailWorkflowStatusItem()
    status.items.append(mail_action)
    mail_action.parent = status
    assert_import_export_works(workflow)

    mail_action.condition = {}  # should not be created any longer
    exported_tree = workflow.export_to_xml(False)
    reimported = Workflow.import_from_xml_tree(exported_tree, False)
    assert reimported.possible_status[0].items[0].condition is None

    mail_action.condition = {'type': 'python', 'value': 'True'}
    assert_import_export_works(workflow)