319 lines
10 KiB
Python
319 lines
10 KiB
Python
import sys
|
|
import shutil
|
|
import StringIO
|
|
import xml.etree.ElementTree as ET
|
|
|
|
from quixote import cleanup
|
|
from wcs import publisher
|
|
|
|
from wcs.workflows import Workflow, CommentableWorkflowStatusItem
|
|
from wcs.wf.wscall import WebserviceCallStatusItem
|
|
from wcs.roles import Role
|
|
from wcs.fields import StringField
|
|
|
|
from qommon.misc import indent_xml as indent
|
|
|
|
from utilities import create_temporary_pub
|
|
|
|
|
|
def setup_module(module):
    """Module-level fixture: reset quixote state and create a temporary publisher.

    The publisher is kept in the module-level ``pub`` name so that
    ``teardown_module`` can remove its application directory afterwards.
    """
    global pub
    cleanup()
    pub = create_temporary_pub()
|
|
|
|
|
|
def teardown_module(module):
    """Module-level fixture: delete the temporary publisher's APP_DIR tree."""
    app_dir = pub.APP_DIR
    shutil.rmtree(app_dir)
|
|
|
|
def export_to_indented_xml(workflow, include_id=False):
    """Export *workflow* to XML, run ``indent`` on the result and return it.

    ``include_id`` is forwarded unchanged to ``workflow.export_to_xml``.
    """
    tree = workflow.export_to_xml(include_id=include_id)
    # the return value of indent() is ignored; the exported tree itself is
    # what gets returned to the caller
    indent(tree)
    return tree
|
|
|
|
def assert_import_export_works(wf, include_id=False):
    """Export *wf*, import it back, and assert both serialize identically.

    Returns the re-imported workflow so callers can make further checks
    on it.
    """
    exported_tree = wf.export_to_xml(include_id)
    reimported = Workflow.import_from_xml_tree(exported_tree, include_id)

    original_xml = ET.tostring(export_to_indented_xml(wf))
    reimported_xml = ET.tostring(export_to_indented_xml(reimported))
    assert original_xml == reimported_xml

    return reimported
|
|
|
|
def test_empty():
    """Round-trip a workflow that has only a name."""
    assert_import_export_works(Workflow(name='empty'))
|
|
|
|
def test_status():
    """Round-trip a workflow holding two statuses and no actions."""
    workflow = Workflow(name='status')
    for label, status_id in (('Status1', 'st1'), ('Status2', 'st2')):
        workflow.add_status(label, status_id)
    assert_import_export_works(workflow)
|
|
|
|
def test_status_actions():
    """Round-trip a workflow whose first status has a commentable action."""
    workflow = Workflow(name='status')
    first_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = ['_submitter', '_receiver']
    comment_action.parent = first_status
    first_status.items.append(comment_action)

    assert_import_export_works(workflow)
|
|
|
|
def test_default_wf():
    """Round-trip the workflow returned by ``Workflow.get_default_workflow()``."""
    default_workflow = Workflow.get_default_workflow()
    assert_import_export_works(default_workflow)
|
|
|
|
|
|
def test_action_dispatch():
    """Round-trip a dispatch action and check the type of its role id.

    The action is created with the integer role id ``5`` while the stored
    role uses the string id ``'5'``; after the export/import cycle the
    action's ``role_id`` is expected to be the string ``'5'``.
    """
    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')

    role = Role()
    role.id = '5'
    role.name = 'Test Role'
    role.store()

    from wcs.wf.dispatch import DispatchWorkflowStatusItem

    dispatch = DispatchWorkflowStatusItem()
    dispatch.id = '_x'
    dispatch.role_id = 5
    dispatch.role_key = 'plop'
    st1.items.append(dispatch)
    dispatch.parent = st1

    wf2 = assert_import_export_works(wf)

    # checks role id is imported as a string, even though it was set as an
    # integer on the original action
    assert wf2.possible_status[0].items[0].role_id == '5'
|
|
|
|
|
|
def test_status_actions_named_role():
    """Round-trip a commentable action whose ``by`` list is ['logged-users']."""
    workflow = Workflow(name='status')
    first_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = ['logged-users']
    comment_action.parent = first_status
    first_status.items.append(comment_action)

    assert_import_export_works(workflow)
|
|
|
|
|
|
def test_status_actions_named_existing_role():
    """Check how an existing role is exported and matched again on import.

    The role is referenced by integer id in the action; the export must
    carry both the id and the name, and an import must resolve to id '2'
    with or without the ``role_id`` attribute in the XML.
    """
    existing_role = Role()
    existing_role.id = '2'
    existing_role.name = 'Test Role named existing role'
    existing_role.store()

    workflow = Workflow(name='status')
    first_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = [2]
    first_status.items.append(comment_action)
    comment_action.parent = first_status

    item_with_id = '<item role_id="2">Test Role named existing role</item>'
    item_without_id = '<item>Test Role named existing role</item>'

    reimported = assert_import_export_works(workflow)
    assert item_with_id in ET.tostring(indent(workflow.export_to_xml()))
    assert reimported.possible_status[0].items[0].by == ['2']

    # check that it works even if the role_id is not set
    full_export = ET.tostring(export_to_indented_xml(workflow))
    stripped_export = full_export.replace(item_with_id, item_without_id)
    stripped_tree = ET.parse(StringIO.StringIO(stripped_export))
    from_stripped = Workflow.import_from_xml_tree(stripped_tree)
    assert from_stripped.possible_status[0].items[0].by == ['2']
|
|
|
|
|
|
def test_status_actions_named_missing_role():
    """Check role resolution on import when id and name disagree or are unknown."""
    for role_id, role_name in (('3', 'Test Role A'), ('4', 'Test Role B')):
        role = Role()
        role.id = role_id
        role.name = role_name
        role.store()

    workflow = Workflow(name='status')
    first_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = [3]
    first_status.items.append(comment_action)
    comment_action.parent = first_status

    assert_import_export_works(workflow)

    def import_from_string(xml_text, **kwargs):
        # parse the serialized XML and feed the tree to the workflow importer
        tree = ET.parse(StringIO.StringIO(xml_text))
        return Workflow.import_from_xml_tree(tree, **kwargs)

    exported_item = '<item role_id="3">Test Role A</item>'

    # check that role name has precedence over id
    pristine_xml = ET.tostring(export_to_indented_xml(workflow))
    assert exported_item in pristine_xml
    altered_xml = pristine_xml.replace(
        exported_item, '<item role_id="4">Test Role A</item>')
    imported = import_from_string(altered_xml)
    assert imported.possible_status[0].items[0].by == ['3']

    # check that it creates a new role if there's no match on id and name
    altered_xml = pristine_xml.replace(
        exported_item, '<item role_id="999">foobar</item>')
    roles_before = Role.count()
    import_from_string(altered_xml)
    assert Role.count() == roles_before + 1

    # check that it doesn't fallback on the id if there's no match on the
    # name
    roles_before = Role.count()
    altered_xml = pristine_xml.replace(
        exported_item, '<item role_id="3">Test Role C</item>')
    imported = import_from_string(altered_xml)
    assert imported.possible_status[0].items[0].by != ['3']
    assert Role.count() == roles_before + 1

    # on the other hand, check that it uses the id when include_id is True
    imported = import_from_string(altered_xml, include_id=True)
    assert imported.possible_status[0].items[0].by == ['3']
|
|
|
|
|
|
def test_display_form_action():
    """Round-trip a form action carrying two string fields."""
    workflow = Workflow(name='status')
    first_status = workflow.add_status('Status1', 'st1')

    from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef

    form_action = FormWorkflowStatusItem()
    form_action.id = '_x'
    form_action.formdef = WorkflowFormFieldsFormDef(item=form_action)
    for label in ('Test', 'Test2'):
        form_action.formdef.fields.append(StringField(label=label, type='string'))
    first_status.items.append(form_action)
    form_action.parent = first_status

    reimported = assert_import_export_works(workflow)

    # formdef.max_field_id is recalculated when importing a FormWorkflowStatusItem
    expected_max_id = len(form_action.formdef.fields) - 1
    imported_formdef = reimported.possible_status[0].items[0].formdef
    assert imported_formdef.max_field_id == expected_max_id
|
|
|
|
|
|
def test_export_to_model_action():
    """Round-trip an export-to-model action and check its model file name.

    The model file is built from an in-memory upload named '/foo/bar'; its
    ``base_filename`` is expected to be 'bar' both before and after the
    export/import cycle.
    """
    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')

    from quixote.http_request import Upload
    from wcs.qommon.form import UploadedFile
    from wcs.workflows import ExportToModel

    export_to = ExportToModel()
    export_to.label = 'test'
    upload = Upload('/foo/bar', content_type='text/rtf')
    upload.fp = StringIO.StringIO()
    upload.fp.write('HELLO WORLD')
    # rewind so the content can be read back from the start
    upload.fp.seek(0)
    export_to.model_file = UploadedFile(publisher.WcsPublisher.APP_DIR, None, upload)
    st1.items.append(export_to)
    export_to.parent = st1

    # sanity check on the source workflow before the round trip
    assert wf.possible_status[0].items[0].model_file.base_filename == 'bar'
    wf2 = assert_import_export_works(wf)
    assert wf2.possible_status[0].items[0].model_file.base_filename == 'bar'
|
|
|
|
|
|
def test_export_roles():
    """Round-trip a workflow and check its ``roles`` mapping is preserved."""
    workflow = Workflow(name='roles')
    workflow.roles = {'foo': 'Bar'}

    reimported = assert_import_export_works(workflow)

    assert reimported.roles == workflow.roles
|
|
|
|
|
|
def test_jump_action():
    """Round-trip a jump action and check condition, trigger and timeout."""
    workflow = Workflow(name='status')
    first_status = workflow.add_status('Status1', 'st1')
    workflow.add_status('Status2', 'st2')

    from wcs.wf.jump import JumpWorkflowStatusItem

    jump_action = JumpWorkflowStatusItem()
    jump_action.id = '_jump'
    jump_action.by = ['_submitter', '_receiver']
    jump_action.condition = '"foo"'
    jump_action.trigger = 'bar'
    jump_action.timeout = 1200
    jump_action.status = 'st2'
    first_status.items.append(jump_action)
    jump_action.parent = first_status

    reimported = assert_import_export_works(workflow)

    imported_jump = reimported.possible_status[0].items[0]
    assert imported_jump.condition == '"foo"'
    assert imported_jump.trigger == 'bar'
    assert imported_jump.timeout == 1200
|
|
|
|
|
|
def test_commentable_action():
    """Round-trip a commentable action whose ``button_label`` is None."""
    workflow = Workflow(name='status')
    first_status = workflow.add_status('Status1', 'st1')

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = ['_submitter', '_receiver']
    comment_action.button_label = None
    comment_action.parent = first_status
    first_status.items.append(comment_action)

    reimported = assert_import_export_works(workflow)

    imported_action = reimported.possible_status[0].items[0]
    assert imported_action.button_label is None
|
|
|
|
|
|
def test_variables_formdef():
    """Round-trip a workflow whose variables formdef has one string field."""
    workflow = Workflow(name='variables')

    from wcs.workflows import WorkflowVariablesFieldsFormDef

    variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
    workflow.variables_formdef = variables_formdef
    workflow.variables_formdef.fields.append(StringField(label='Test', type='string'))

    reimported = assert_import_export_works(workflow)

    first_field = reimported.variables_formdef.fields[0]
    assert first_field.label == 'Test'
|
|
|
|
def test_wscall_action():
    """Round-trip a webservice call action and check its attributes."""
    workflow = Workflow(name='status')
    first_status = workflow.add_status('Status1', 'st1')

    call_action = WebserviceCallStatusItem()
    call_action.id = '_wscall'
    call_action.url = 'http://test/'
    call_action.varname = 'varname'
    call_action.post = False
    call_action.request_signature_key = 'key'
    call_action.post_data = {'one': '1', 'two': '=2', 'good:name': 'ok'}
    first_status.items.append(call_action)
    call_action.parent = first_status

    reimported = assert_import_export_works(workflow)

    imported_call = reimported.possible_status[0].items[0]
    assert imported_call.url == 'http://test/'
    assert imported_call.varname == 'varname'
    assert imported_call.post == False
    assert imported_call.request_signature_key == 'key'
    assert imported_call.post_data == {'one': '1', 'two': '=2', 'good:name': 'ok'}
|
|
|
|
def test_backoffice_info_text():
    """Round-trip ``backoffice_info_text`` set on a status and on an action."""
    workflow = Workflow(name='info texts')
    first_status = workflow.add_status('Status1', 'st1')
    first_status.backoffice_info_text = '<p>Foo</p>'

    comment_action = CommentableWorkflowStatusItem()
    comment_action.id = '_commentable'
    comment_action.by = ['_submitter', '_receiver']
    comment_action.backoffice_info_text = '<p>Bar</p>'
    first_status.items.append(comment_action)
    comment_action.parent = first_status

    reimported = assert_import_export_works(workflow)

    imported_status = reimported.possible_status[0]
    assert imported_status.backoffice_info_text == '<p>Foo</p>'
    assert imported_status.items[0].backoffice_info_text == '<p>Bar</p>'
|