workflow_tests: apply global action timeout trigger on skip time (#88404)
gitea/wcs/pipeline/head This commit looks good
Details
gitea/wcs/pipeline/head This commit looks good
Details
This commit is contained in:
parent
462228fd23
commit
6a26c0ef91
|
@ -528,6 +528,99 @@ def test_workflow_tests_automatic_jump_timeout(pub):
|
|||
testdef.run(formdef)
|
||||
|
||||
|
||||
@pytest.mark.freeze_time('2024-02-19 12:00')
|
||||
def test_workflow_tests_global_action_timeout(pub):
|
||||
user = pub.user_class(name='test user')
|
||||
user.store()
|
||||
|
||||
workflow = Workflow(name='Workflow One')
|
||||
new_status = workflow.add_status(name='New status')
|
||||
end_status = workflow.add_status(name='End status')
|
||||
|
||||
global_action = workflow.add_global_action('Go to end status')
|
||||
trigger = global_action.append_trigger('timeout')
|
||||
trigger.anchor = 'creation'
|
||||
trigger.timeout = 1
|
||||
|
||||
jump = global_action.add_action('jump')
|
||||
jump.status = end_status.id
|
||||
|
||||
# add choice so that new_status is not flagged as endpoint
|
||||
choice = new_status.add_action('choice')
|
||||
choice.label = 'Go to end status'
|
||||
choice.status = end_status.id
|
||||
|
||||
workflow.store()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test title'
|
||||
formdef.workflow_id = workflow.id
|
||||
formdef.store()
|
||||
|
||||
formdata = formdef.data_class()()
|
||||
formdata.just_created()
|
||||
|
||||
testdef = TestDef.create_from_formdata(formdef, formdata)
|
||||
testdef.agent_id = user.id
|
||||
testdef.workflow_tests.actions = [
|
||||
workflow_tests.AssertStatus(status_name='New status'),
|
||||
workflow_tests.SkipTime(seconds=60 * 60), # 1 hour
|
||||
workflow_tests.AssertStatus(status_name='New status'),
|
||||
workflow_tests.SkipTime(seconds=24 * 60 * 60), # 1 day
|
||||
workflow_tests.AssertStatus(status_name='End status'),
|
||||
]
|
||||
|
||||
testdef.run(formdef)
|
||||
|
||||
trigger.anchor = '1st-arrival'
|
||||
workflow.store()
|
||||
formdef.refresh_from_storage()
|
||||
|
||||
testdef.run(formdef)
|
||||
|
||||
trigger.anchor = 'latest-arrival'
|
||||
workflow.store()
|
||||
formdef.refresh_from_storage()
|
||||
|
||||
testdef.run(formdef)
|
||||
|
||||
trigger.anchor = 'template'
|
||||
trigger.anchor_template = '{{ form_receipt_date|date:"Y-m-d" }}'
|
||||
workflow.store()
|
||||
formdef.refresh_from_storage()
|
||||
|
||||
testdef.run(formdef)
|
||||
|
||||
trigger.anchor = 'finalized'
|
||||
workflow.store()
|
||||
formdef.refresh_from_storage()
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
testdef.run(formdef)
|
||||
assert str(excinfo.value) == 'Form should be in status "End status" but is in status "New status".'
|
||||
|
||||
# remove choice so new status becomes endpoint
|
||||
new_status.items = [x for x in new_status.items if x.id != choice.id]
|
||||
workflow.store()
|
||||
formdef.refresh_from_storage()
|
||||
|
||||
testdef.run(formdef)
|
||||
|
||||
trigger.anchor = 'anonymisation'
|
||||
workflow.store()
|
||||
formdef.refresh_from_storage()
|
||||
|
||||
with pytest.raises(WorkflowTestError) as excinfo:
|
||||
testdef.run(formdef)
|
||||
assert str(excinfo.value) == 'Form should be in status "End status" but is in status "New status".'
|
||||
|
||||
new_status.add_action('anonymise')
|
||||
workflow.store()
|
||||
formdef.refresh_from_storage()
|
||||
|
||||
testdef.run(formdef)
|
||||
|
||||
|
||||
@mock.patch('wcs.qommon.emails.send_email')
|
||||
def test_workflow_tests_sendmail(mocked_send_email, pub):
|
||||
role = pub.role_class(name='test role')
|
||||
|
|
|
@ -38,6 +38,7 @@ from wcs.qommon.xml_storage import XmlStorableObject
|
|||
from wcs.testdef import TestError
|
||||
from wcs.wf.backoffice_fields import SetBackofficeFieldRowWidget, SetBackofficeFieldsTableWidget
|
||||
from wcs.wf.profile import FieldNode
|
||||
from wcs.workflows import WorkflowGlobalActionTimeoutTrigger
|
||||
|
||||
|
||||
class WorkflowTestError(TestError):
|
||||
|
@ -551,6 +552,10 @@ class SkipTime(WorkflowTestAction):
|
|||
def perform(self, formdata):
|
||||
formdata.frozen_datetime.tick(self.seconds)
|
||||
|
||||
self.apply_jumps(formdata)
|
||||
self.apply_global_actions_timeout(formdata)
|
||||
|
||||
def apply_jumps(self, formdata):
|
||||
jump_actions = []
|
||||
status = formdata.get_status()
|
||||
for item in status.items:
|
||||
|
@ -570,6 +575,15 @@ class SkipTime(WorkflowTestAction):
|
|||
wf.jump.jump_and_perform(formdata, jump_action)
|
||||
break
|
||||
|
||||
def apply_global_actions_timeout(self, formdata):
|
||||
data_class = formdata.formdef.data_class()
|
||||
data_class.select_iterator = lambda *args, **kwargs: [formdata]
|
||||
formdata.formdef.data_class = lambda: data_class
|
||||
formdata.formdef.workflow.formdefs = lambda: [formdata.formdef]
|
||||
formdata.formdef.workflow.carddefs = lambda: []
|
||||
|
||||
WorkflowGlobalActionTimeoutTrigger.apply(formdata.formdef.workflow)
|
||||
|
||||
def fill_admin_form(self, form, formdef):
|
||||
form.add(
|
||||
StringWidget,
|
||||
|
|
Loading…
Reference in New Issue