tests: add new timeout & remove workflow tests

This commit is contained in:
Frédéric Péters 2015-03-20 11:47:31 +01:00
parent 596838802d
commit 9e75fe6793
1 changed files with 83 additions and 5 deletions

View File

@ -1,5 +1,7 @@
import datetime
import pytest
import shutil
import time
from quixote import cleanup
from quixote.http_request import HTTPRequest
@ -7,17 +9,17 @@ from quixote.http_request import HTTPRequest
from wcs.formdef import FormDef
from wcs import sessions
from wcs.roles import Role
from wcs.workflows import WorkflowStatusItem, SendmailWorkflowStatusItem
from wcs.workflows import Workflow, WorkflowStatusItem, SendmailWorkflowStatusItem
from wcs.wf.anonymise import AnonymiseWorkflowStatusItem
from wcs.wf.dispatch import DispatchWorkflowStatusItem
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.wf.jump import JumpWorkflowStatusItem, _apply_timeouts
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
from wcs.wf.remove import RemoveWorkflowStatusItem
from wcs.wf.roles import AddRoleWorkflowStatusItem, RemoveRoleWorkflowStatusItem
from wcs.wf.wscall import WebserviceCallStatusItem
from utilities import (create_temporary_pub, MockSubstitutionVariables, emails,
http_requests)
http_requests, clean_temporary_pub)
def setup_module(module):
cleanup()
@ -30,9 +32,15 @@ def setup_module(module):
req.session = sessions.BasicSession(id=1)
def teardown_module(module):
global pub
shutil.rmtree(pub.APP_DIR)
clean_temporary_pub()
def pytest_generate_tests(metafunc):
if 'two_pubs' in metafunc.fixturenames:
metafunc.parametrize('two_pubs', ['pickle', 'sql'], indirect=True)
@pytest.fixture
def two_pubs(request):
return create_temporary_pub(sql_mode=(request.param == 'sql'))
def test_jump_nothing():
FormDef.wipe()
@ -374,3 +382,73 @@ def test_webservice_call():
item.request_signature_key = '[bar]'
item.perform(formdata)
assert 'signature=' in http_requests.get_last('url')
def test_timeout():
workflow = Workflow(name='timeout')
st1 = workflow.add_status('Status1', 'st1')
st2 = workflow.add_status('Status2', 'st2')
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
jump.by = ['_submitter', '_receiver']
jump.timeout = 0.1
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
workflow.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_id = workflow.id
assert formdef.get_workflow().id == workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
formdata_id = formdata.id
time.sleep(0.3)
_apply_timeouts(pub)
assert formdef.data_class().get(formdata_id).status == 'wf-st2'
def test_timeout_then_remove(two_pubs):
workflow = Workflow(name='timeout-then-remove')
st1 = workflow.add_status('Status1', 'st1')
st2 = workflow.add_status('Status2', 'st2')
jump = JumpWorkflowStatusItem()
jump.id = '_jump'
jump.by = ['_submitter', '_receiver']
jump.timeout = 0.1
jump.status = 'st2'
st1.items.append(jump)
jump.parent = st1
remove = RemoveWorkflowStatusItem()
st2.items.append(remove)
remove.parent = st2
workflow.store()
formdef = FormDef()
formdef.name = 'baz%s' % id(two_pubs)
formdef.fields = []
formdef.workflow_id = workflow.id
assert formdef.get_workflow().id == workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
formdata_id = formdata.id
assert str(formdata_id) in [str(x) for x in formdef.data_class().keys()]
time.sleep(0.2)
_apply_timeouts(pub)
assert not str(formdata_id) in [str(x) for x in formdef.data_class().keys()]