wcs/tests/test_workflow_import.py

268 lines
8.2 KiB
Python

import sys
import shutil
import StringIO
import xml.etree.ElementTree as ET
from quixote import cleanup
from wcs import publisher
from wcs.workflows import Workflow, CommentableWorkflowStatusItem
from wcs.roles import Role
from wcs.fields import StringField
from qommon.misc import indent_xml as indent
from utilities import create_temporary_pub
def setup_module(module):
    """Run quixote's cleanup() and store a temporary publisher in ``pub``.

    The publisher is kept in a module-level global so that
    teardown_module() can find its application directory.
    """
    global pub
    cleanup()
    pub = create_temporary_pub()
def teardown_module(module):
    """Remove the application directory of the module-level publisher."""
    app_dir = pub.APP_DIR
    shutil.rmtree(app_dir)
def export_to_indented_xml(workflow, include_id=False):
    """Export *workflow* to an XML tree, indent it, and return the tree.

    *include_id* is forwarded unchanged to ``workflow.export_to_xml()``.
    """
    tree = workflow.export_to_xml(include_id=include_id)
    # indent() is applied to the tree itself; its return value is not used.
    indent(tree)
    return tree
def assert_import_export_works(wf, include_id=False):
    """Round-trip *wf* through XML export and import, asserting it is unchanged.

    The workflow is exported, re-imported with
    ``Workflow.import_from_xml_tree``, and the indented XML serialisations of
    the original and of the re-imported workflow are compared.

    Returns the re-imported workflow so callers can run further checks on it.
    """
    exported_tree = wf.export_to_xml(include_id)
    reimported = Workflow.import_from_xml_tree(exported_tree, include_id)

    # The comparison always uses the helper's default (include_id=False),
    # whatever value was used for the round trip itself.
    original_xml = ET.tostring(export_to_indented_xml(wf))
    reimported_xml = ET.tostring(export_to_indented_xml(reimported))
    assert original_xml == reimported_xml
    return reimported
def test_empty():
    """A workflow created with only a name survives the XML round trip."""
    assert_import_export_works(Workflow(name='empty'))
def test_status():
    """A workflow with two statuses and no actions survives the round trip."""
    wf = Workflow(name='status')
    for label, status_id in (('Status1', 'st1'), ('Status2', 'st2')):
        wf.add_status(label, status_id)
    assert_import_export_works(wf)
def test_status_actions():
    """A commentable action attached to the first status survives the round trip."""
    wf = Workflow(name='status')
    first_status = wf.add_status('Status1', 'st1')
    wf.add_status('Status2', 'st2')

    action = CommentableWorkflowStatusItem()
    action.id = '_commentable'
    action.by = ['_submitter', '_receiver']
    first_status.items.append(action)
    action.parent = first_status

    assert_import_export_works(wf)
def test_default_wf():
    """The workflow returned by Workflow.get_default_workflow() round-trips."""
    assert_import_export_works(Workflow.get_default_workflow())
def test_action_dispatch():
    """A dispatch action round-trips and its role id comes back as an integer."""
    wf = Workflow(name='status')
    first_status = wf.add_status('Status1', 'st1')

    # The dispatch action below refers to this stored role by id.
    test_role = Role()
    test_role.id = 5
    test_role.name = 'Test Role'
    test_role.store()

    from wcs.wf.dispatch import DispatchWorkflowStatusItem
    action = DispatchWorkflowStatusItem()
    action.id = '_x'
    action.role_id = 5
    action.role_key = 'plop'
    first_status.items.append(action)
    action.parent = first_status

    reimported = assert_import_export_works(wf)

    # checks role id is imported as integer
    imported_action = reimported.possible_status[0].items[0]
    assert imported_action.role_id == 5
def test_status_actions_named_role():
    """A commentable action whose ``by`` list is ['logged-users'] round-trips."""
    wf = Workflow(name='status')
    first_status = wf.add_status('Status1', 'st1')
    wf.add_status('Status2', 'st2')

    action = CommentableWorkflowStatusItem()
    action.id = '_commentable'
    action.by = ['logged-users']
    first_status.items.append(action)
    action.parent = first_status

    assert_import_export_works(wf)
def test_status_actions_named_existing_role():
    """A stored role referenced by id is exported with its name and re-imported.

    Role 2 is stored and referenced from a commentable action's ``by`` list.
    The test checks that the export carries both the role id and the role
    name, and that an import still yields ``[2]`` when the ``role_id``
    attribute is stripped from the XML, leaving only the name.
    """
    role = Role()
    role.id = 2
    role.name = 'Test Role named existing role'
    role.store()

    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')
    wf.add_status('Status2', 'st2')

    commentable = CommentableWorkflowStatusItem()
    commentable.id = '_commentable'
    commentable.by = [2]
    st1.items.append(commentable)
    commentable.parent = st1

    wf2 = assert_import_export_works(wf)

    item_with_id = '<item role_id="2">Test Role named existing role</item>'
    item_without_id = '<item>Test Role named existing role</item>'

    # Serialise through the shared helper, as the other tests in this module
    # do, instead of relying on the return value of indent().
    xml_export_orig = ET.tostring(export_to_indented_xml(wf))
    assert item_with_id in xml_export_orig
    assert wf2.possible_status[0].items[0].by == [2]

    # check that it works even if the role_id is not set
    xml_export = xml_export_orig.replace(item_with_id, item_without_id)
    wf3 = Workflow.import_from_xml_tree(ET.parse(StringIO.StringIO(xml_export)))
    assert wf3.possible_status[0].items[0].by == [2]
def test_status_actions_named_missing_role():
    """Check how a role reference is resolved when the XML name and id disagree.

    Two roles (3: 'Test Role A', 4: 'Test Role B') are stored, a workflow
    referencing role 3 is exported, and the exported ``<item>`` element is
    rewritten in several ways before being imported again.
    """
    for role_id, role_name in ((3, 'Test Role A'), (4, 'Test Role B')):
        role = Role()
        role.id = role_id
        role.name = role_name
        role.store()

    wf = Workflow(name='status')
    first_status = wf.add_status('Status1', 'st1')
    wf.add_status('Status2', 'st2')

    action = CommentableWorkflowStatusItem()
    action.id = '_commentable'
    action.by = [3]
    first_status.items.append(action)
    action.parent = first_status

    assert_import_export_works(wf)

    def import_from_string(xml_string, **kwargs):
        # Parse the XML text and hand the resulting tree to the importer.
        return Workflow.import_from_xml_tree(
            ET.parse(StringIO.StringIO(xml_string)), **kwargs)

    original_item = '<item role_id="3">Test Role A</item>'

    # check that role name has precedence over id
    xml_export_orig = ET.tostring(export_to_indented_xml(wf))
    assert original_item in xml_export_orig
    xml_export = xml_export_orig.replace(
        original_item, '<item role_id="4">Test Role A</item>')
    wf3 = import_from_string(xml_export)
    assert wf3.possible_status[0].items[0].by == [3]

    # check that it creates a new role if there's no match on id and name
    xml_export = xml_export_orig.replace(
        original_item, '<item role_id="999">foobar</item>')
    nb_roles = Role.count()
    wf3 = import_from_string(xml_export)
    assert Role.count() == nb_roles + 1

    # check that it doesn't fall back on the id if there's no match on the
    # name
    nb_roles = Role.count()
    xml_export = xml_export_orig.replace(
        original_item, '<item role_id="3">Test Role C</item>')
    wf3 = import_from_string(xml_export)
    assert wf3.possible_status[0].items[0].by != [3]
    assert Role.count() == nb_roles + 1

    # on the other hand, check that it uses the id when include_id is True
    wf3 = import_from_string(xml_export, include_id=True)
    assert wf3.possible_status[0].items[0].by == [3]
def test_display_form_action():
    """A form action holding one string field survives the round trip."""
    wf = Workflow(name='status')
    first_status = wf.add_status('Status1', 'st1')

    from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
    form_action = FormWorkflowStatusItem()
    form_action.id = '_x'
    form_action.formdef = WorkflowFormFieldsFormDef(item=form_action)
    form_action.formdef.fields.append(
        StringField(label='Test', type='string'))
    first_status.items.append(form_action)
    form_action.parent = first_status

    assert_import_export_works(wf)
def test_export_to_model_action():
    """An export-to-model action keeps its model file name across the round trip.

    An in-memory upload named '/foo/bar' is wrapped in an UploadedFile and
    attached to the action. The base filename is expected to be 'bar' both
    before the export and on the re-imported workflow.
    """
    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')

    from quixote.http_request import Upload
    from wcs.qommon.form import UploadedFile
    from wcs.workflows import ExportToModel

    export_to = ExportToModel()
    export_to.label = 'test'

    # Build the upload by hand: write the content, then rewind so that it can
    # be read back from the start.
    upload = Upload('/foo/bar', content_type='text/rtf')
    upload.fp = StringIO.StringIO()
    upload.fp.write('HELLO WORLD')
    upload.fp.seek(0)
    export_to.model_file = UploadedFile(publisher.WcsPublisher.APP_DIR, None, upload)

    st1.items.append(export_to)
    export_to.parent = st1

    # The original repeated this assertion twice verbatim; once is enough.
    assert wf.possible_status[0].items[0].model_file.base_filename == 'bar'
    wf2 = assert_import_export_works(wf)
    assert wf2.possible_status[0].items[0].model_file.base_filename == 'bar'
def test_export_roles():
    """The workflow-level ``roles`` mapping is preserved by the round trip."""
    wf = Workflow(name='roles')
    wf.roles = {'foo': 'Bar'}
    reimported = assert_import_export_works(wf)
    assert reimported.roles == wf.roles
def test_jump_action():
    """A jump action keeps its condition, trigger and timeout after the round trip."""
    wf = Workflow(name='status')
    first_status = wf.add_status('Status1', 'st1')
    wf.add_status('Status2', 'st2')

    from wcs.wf.jump import JumpWorkflowStatusItem
    jump_action = JumpWorkflowStatusItem()
    jump_action.id = '_jump'
    jump_action.by = ['_submitter', '_receiver']
    jump_action.condition = '"foo"'
    jump_action.trigger = 'bar'
    jump_action.timeout = 1200
    jump_action.status = 'st2'
    first_status.items.append(jump_action)
    jump_action.parent = first_status

    reimported = assert_import_export_works(wf)

    imported_jump = reimported.possible_status[0].items[0]
    assert imported_jump.condition == '"foo"'
    assert imported_jump.trigger == 'bar'
    assert imported_jump.timeout == 1200
def test_commentable_action():
    """A commentable action with ``button_label`` set to None keeps it as None."""
    wf = Workflow(name='status')
    first_status = wf.add_status('Status1', 'st1')

    action = CommentableWorkflowStatusItem()
    action.id = '_commentable'
    action.by = ['_submitter', '_receiver']
    action.button_label = None
    first_status.items.append(action)
    action.parent = first_status

    reimported = assert_import_export_works(wf)

    imported_action = reimported.possible_status[0].items[0]
    assert imported_action.button_label is None