2508 lines
89 KiB
Python
2508 lines
89 KiB
Python
# -*- coding: utf-8 -*-
|
||
|
||
import io
|
||
import os
|
||
import re
|
||
|
||
import mock
|
||
import pytest
|
||
from quixote.http_request import Upload as QuixoteUpload
|
||
from webtest import Upload
|
||
|
||
from wcs import fields
|
||
from wcs.carddef import CardDef
|
||
from wcs.formdef import FormDef
|
||
from wcs.qommon.errors import ConnectionError
|
||
from wcs.qommon.form import UploadedFile
|
||
from wcs.qommon.http_request import HTTPRequest
|
||
from wcs.wf.create_carddata import CreateCarddataWorkflowStatusItem
|
||
from wcs.wf.create_formdata import CreateFormdataWorkflowStatusItem, Mapping
|
||
from wcs.wf.dispatch import DispatchWorkflowStatusItem
|
||
from wcs.wf.edit_carddata import EditCarddataWorkflowStatusItem
|
||
from wcs.wf.export_to_model import ExportToModel
|
||
from wcs.wf.external_workflow import ExternalWorkflowGlobalAction
|
||
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
|
||
from wcs.wf.jump import JumpWorkflowStatusItem
|
||
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
|
||
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
|
||
from wcs.wf.wscall import WebserviceCallStatusItem
|
||
from wcs.workflows import (
|
||
ChoiceWorkflowStatusItem,
|
||
CommentableWorkflowStatusItem,
|
||
DisplayMessageWorkflowStatusItem,
|
||
JumpOnSubmitWorkflowStatusItem,
|
||
Workflow,
|
||
WorkflowBackofficeFieldsFormDef,
|
||
WorkflowCriticalityLevel,
|
||
WorkflowVariablesFieldsFormDef,
|
||
item_classes,
|
||
)
|
||
|
||
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
|
||
from .test_all import create_superuser
|
||
|
||
|
||
def pytest_generate_tests(metafunc):
|
||
if 'pub' in metafunc.fixturenames:
|
||
metafunc.parametrize('pub', ['pickle', 'sql', 'pickle-templates'], indirect=True)
|
||
|
||
|
||
@pytest.fixture
|
||
def pub(request):
|
||
pub = create_temporary_pub(
|
||
sql_mode=bool('sql' in request.param), templates_mode=bool('templates' in request.param)
|
||
)
|
||
|
||
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
|
||
pub.set_app_dir(req)
|
||
pub.cfg['identification'] = {'methods': ['password']}
|
||
pub.cfg['language'] = {'language': 'en'}
|
||
pub.write_cfg()
|
||
|
||
return pub
|
||
|
||
|
||
def teardown_module(module):
|
||
clean_temporary_pub()
|
||
|
||
|
||
def test_workflows(pub):
|
||
create_superuser(pub)
|
||
app = login(get_app(pub))
|
||
app.get('/backoffice/workflows/')
|
||
|
||
|
||
def test_workflows_default(pub):
|
||
create_superuser(pub)
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/')
|
||
assert 'Default' in resp.text
|
||
resp = resp.click(href=r'^_default/')
|
||
assert 'Just Submitted' in resp.text
|
||
assert 'This is the default workflow' in resp.text
|
||
# makes sure it cannot be edited
|
||
assert 'Edit' not in resp.text
|
||
|
||
# and makes sure status are not editable either
|
||
resp = resp.click('Just Submitted')
|
||
assert '<h2>Just Submitted' in resp.text
|
||
assert 'Change Status Name' not in resp.text
|
||
assert 'Delete' not in resp.text
|
||
|
||
|
||
def test_workflows_new(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/')
|
||
|
||
# create a new workflow
|
||
resp = resp.click('New Workflow')
|
||
resp.forms[0]['name'] = 'a new workflow'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/'
|
||
resp = resp.follow()
|
||
assert 'There are not yet any status defined in this workflow' in resp.text
|
||
assert '<svg ' not in resp.text
|
||
|
||
# create a new status
|
||
resp.forms[0]['name'] = 'new status'
|
||
resp = resp.forms[0].submit()
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/'
|
||
resp = resp.follow()
|
||
assert '<svg ' in resp.text
|
||
assert '@import ' not in resp.text
|
||
|
||
# create a new action
|
||
resp = resp.click('new status')
|
||
resp.forms[0]['action-interaction'] = 'Alert'
|
||
resp = resp.forms[0].submit()
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/status/1/'
|
||
resp = resp.follow()
|
||
assert 'Use drag and drop' in resp.text
|
||
|
||
# fill action
|
||
resp = resp.click('Alert')
|
||
resp.forms[0]['message'] = 'bla bla bla'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/status/1/items/'
|
||
resp = resp.follow()
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/status/1/'
|
||
|
||
wf = Workflow.get(1)
|
||
assert wf.name == 'a new workflow'
|
||
assert wf.possible_status[0].name == 'new status'
|
||
assert wf.possible_status[0].items[0].message == 'bla bla bla'
|
||
|
||
|
||
def test_workflows_svg(pub):
|
||
create_superuser(pub)
|
||
pub.role_class.wipe()
|
||
role = pub.role_class(name='foobar')
|
||
role.store()
|
||
Workflow.wipe()
|
||
|
||
workflow = Workflow(name='foo')
|
||
st1 = workflow.add_status(name='baz')
|
||
commentable = CommentableWorkflowStatusItem()
|
||
commentable.id = '_commentable'
|
||
commentable.by = [role.id]
|
||
commentable.label = 'foobar'
|
||
st1.items.append(commentable)
|
||
commentable.parent = st1
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/svg' % workflow.id)
|
||
assert resp.content_type == 'image/svg+xml'
|
||
assert '/static/css/dc2/admin.css' in resp.text
|
||
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/svg' % (workflow.id, st1.id))
|
||
assert resp.content_type == 'image/svg+xml'
|
||
assert '/static/css/dc2/admin.css' in resp.text
|
||
|
||
|
||
def test_workflows_edit(pub):
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click(href='edit')
|
||
assert resp.forms[0]['name'].value == 'foo'
|
||
resp.forms[0]['name'] = 'baz'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/'
|
||
resp = resp.follow()
|
||
assert 'baz' in resp.text
|
||
|
||
|
||
def test_workflows_edit_status(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
|
||
resp = resp.click('Change Status Name')
|
||
resp.forms[0]['name'] = 'bza'
|
||
resp = resp.forms[0].submit()
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/status/1/'
|
||
resp = resp.follow()
|
||
assert Workflow.get(1).possible_status[0].name == 'bza'
|
||
|
||
resp = resp.click('Change Display Settings')
|
||
resp.forms[0]['hide_status_from_user'].checked = True
|
||
resp = resp.forms[0].submit()
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/status/1/'
|
||
resp = resp.follow()
|
||
assert Workflow.get(1).possible_status[0].visibility == ['_receiver']
|
||
|
||
resp = resp.click('Change Terminal Status')
|
||
resp.forms[0]['force_terminal_status'].checked = True
|
||
resp = resp.forms[0].submit()
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/status/1/'
|
||
resp = resp.follow()
|
||
assert Workflow.get(1).possible_status[0].forced_endpoint is True
|
||
|
||
resp = resp.click('Change Display Settings')
|
||
assert resp.forms[0]['colour'].value == 'FFFFFF'
|
||
assert resp.forms[0]['extra_css_class'].value == ''
|
||
resp.forms[0]['colour'] = 'FF0000'
|
||
resp.forms[0]['extra_css_class'] = 'plop'
|
||
resp = resp.forms[0].submit()
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/status/1/'
|
||
resp = resp.follow()
|
||
assert Workflow.get(1).possible_status[0].colour == 'FF0000'
|
||
assert Workflow.get(1).possible_status[0].extra_css_class == 'plop'
|
||
|
||
resp = resp.click('Change Display Settings')
|
||
assert resp.forms[0]['colour'].value == 'FF0000'
|
||
assert resp.forms[0]['extra_css_class'].value == 'plop'
|
||
resp.forms[0]['extra_css_class'] = 'xxx'
|
||
resp = resp.forms[0].submit('cancel')
|
||
assert Workflow.get(1).possible_status[0].extra_css_class == 'plop'
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/status/1/'
|
||
resp = resp.follow()
|
||
|
||
resp = resp.click('Change Backoffice Information Text')
|
||
assert resp.forms[0]['backoffice_info_text'].value == ''
|
||
resp.forms[0]['backoffice_info_text'] = '<p>Hello</p>'
|
||
resp = resp.forms[0].submit()
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/status/1/'
|
||
resp = resp.follow()
|
||
assert 'Hello' in Workflow.get(1).possible_status[0].backoffice_info_text
|
||
|
||
|
||
def test_workflows_delete_status(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
|
||
resp = resp.click('Delete')
|
||
resp = resp.form.submit('cancel')
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/status/1/'
|
||
resp = resp.follow()
|
||
|
||
resp = resp.click('Delete')
|
||
resp = resp.form.submit('submit')
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/'
|
||
resp = resp.follow()
|
||
|
||
|
||
def test_workflows_delete(pub):
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.store()
|
||
|
||
formdef = FormDef()
|
||
formdef.name = 'Form title'
|
||
formdef.workflow = workflow
|
||
formdef.fields = []
|
||
formdef.store()
|
||
|
||
create_superuser(pub)
|
||
app = login(get_app(pub))
|
||
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click(href='delete')
|
||
assert 'This workflow is currently in use, you cannot remove it.' in resp.text
|
||
|
||
formdef.remove_self()
|
||
|
||
carddef = CardDef()
|
||
carddef.name = 'Card title'
|
||
carddef.workflow = workflow
|
||
carddef.fields = []
|
||
carddef.store()
|
||
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click(href='delete')
|
||
assert 'This workflow is currently in use, you cannot remove it.' in resp.text
|
||
|
||
carddef.remove_self()
|
||
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click(href='delete')
|
||
resp = resp.forms[0].submit()
|
||
assert resp.location == 'http://example.net/backoffice/workflows/'
|
||
resp = resp.follow()
|
||
assert Workflow.count() == 0
|
||
|
||
|
||
def test_workflows_usage(pub):
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.store()
|
||
|
||
create_superuser(pub)
|
||
app = login(get_app(pub))
|
||
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
assert 'Usage' not in resp.text
|
||
|
||
formdef = FormDef()
|
||
formdef.name = 'Form title'
|
||
formdef.workflow = workflow
|
||
formdef.fields = []
|
||
formdef.store()
|
||
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
assert 'Usage' in resp.text
|
||
assert '/forms/%s/' % formdef.id in resp.text
|
||
|
||
carddef = CardDef()
|
||
carddef.name = 'Card title'
|
||
carddef.workflow = workflow
|
||
carddef.fields = []
|
||
carddef.store()
|
||
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
assert 'Usage' in resp.text
|
||
assert '/cards/%s/' % carddef.id in resp.text
|
||
|
||
formdef.remove_self()
|
||
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
assert 'Usage' in resp.text
|
||
assert '/cards/%s/' % carddef.id in resp.text
|
||
|
||
|
||
def test_workflows_export_import(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('Export')
|
||
assert resp.content_type == 'application/x-wcs-workflow'
|
||
wf_export = resp.body
|
||
|
||
resp = app.get('/backoffice/workflows/')
|
||
resp = resp.click('Import')
|
||
resp.form['file'] = Upload('xxx.wcs', wf_export)
|
||
resp = resp.form.submit('submit')
|
||
assert resp.location == 'http://example.net/backoffice/workflows/2/'
|
||
resp = resp.follow()
|
||
assert 'This workflow has been successfully imported' in resp.text
|
||
assert Workflow.get(2).name == 'Copy of foo'
|
||
|
||
resp = app.get('/backoffice/workflows/')
|
||
resp = resp.click('Import')
|
||
resp.form['file'] = Upload('xxx.wcs', b'garbage')
|
||
resp = resp.form.submit('submit')
|
||
assert 'Invalid File' in resp.text
|
||
assert Workflow.count() == 2
|
||
|
||
|
||
def test_workflows_import_from_url(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('Export')
|
||
assert resp.content_type == 'application/x-wcs-workflow'
|
||
wf_export = resp.body
|
||
|
||
resp = app.get('/backoffice/workflows/')
|
||
resp = resp.click('Import')
|
||
resp = resp.form.submit()
|
||
assert 'You have to enter a file or a URL' in resp
|
||
|
||
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
||
urlopen.side_effect = ConnectionError('...')
|
||
resp.form['url'] = 'http://remote.invalid/test.wcs'
|
||
resp = resp.form.submit()
|
||
assert 'Error loading form' in resp
|
||
urlopen.side_effect = lambda *args: io.StringIO(wf_export.decode())
|
||
resp.form['url'] = 'http://remote.invalid/test.wcs'
|
||
resp = resp.form.submit()
|
||
assert Workflow.count() == 2
|
||
|
||
|
||
def test_workflows_export_import_create_role(pub):
|
||
create_superuser(pub)
|
||
|
||
pub.role_class.wipe()
|
||
role = pub.role_class()
|
||
role.name = 'PLOP'
|
||
role.store()
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
st1 = workflow.add_status(name='baz')
|
||
commentable = CommentableWorkflowStatusItem()
|
||
commentable.id = '_commentable'
|
||
commentable.by = [role.id]
|
||
st1.items.append(commentable)
|
||
commentable.parent = st1
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('Export')
|
||
assert resp.content_type == 'application/x-wcs-workflow'
|
||
wf_export = resp.body
|
||
|
||
resp = app.get('/backoffice/workflows/')
|
||
resp = resp.click('Import')
|
||
resp.form['file'] = Upload('xxx.wcs', wf_export)
|
||
resp = resp.form.submit('submit')
|
||
assert resp.location == 'http://example.net/backoffice/workflows/2/'
|
||
resp = resp.follow()
|
||
assert 'This workflow has been successfully imported' in resp.text
|
||
assert Workflow.get(2).name == 'Copy of foo'
|
||
assert Workflow.get(2).possible_status[0].items[0].by == [role.id]
|
||
|
||
role.remove_self()
|
||
|
||
# automatically create role
|
||
resp = app.get('/backoffice/workflows/')
|
||
resp = resp.click('Import')
|
||
resp.form['file'] = Upload('xxx.wcs', wf_export)
|
||
resp = resp.form.submit('submit')
|
||
assert resp.location == 'http://example.net/backoffice/workflows/3/'
|
||
resp = resp.follow()
|
||
assert 'This workflow has been successfully imported' in resp.text
|
||
assert Workflow.get(3).name == 'Copy of foo (2)'
|
||
assert pub.role_class.count() == 1
|
||
assert pub.role_class.select()[0].name == 'PLOP'
|
||
assert Workflow.get(3).possible_status[0].items[0].by == [pub.role_class.select()[0].id]
|
||
|
||
# don't create role if they are managed by the identity provider
|
||
pub.role_class.wipe()
|
||
|
||
pub.cfg['sp'] = {'idp-manage-roles': True}
|
||
pub.write_cfg()
|
||
resp = app.get('/backoffice/workflows/')
|
||
resp = resp.click('Import')
|
||
resp.form['file'] = Upload('xxx.wcs', wf_export)
|
||
resp = resp.form.submit('submit')
|
||
assert 'Invalid File (Unknown referenced role (PLOP))' in resp.text
|
||
|
||
|
||
def test_workflows_duplicate(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('Duplicate')
|
||
assert resp.location == 'http://example.net/backoffice/workflows/2/'
|
||
resp = resp.follow()
|
||
assert Workflow.get(2).name == 'foo (copy)'
|
||
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('Duplicate')
|
||
assert resp.location == 'http://example.net/backoffice/workflows/3/'
|
||
resp = resp.follow()
|
||
assert Workflow.get(3).name == 'foo (copy 2)'
|
||
|
||
|
||
def test_workflows_add_all_actions(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
|
||
for category in ('status-change', 'interaction', 'formdata-action', 'user-action'):
|
||
for action in [x[0] for x in resp.forms[0]['action-%s' % category].options if x[0]]:
|
||
resp.forms[0]['action-%s' % category] = action
|
||
resp = resp.forms[0].submit()
|
||
resp = resp.follow()
|
||
|
||
i = 1
|
||
for category in ('status-change', 'interaction', 'formdata-action', 'user-action'):
|
||
for action in [x[0] for x in resp.forms[0]['action-%s' % category].options if x[0]]:
|
||
resp = resp.click('Edit', href='items/%d/' % i, index=0)
|
||
resp = resp.forms[0].submit('cancel')
|
||
resp = resp.follow() # redirect to items/
|
||
resp = resp.follow() # redirect to ./
|
||
i += 1
|
||
|
||
|
||
def test_workflows_check_available_actions(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
|
||
assert 'Criticality Levels' not in [x[0] for x in resp.forms[0]['action-formdata-action'].options]
|
||
assert 'SMS' not in [x[0] for x in resp.forms[0]['action-interaction'].options]
|
||
assert 'User Notification' not in [x[0] for x in resp.forms[0]['action-interaction'].options]
|
||
|
||
if not pub.site_options.has_section('variables'):
|
||
pub.site_options.add_section('variables')
|
||
pub.site_options.set('variables', 'portal_url', 'https://www.example.net/')
|
||
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||
pub.site_options.write(fd)
|
||
|
||
pub.cfg['sms'] = {'passerelle_url': 'xx', 'sender': 'xx'}
|
||
pub.write_cfg()
|
||
workflow.criticality_levels = [WorkflowCriticalityLevel(name='green')]
|
||
workflow.store()
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
assert 'Criticality Levels' in [x[0] for x in resp.forms[0]['action-formdata-action'].options]
|
||
assert 'SMS' in [x[0] for x in resp.forms[0]['action-interaction'].options]
|
||
assert 'User Notification' in [x[0] for x in resp.forms[0]['action-interaction'].options]
|
||
|
||
for action in ('Criticality Levels', 'SMS', 'User Notification'):
|
||
for category in ('status-change', 'interaction', 'formdata-action', 'user-action'):
|
||
if action in [x[0] for x in resp.forms[0]['action-%s' % category].options if x[0]]:
|
||
resp.forms[0]['action-%s' % category] = action
|
||
resp = resp.forms[0].submit()
|
||
resp = resp.follow()
|
||
|
||
for i in range(3):
|
||
resp = resp.click('Edit', href='items/%d/' % (i + 1), index=0)
|
||
resp = resp.forms[0].submit('cancel')
|
||
resp = resp.follow() # redirect to items/
|
||
resp = resp.follow() # redirect to ./
|
||
|
||
|
||
def test_workflows_edit_dispatch_action(pub):
|
||
create_superuser(pub)
|
||
pub.role_class.wipe()
|
||
role = pub.role_class(name='foobar')
|
||
role.store()
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
|
||
resp.forms[0]['action-formdata-action'] = 'Function/Role Linking'
|
||
resp = resp.forms[0].submit()
|
||
resp = resp.follow()
|
||
|
||
resp = resp.click('Function/Role Linking')
|
||
resp.form['rules$element0$value'].value = 'FOOBAR'
|
||
resp.form['rules$element0$role_id'].value = str(role.id)
|
||
resp = resp.form.submit('submit')
|
||
resp = resp.follow()
|
||
resp = resp.follow()
|
||
|
||
resp = resp.click('Function/Role Linking')
|
||
assert resp.form['rules$element0$value'].value == 'FOOBAR'
|
||
resp = resp.form.submit('rules$add_element') # add one
|
||
resp.form['rules$element1$value'].value = 'BARFOO'
|
||
resp.form['rules$element1$role_id'].value = str(role.id)
|
||
resp = resp.form.submit('submit')
|
||
|
||
workflow = Workflow.get(workflow.id)
|
||
assert workflow.possible_status[0].items[0].rules == [
|
||
{'value': 'FOOBAR', 'role_id': '1'},
|
||
{'value': 'BARFOO', 'role_id': '1'},
|
||
]
|
||
|
||
|
||
def test_workflows_edit_dispatch_action_repeated_function(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.roles = {
|
||
'_receiver': 'Recipient',
|
||
'manager': 'Manager',
|
||
'manager2': 'Manager',
|
||
}
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
|
||
resp.forms[0]['action-formdata-action'] = 'Function/Role Linking'
|
||
resp = resp.forms[0].submit()
|
||
resp = resp.follow()
|
||
|
||
resp = resp.click('Function/Role Linking')
|
||
resp.form['role_key'].value = 'manager2'
|
||
resp = resp.form.submit('submit')
|
||
resp = resp.follow()
|
||
resp = resp.follow()
|
||
|
||
workflow = Workflow.get(workflow.id)
|
||
assert workflow.possible_status[0].items[0].role_key == 'manager2'
|
||
|
||
|
||
def test_workflows_edit_email_action(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
st1 = workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
|
||
resp.forms[0]['action-interaction'] = 'Email'
|
||
resp = resp.forms[0].submit()
|
||
resp = resp.follow()
|
||
|
||
resp = resp.click('Email')
|
||
item_url = resp.request.url
|
||
|
||
ok_strings = [
|
||
'Hello world', # ordinary string
|
||
'Hello [world]', # unknown reference
|
||
'Hello [if-any world][world][end]', # unknown reference, in condition
|
||
'Hello world ][', # random brackets
|
||
]
|
||
|
||
for field in ('body', 'subject'):
|
||
for ok_string in ok_strings:
|
||
resp = app.get(item_url)
|
||
resp.form[field] = ok_string
|
||
resp = resp.form.submit('submit')
|
||
assert resp.location
|
||
|
||
resp = app.get(item_url)
|
||
resp.form[field] = 'Hello {% if world %}{{ world }}{% else %}.'
|
||
resp = resp.form.submit('submit')
|
||
assert 'syntax error in Django template' in resp.text and 'Unclosed tag' in resp.text
|
||
|
||
resp = app.get(item_url)
|
||
resp.form[field] = 'Hello {% if world %}{{ world }}{% else %}.{% endif %}{% endif %}'
|
||
resp = resp.form.submit('submit')
|
||
assert 'syntax error in Django template' in resp.text and 'Invalid block tag' in resp.text
|
||
|
||
resp = app.get(item_url)
|
||
resp.form[field] = 'Hello [if-any world][world][else].'
|
||
resp = resp.form.submit('submit')
|
||
assert 'syntax error in ezt template' in resp.text and 'unclosed block' in resp.text
|
||
|
||
resp = app.get(item_url)
|
||
resp.form[field] = 'Hello [if-any world][world][else].[end] [end]'
|
||
resp = resp.form.submit('submit')
|
||
assert 'syntax error in ezt template' in resp.text and 'unmatched [end]' in resp.text
|
||
|
||
# attachments without backoffice fields: python expressions
|
||
resp = app.get(item_url)
|
||
assert "Attachments (templates or Python expressions)" in resp.text
|
||
resp.form['attachments$element0'] = 'form_var_upload_raw'
|
||
resp = resp.form.submit('submit')
|
||
assert resp.location
|
||
resp = app.get(item_url)
|
||
assert "Attachments (templates or Python expressions)" in resp.text
|
||
assert resp.form['attachments$element0'].value == 'form_var_upload_raw'
|
||
sendmail = Workflow.get(workflow.id).get_status(st1.id).items[0]
|
||
assert sendmail.attachments == ['form_var_upload_raw']
|
||
|
||
# attachments with backoffice fields: select-with-other inputs
|
||
workflow = Workflow.get(workflow.id)
|
||
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
|
||
workflow.backoffice_fields_formdef.fields = [
|
||
fields.FileField(id='bo1-1x', label='bo field 1', type='file', varname='upload'),
|
||
fields.FileField(id='bo2-2x', label='bo field 2', type='file', varname='upload2'),
|
||
fields.FileField(id='bo3-3x', label='bo field varnameless', type='file'),
|
||
]
|
||
workflow.store()
|
||
resp = app.get(item_url)
|
||
assert "Attachments" in resp.text
|
||
assert "Attachments (templates or Python expressions)" not in resp.text
|
||
assert resp.form['attachments$element0$choice'].value == 'form_var_upload_raw'
|
||
assert len(resp.form['attachments$element0$choice'].options) == 5
|
||
resp = resp.form.submit('attachments$add_element') # add one
|
||
resp.form['attachments$element1$choice'] = 'form_var_upload2_raw'
|
||
resp = resp.form.submit('submit')
|
||
assert resp.location
|
||
sendmail = Workflow.get(workflow.id).get_status(st1.id).items[0]
|
||
assert sendmail.attachments == ['form_var_upload_raw', 'form_var_upload2_raw']
|
||
|
||
resp = app.get(item_url)
|
||
resp = resp.form.submit('attachments$add_element') # add one
|
||
resp.form['attachments$element2$choice'] = 'form_fbo3_3x'
|
||
resp = resp.form.submit('submit')
|
||
assert resp.location
|
||
sendmail = Workflow.get(workflow.id).get_status(st1.id).items[0]
|
||
assert sendmail.attachments == ['form_var_upload_raw', 'form_var_upload2_raw', 'form_fbo3_3x']
|
||
|
||
resp = app.get(item_url)
|
||
resp = resp.form.submit('attachments$add_element') # add one
|
||
resp.form['attachments$element3$choice'] = '__other'
|
||
resp.form['attachments$element3$other'] = '{"content":"foo", "filename":"bar.txt"}'
|
||
resp = resp.form.submit('submit')
|
||
assert resp.location
|
||
sendmail = Workflow.get(workflow.id).get_status(st1.id).items[0]
|
||
assert sendmail.attachments == [
|
||
'form_var_upload_raw',
|
||
'form_var_upload2_raw',
|
||
'form_fbo3_3x',
|
||
'{"content":"foo", "filename":"bar.txt"}',
|
||
]
|
||
|
||
# remove some backoffice fields: varnameless fbo3 disapear
|
||
workflow = Workflow.get(workflow.id)
|
||
workflow.backoffice_fields_formdef.fields = [
|
||
fields.FileField(id='bo2', label='bo field 2', type='file', varname='upload2'),
|
||
]
|
||
workflow.store()
|
||
resp = app.get(item_url)
|
||
resp = resp.form.submit('submit')
|
||
assert resp.location
|
||
sendmail = Workflow.get(workflow.id).get_status(st1.id).items[0]
|
||
assert sendmail.attachments == [
|
||
'form_var_upload_raw',
|
||
'form_var_upload2_raw',
|
||
'{"content":"foo", "filename":"bar.txt"}',
|
||
]
|
||
# remove all backoffice fields
|
||
workflow = Workflow.get(workflow.id)
|
||
workflow.backoffice_fields_formdef.fields = []
|
||
workflow.store()
|
||
resp = app.get(item_url)
|
||
assert "Attachments (templates or Python expressions)" in resp.text
|
||
resp = resp.form.submit('submit')
|
||
assert resp.location
|
||
sendmail = Workflow.get(workflow.id).get_status(st1.id).items[0]
|
||
assert sendmail.attachments == [
|
||
'form_var_upload_raw',
|
||
'form_var_upload2_raw',
|
||
'{"content":"foo", "filename":"bar.txt"}',
|
||
]
|
||
|
||
# check condition has been saved as None, not {}.
|
||
assert sendmail.condition is None
|
||
|
||
resp = app.get(item_url)
|
||
resp.form['condition$type'] = 'python'
|
||
resp.form['condition$value_python'] = 'True'
|
||
resp = resp.form.submit('submit')
|
||
sendmail = Workflow.get(workflow.id).get_status(st1.id).items[0]
|
||
assert sendmail.condition == {'type': 'python', 'value': 'True'}
|
||
|
||
|
||
def test_workflows_edit_jump_previous(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
st1 = workflow.add_status(name='baz')
|
||
|
||
jump = JumpWorkflowStatusItem()
|
||
jump.id = '_jump'
|
||
jump.timeout = 86400
|
||
st1.items.append(jump)
|
||
jump.parent = st1
|
||
|
||
ac1 = workflow.add_global_action('Action', 'ac1')
|
||
|
||
jump_global = JumpWorkflowStatusItem()
|
||
jump_global.id = '_jump'
|
||
jump_global.timeout = 86400
|
||
ac1.items.append(jump_global)
|
||
jump_global.parent = ac1
|
||
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/status/1/items/_jump/')
|
||
assert 'Previously Marked Status' not in [x[2] for x in resp.form['status'].options]
|
||
|
||
jump.set_marker_on_status = True
|
||
workflow.store()
|
||
resp = app.get('/backoffice/workflows/1/status/1/items/_jump/')
|
||
assert 'Previously Marked Status' in [x[2] for x in resp.form['status'].options]
|
||
|
||
jump.set_marker_on_status = False
|
||
workflow.store()
|
||
resp = app.get('/backoffice/workflows/1/status/1/items/_jump/')
|
||
assert 'Previously Marked Status' not in [x[2] for x in resp.form['status'].options]
|
||
|
||
jump_global.set_marker_on_status = True
|
||
workflow.store()
|
||
resp = app.get('/backoffice/workflows/1/status/1/items/_jump/')
|
||
assert 'Previously Marked Status' in [x[2] for x in resp.form['status'].options]
|
||
|
||
resp = app.get('/backoffice/workflows/1/global-actions/ac1/items/_jump/')
|
||
assert 'Previously Marked Status' in [x[2] for x in resp.form['status'].options]
|
||
assert 'trigger' not in resp.form.fields
|
||
|
||
jump_global.set_marker_on_status = False
|
||
workflow.store()
|
||
resp = app.get('/backoffice/workflows/1/global-actions/ac1/items/_jump/')
|
||
assert 'Previously Marked Status' not in [x[2] for x in resp.form['status'].options]
|
||
|
||
|
||
def test_workflows_edit_jump_timeout(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
st1 = workflow.add_status(name='baz')
|
||
|
||
jump = JumpWorkflowStatusItem()
|
||
jump.id = '_jump'
|
||
jump.status = '1'
|
||
jump.timeout = 86400
|
||
st1.items.append(jump)
|
||
jump.parent = st1
|
||
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/status/1/items/_jump/')
|
||
resp.form['timeout'] = ''
|
||
resp = resp.form.submit('submit').follow().follow()
|
||
assert Workflow.get(workflow.id).possible_status[0].items[0].timeout is None
|
||
assert 'Automatic Jump (baz)' in resp.text
|
||
|
||
resp = app.get('/backoffice/workflows/1/status/1/items/_jump/')
|
||
resp.form['timeout'] = '90 minutes'
|
||
resp = resp.form.submit('submit').follow().follow()
|
||
assert Workflow.get(workflow.id).possible_status[0].items[0].timeout == 5400
|
||
assert 'Automatic Jump (to baz, timeout)' in resp.text
|
||
|
||
resp = app.get('/backoffice/workflows/1/status/1/items/_jump/')
|
||
resp.form['timeout'] = '=5400'
|
||
resp = resp.form.submit('submit').follow().follow()
|
||
assert Workflow.get(workflow.id).possible_status[0].items[0].timeout == '=5400'
|
||
assert 'Automatic Jump (to baz, timeout)' in resp.text
|
||
|
||
# check field switched to being a computed expression widget
|
||
resp = app.get('/backoffice/workflows/1/status/1/items/_jump/')
|
||
assert resp.text.count('ComputedExpressionWidget') == 1
|
||
|
||
|
||
def test_workflows_edit_sms_action(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
pub.cfg['sms'] = {'passerelle_url': 'xx', 'sender': 'xx'}
|
||
pub.write_cfg()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
|
||
resp.form['action-interaction'] = 'SMS'
|
||
resp = resp.form.submit().follow()
|
||
|
||
resp = resp.click('SMS')
|
||
resp = resp.form.submit('to$add_element')
|
||
resp = resp.form.submit('to$add_element')
|
||
resp = resp.form.submit('to$add_element')
|
||
resp.form['to$element1$value_text'] = '12345'
|
||
resp = resp.form.submit('submit')
|
||
assert Workflow.get(workflow.id).possible_status[0].items[0].to == ['12345']
|
||
|
||
|
||
def test_workflows_edit_attachment_action(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
|
||
resp.form['action-interaction'] = 'Attachment'
|
||
resp = resp.form.submit().follow()
|
||
|
||
resp = resp.click('Attachment')
|
||
assert not resp.form['document_type'].value
|
||
resp.form['document_type'] = '_audio'
|
||
resp = resp.form.submit('submit').follow().follow()
|
||
assert Workflow.get(workflow.id).possible_status[0].items[0].document_type == {
|
||
'label': 'Sound files',
|
||
'mimetypes': ['audio/*'],
|
||
'id': '_audio',
|
||
}
|
||
|
||
resp = resp.click('Attachment')
|
||
assert resp.form['document_type'].value == '_audio'
|
||
resp = resp.form.submit('submit').follow().follow()
|
||
assert Workflow.get(workflow.id).possible_status[0].items[0].document_type == {
|
||
'label': 'Sound files',
|
||
'mimetypes': ['audio/*'],
|
||
'id': '_audio',
|
||
}
|
||
|
||
# configure global filetypes
|
||
pub.cfg['filetypes'] = {
|
||
1: {'mimetypes': ['application/pdf', 'application/msword'], 'label': 'Text files'}
|
||
}
|
||
pub.write_cfg()
|
||
|
||
resp = resp.click('Attachment')
|
||
resp.form['document_type'] = '1'
|
||
resp = resp.form.submit('submit').follow().follow()
|
||
assert Workflow.get(workflow.id).possible_status[0].items[0].document_type == {
|
||
'label': 'Text files',
|
||
'mimetypes': ['application/pdf', 'application/msword'],
|
||
'id': 1,
|
||
}
|
||
|
||
# remove global filetype
|
||
pub.cfg['filetypes'] = {}
|
||
pub.write_cfg()
|
||
|
||
# check its value is still selected
|
||
resp = resp.click('Attachment')
|
||
assert 'Text files' in [x[2] for x in resp.form['document_type'].options]
|
||
assert resp.form['document_type'].value == '1'
|
||
resp = resp.form.submit('submit').follow().follow()
|
||
assert Workflow.get(workflow.id).possible_status[0].items[0].document_type == {
|
||
'label': 'Text files',
|
||
'mimetypes': ['application/pdf', 'application/msword'],
|
||
'id': 1,
|
||
}
|
||
|
||
|
||
def test_workflows_edit_display_form_action(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
|
||
resp.forms[0]['action-interaction'] = 'Form'
|
||
resp = resp.forms[0].submit().follow()
|
||
resp = resp.click(re.compile('^Form$'))
|
||
resp.form['varname'] = 'myform'
|
||
resp = resp.forms[0].submit('submit').follow()
|
||
# fields page
|
||
resp.form['label'] = 'foobar'
|
||
resp.form['type'] = 'string'
|
||
resp = resp.form.submit()
|
||
|
||
resp = resp.follow()
|
||
assert 'foobar' in resp.text
|
||
resp = resp.click('Edit')
|
||
assert 'display_locations' not in resp.form.fields.keys()
|
||
assert 'condition$type' in resp.form.fields.keys()
|
||
resp = resp.form.submit('cancel')
|
||
resp = resp.follow()
|
||
resp = resp.click('Remove')
|
||
assert 'You are about to remove the "foobar" field.' in resp.text
|
||
assert 'Warning:' not in resp.text
|
||
|
||
resp = app.get('/backoffice/workflows/1/status/1/items/1/')
|
||
resp.form['varname'] = 'form'
|
||
resp = resp.form.submit('submit')
|
||
assert 'Wrong identifier detected: "form" prefix is forbidden.' in resp.text
|
||
resp.form['varname'] = 'form_foo'
|
||
resp = resp.form.submit('submit')
|
||
assert 'Wrong identifier detected: "form" prefix is forbidden.' in resp.text
|
||
resp.form['varname'] = 'formfoo'
|
||
resp = resp.form.submit('submit')
|
||
assert 'Wrong identifier detected: "form" prefix is forbidden.' not in resp.text
|
||
|
||
|
||
def test_workflows_edit_choice_action(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
|
||
resp.forms[0]['action-status-change'] = 'Manual Jump'
|
||
resp = resp.forms[0].submit()
|
||
resp = resp.follow()
|
||
|
||
resp = resp.click(href='items/1/', index=0)
|
||
assert 'Previously Marked Status' not in [x[2] for x in resp.form['status'].options]
|
||
resp.form['status'].value = 'baz'
|
||
resp = resp.form.submit('submit')
|
||
resp = resp.follow()
|
||
resp = resp.follow()
|
||
|
||
resp = resp.click(href='items/1/', index=0)
|
||
resp.form['set_marker_on_status'].value = True
|
||
resp = resp.form.submit('submit')
|
||
resp = resp.follow()
|
||
resp = resp.follow()
|
||
|
||
resp = resp.click(href='items/1/', index=0)
|
||
assert 'Previously Marked Status' in [x[2] for x in resp.form['status'].options]
|
||
|
||
|
||
def test_workflows_edit_choice_action_functions_only(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
|
||
resp.forms[0]['action-status-change'] = 'Manual Jump'
|
||
resp = resp.forms[0].submit()
|
||
resp = resp.follow()
|
||
|
||
resp = resp.click(href='items/1/', index=0)
|
||
assert 'foobar' in [x[2] for x in resp.form['by$element0'].options]
|
||
assert '_receiver' in [x[0] for x in resp.form['by$element0'].options]
|
||
|
||
pub.site_options.set('options', 'workflow-functions-only', 'true')
|
||
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
|
||
pub.site_options.write(fd)
|
||
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
resp = resp.click(href='items/1/', index=0)
|
||
assert 'foobar' not in [x[2] for x in resp.form['by$element0'].options]
|
||
assert '_receiver' in [x[0] for x in resp.form['by$element0'].options]
|
||
|
||
|
||
def test_workflows_edit_choice_action_line_details(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
wf = Workflow(name='foo')
|
||
st1 = wf.add_status('New')
|
||
st2 = wf.add_status('Resubmit')
|
||
|
||
jump = ChoiceWorkflowStatusItem()
|
||
jump.id = '1'
|
||
jump.parent = st1
|
||
st1.items.append(jump)
|
||
wf.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (wf.id, st1.id))
|
||
assert resp.html.find('a', {'href': 'items/1/'}).text == 'Manual Jump (not completed)'
|
||
|
||
jump.label = 'Resubmit'
|
||
wf.store()
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (wf.id, st1.id))
|
||
assert resp.html.find('a', {'href': 'items/1/'}).text == 'Manual Jump (not completed)'
|
||
|
||
jump.status = st2.id
|
||
wf.store()
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (wf.id, st1.id))
|
||
assert resp.html.find('a', {'href': 'items/1/'}).text == 'Manual Jump ("Resubmit", to Resubmit)'
|
||
|
||
jump.label = 'Resubmit'
|
||
wf.store()
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (wf.id, st1.id))
|
||
assert resp.html.find('a', {'href': 'items/1/'}).text == 'Manual Jump ("Resubmit", to Resubmit)'
|
||
|
||
jump.set_marker_on_status = True
|
||
wf.store()
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (wf.id, st1.id))
|
||
assert (
|
||
resp.html.find('a', {'href': 'items/1/'}).text
|
||
== 'Manual Jump ("Resubmit", to Resubmit (and set marker))'
|
||
)
|
||
|
||
jump.set_marker_on_status = False
|
||
jump.by = ['_submitter']
|
||
wf.store()
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (wf.id, st1.id))
|
||
assert resp.html.find('a', {'href': 'items/1/'}).text == 'Manual Jump ("Resubmit", to Resubmit, by User)'
|
||
|
||
jump.set_marker_on_status = True
|
||
wf.store()
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (wf.id, st1.id))
|
||
assert (
|
||
resp.html.find('a', {'href': 'items/1/'}).text
|
||
== 'Manual Jump ("Resubmit", to Resubmit, by User (and set marker))'
|
||
)
|
||
|
||
jump.status = 'error'
|
||
wf.store()
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (wf.id, st1.id))
|
||
assert (
|
||
resp.html.find('a', {'href': 'items/1/'}).text == 'Manual Jump (broken, missing destination status)'
|
||
)
|
||
|
||
jump.status = '_previous'
|
||
wf.store()
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (wf.id, st1.id))
|
||
assert (
|
||
resp.html.find('a', {'href': 'items/1/'}).text
|
||
== 'Manual Jump ("Resubmit", to previously marked status, by User (and set marker))'
|
||
)
|
||
|
||
|
||
def test_workflows_edit_export_to_model_action(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
|
||
resp.forms[0]['action-formdata-action'] = 'Document Creation'
|
||
resp = resp.forms[0].submit()
|
||
resp = resp.follow()
|
||
|
||
resp = resp.click('Document Creation')
|
||
with open(os.path.join(os.path.dirname(__file__), '../template.odt'), 'rb') as fd:
|
||
model_content = fd.read()
|
||
resp.form['model_file$file'] = Upload('test.odt', model_content)
|
||
resp = resp.form.submit('submit')
|
||
resp = resp.follow()
|
||
resp = resp.follow()
|
||
resp = resp.click('Document Creation')
|
||
resp_model_content = resp.click('test.odt')
|
||
assert resp_model_content.body == model_content
|
||
resp = resp.form.submit('submit').follow().follow()
|
||
# check file model is still there
|
||
resp = resp.click('Document Creation')
|
||
resp_model_content = resp.click('test.odt')
|
||
assert resp_model_content.body == model_content
|
||
|
||
|
||
def test_workflows_action_subpath(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
baz_status = workflow.add_status(name='baz')
|
||
display_message = DisplayMessageWorkflowStatusItem()
|
||
display_message.parent = baz_status
|
||
baz_status.items.append(display_message)
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
app.get('/backoffice/workflows/%s/status/%s/items/1/' % (workflow.id, baz_status.id))
|
||
|
||
app.get('/backoffice/workflows/%s/status/%s/items/1/crash' % (workflow.id, baz_status.id), status=404)
|
||
|
||
|
||
def test_workflows_display_action_ezt_validation(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
baz_status = workflow.add_status(name='baz')
|
||
display_message = DisplayMessageWorkflowStatusItem()
|
||
display_message.parent = baz_status
|
||
baz_status.items.append(display_message)
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (workflow.id, baz_status.id))
|
||
resp.form['message'] = 'Hello world'
|
||
resp = resp.form.submit('submit')
|
||
assert Workflow.get(workflow.id).possible_status[0].items[0].message == 'Hello world'
|
||
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (workflow.id, baz_status.id))
|
||
resp.form['message'] = '{% if test %}test{% endif %}' # valid Django
|
||
resp = resp.form.submit('submit')
|
||
assert Workflow.get(workflow.id).possible_status[0].items[0].message == '{% if test %}test{% endif %}'
|
||
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (workflow.id, baz_status.id))
|
||
resp.form['message'] = '{% if test %}test{% end %}' # invalid Django
|
||
resp = resp.form.submit('submit')
|
||
assert 'syntax error in Django template' in resp.text
|
||
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (workflow.id, baz_status.id))
|
||
resp.form['message'] = '[if-any test]test[end]' # valid ezt
|
||
resp = resp.form.submit('submit')
|
||
assert Workflow.get(workflow.id).possible_status[0].items[0].message == '[if-any test]test[end]'
|
||
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (workflow.id, baz_status.id))
|
||
resp.form['message'] = '[is test][end]' # invalid ezt
|
||
resp = resp.form.submit('submit')
|
||
assert 'syntax error in ezt template' in resp.text
|
||
|
||
|
||
def test_workflows_delete_action(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
|
||
resp.forms[0]['action-interaction'] = 'Email'
|
||
resp = resp.forms[0].submit()
|
||
resp = resp.follow()
|
||
assert 'Email' in resp.text
|
||
|
||
resp = resp.click(href='items/1/delete')
|
||
resp = resp.form.submit('cancel')
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/status/1/'
|
||
resp = resp.follow()
|
||
resp = resp.click(href='items/1/delete')
|
||
resp = resp.form.submit('submit')
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/status/1/'
|
||
resp = resp.follow()
|
||
assert Workflow.get(workflow.id).possible_status[0].items == []
|
||
|
||
|
||
def test_workflows_variables(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click(href='variables/')
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/variables/fields/'
|
||
resp = resp.follow()
|
||
|
||
# makes sure we can't add page fields
|
||
assert 'value="Page"' not in resp.text
|
||
|
||
# add a simple field
|
||
resp.forms[0]['label'] = 'foobar'
|
||
resp.forms[0]['type'] = 'string'
|
||
resp = resp.forms[0].submit()
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/variables/fields/'
|
||
resp = resp.follow()
|
||
|
||
# check it's been saved correctly
|
||
assert 'foobar' in resp.text
|
||
assert len(Workflow.get(1).variables_formdef.fields) == 1
|
||
assert Workflow.get(1).variables_formdef.fields[0].key == 'string'
|
||
assert Workflow.get(1).variables_formdef.fields[0].label == 'foobar'
|
||
|
||
|
||
def test_workflows_variables_edit(pub):
|
||
test_workflows_variables(pub)
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click(href='variables/', index=0)
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/variables/fields/'
|
||
resp = resp.follow()
|
||
resp = resp.click('Edit', href='1/')
|
||
assert resp.forms[0]['varname$name'].value == 'foobar'
|
||
assert 'varname$select' not in resp.forms[0].fields
|
||
|
||
workflow = Workflow.get(1)
|
||
baz_status = workflow.add_status(name='baz')
|
||
display_message = DisplayMessageWorkflowStatusItem()
|
||
display_message.parent = baz_status
|
||
baz_status.items.append(display_message)
|
||
workflow.store()
|
||
|
||
resp = app.get('/backoffice/workflows/1/variables/fields/')
|
||
resp = resp.click('Edit', href='1/')
|
||
assert 'varname$select' in resp.forms[0].fields
|
||
resp.forms[0]['varname$select'].value = '1*1*message'
|
||
resp = resp.forms[0].submit('submit')
|
||
|
||
assert Workflow.get(1).variables_formdef.fields[0].key == 'string'
|
||
assert Workflow.get(1).variables_formdef.fields[0].varname == '1*1*message'
|
||
|
||
|
||
def test_workflows_variables_edit_with_all_action_types(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
status = workflow.add_status(name='baz')
|
||
for item_class in item_classes:
|
||
status.append_item(item_class.key)
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click(href='variables/')
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/variables/fields/'
|
||
resp = resp.follow()
|
||
|
||
# add a simple field
|
||
resp.forms[0]['label'] = 'foobar'
|
||
resp.forms[0]['type'] = 'string'
|
||
resp = resp.forms[0].submit()
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/variables/fields/'
|
||
resp = resp.follow()
|
||
|
||
resp = app.get('/backoffice/workflows/1/variables/fields/')
|
||
resp = resp.click('Edit', href='1/')
|
||
assert 'varname$select' in resp.forms[0].fields
|
||
resp.forms[0]['varname$name'].value = 'xxx'
|
||
resp = resp.forms[0].submit('submit')
|
||
|
||
assert Workflow.get(1).variables_formdef.fields[0].key == 'string'
|
||
assert Workflow.get(1).variables_formdef.fields[0].varname == 'xxx'
|
||
|
||
|
||
def test_workflows_export_to_model_action_display(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
baz_status = workflow.add_status(name='baz')
|
||
export_to = ExportToModel()
|
||
export_to.label = 'create doc'
|
||
baz_status.items.append(export_to)
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/status/1/')
|
||
assert 'Document Creation (no model set)' in resp
|
||
|
||
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
|
||
upload.fp = io.BytesIO()
|
||
upload.fp.write(b'HELLO WORLD')
|
||
upload.fp.seek(0)
|
||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||
export_to.id = '_export_to'
|
||
export_to.by = ['_submitter']
|
||
workflow.store()
|
||
|
||
resp = app.get('/backoffice/workflows/1/status/1/')
|
||
assert 'Document Creation (with model named test.rtf of 11 bytes)' in resp
|
||
|
||
upload.fp.write(b'HELLO WORLD' * 4242)
|
||
upload.fp.seek(0)
|
||
export_to.model_file = UploadedFile(pub.app_dir, None, upload)
|
||
workflow.store()
|
||
|
||
resp = app.get('/backoffice/workflows/1/status/1/')
|
||
assert 'Document Creation (with model named test.rtf of 45.6 KB)' in resp
|
||
|
||
|
||
def test_workflows_variables_with_export_to_model_action(pub):
|
||
test_workflows_variables(pub)
|
||
|
||
workflow = Workflow.get(1)
|
||
baz_status = workflow.add_status(name='baz')
|
||
export_to = ExportToModel()
|
||
export_to.label = 'create doc'
|
||
baz_status.items.append(export_to)
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/variables/fields/')
|
||
resp = resp.click('Edit', href='1/')
|
||
|
||
|
||
def test_workflows_variables_replacement(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
baz_status = workflow.add_status(name='baz')
|
||
display_message = DisplayMessageWorkflowStatusItem()
|
||
display_message.parent = baz_status
|
||
baz_status.items.append(display_message)
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/variables/fields/' % workflow.id)
|
||
|
||
# add a field
|
||
resp.forms[0]['label'] = 'foobar'
|
||
resp.forms[0]['type'] = 'string'
|
||
resp = resp.forms[0].submit().follow()
|
||
|
||
# edit
|
||
resp = resp.click('Edit', href='1/')
|
||
resp.form['varname$select'].value = '1*1*message'
|
||
resp = resp.form.submit('submit').follow()
|
||
|
||
# make sure a wrong variable name is not displayed
|
||
assert 'form_option_1*1*message' not in resp.text
|
||
assert Workflow.get(workflow.id).variables_formdef.fields[0].varname == '1*1*message'
|
||
|
||
# and make sure it doesn't appear in formdata inspect page
|
||
formdef = FormDef()
|
||
formdef.name = 'Form title'
|
||
formdef.workflow = workflow
|
||
formdef.fields = []
|
||
formdef.store()
|
||
|
||
data_class = formdef.data_class()
|
||
data_class.wipe()
|
||
|
||
formdata = data_class()
|
||
formdata.data = {}
|
||
formdata.status = 'wf-new'
|
||
formdata.store()
|
||
|
||
resp = app.get(formdata.get_backoffice_url() + 'inspect')
|
||
assert 'form_option_1*1*message' not in resp.text
|
||
|
||
|
||
def test_workflows_backoffice_fields(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.add_status(name='baz')
|
||
workflow.store()
|
||
|
||
formdef = FormDef()
|
||
formdef.name = 'form title'
|
||
formdef.workflow_id = workflow.id
|
||
formdef.fields = []
|
||
formdef.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
assert 'Set Backoffice Field' not in resp.text
|
||
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click(href='backoffice-fields/')
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/backoffice-fields/fields/'
|
||
resp = resp.follow()
|
||
|
||
# makes sure we can't add page fields
|
||
assert 'value="Page"' not in resp.text
|
||
|
||
# add a simple field
|
||
resp.forms[0]['label'] = 'foobar'
|
||
resp.forms[0]['type'] = 'string'
|
||
resp = resp.forms[0].submit()
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/backoffice-fields/fields/'
|
||
resp = resp.follow()
|
||
assert Workflow.get(workflow.id).get_backoffice_fields()[0].required is True
|
||
|
||
# check it's been saved correctly
|
||
assert 'foobar' in resp.text
|
||
assert len(Workflow.get(1).backoffice_fields_formdef.fields) == 1
|
||
assert Workflow.get(1).backoffice_fields_formdef.fields[0].id.startswith('bo')
|
||
assert Workflow.get(1).backoffice_fields_formdef.fields[0].key == 'string'
|
||
assert Workflow.get(1).backoffice_fields_formdef.fields[0].label == 'foobar'
|
||
|
||
backoffice_field_id = Workflow.get(1).backoffice_fields_formdef.fields[0].id
|
||
formdef = FormDef.get(formdef.id)
|
||
data_class = formdef.data_class()
|
||
data_class.wipe()
|
||
formdata = data_class()
|
||
formdata.data = {backoffice_field_id: 'HELLO'}
|
||
formdata.status = 'wf-new'
|
||
formdata.store()
|
||
|
||
assert data_class.get(formdata.id).data[backoffice_field_id] == 'HELLO'
|
||
|
||
# check the "set backoffice fields" action is now available
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz')
|
||
resp.forms[0]['action-formdata-action'] = 'Backoffice Data'
|
||
resp = resp.forms[0].submit()
|
||
resp = resp.follow()
|
||
|
||
# add a second field
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click(href='backoffice-fields/', index=0)
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/backoffice-fields/fields/'
|
||
resp = resp.follow()
|
||
resp.forms[0]['label'] = 'foobar2'
|
||
resp.forms[0]['type'] = 'string'
|
||
resp = resp.forms[0].submit()
|
||
assert resp.location == 'http://example.net/backoffice/workflows/1/backoffice-fields/fields/'
|
||
resp = resp.follow()
|
||
workflow = Workflow.get(workflow.id)
|
||
assert len(workflow.backoffice_fields_formdef.fields) == 2
|
||
first_field_id = workflow.backoffice_fields_formdef.fields[0].id
|
||
assert workflow.backoffice_fields_formdef.fields[1].id != first_field_id
|
||
|
||
# check there's no prefill field
|
||
resp = app.get(
|
||
'/backoffice/workflows/1/backoffice-fields/fields/%s/'
|
||
% workflow.backoffice_fields_formdef.fields[1].id
|
||
)
|
||
assert 'prefill$type' not in resp.form.fields.keys()
|
||
|
||
# add a title field
|
||
resp = app.get('/backoffice/workflows/1/backoffice-fields/fields/')
|
||
resp.forms[0]['label'] = 'foobar3'
|
||
resp.forms[0]['type'] = 'title'
|
||
resp = resp.form.submit()
|
||
workflow = Workflow.get(workflow.id)
|
||
assert len(workflow.backoffice_fields_formdef.fields) == 3
|
||
|
||
# check backoffice fields are available in set backoffice fields action
|
||
resp = app.get('/backoffice/workflows/1/')
|
||
resp = resp.click('baz') # status
|
||
resp = resp.click('Backoffice Data')
|
||
options = [x[2] for x in resp.form['fields$element0$field_id'].options]
|
||
assert '' in options
|
||
assert 'foobar' in options
|
||
assert 'foobar2' in options
|
||
assert 'foobar3' not in options
|
||
|
||
resp.form['fields$element0$field_id'] = first_field_id
|
||
resp.form['fields$element0$value$value_text'] = 'Hello'
|
||
resp = resp.form.submit('submit')
|
||
workflow = Workflow.get(workflow.id)
|
||
assert workflow.possible_status[0].items[0].fields == [{'field_id': first_field_id, 'value': 'Hello'}]
|
||
|
||
|
||
def test_workflows_functions(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('add function')
|
||
resp = resp.forms[0].submit('cancel')
|
||
assert set(Workflow.get(workflow.id).roles.keys()) == set(['_receiver'])
|
||
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('add function')
|
||
resp.forms[0]['name'] = 'Other Function'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert set(Workflow.get(workflow.id).roles.keys()) == set(['_receiver', '_other-function'])
|
||
assert Workflow.get(workflow.id).roles['_other-function'] == 'Other Function'
|
||
|
||
# test rename
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('Other Function')
|
||
resp = resp.forms[0].submit('cancel')
|
||
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('Other Function')
|
||
resp.forms[0]['name'] = 'Other Renamed Function'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert set(Workflow.get(workflow.id).roles.keys()) == set(['_receiver', '_other-function'])
|
||
assert Workflow.get(workflow.id).roles['_other-function'] == 'Other Renamed Function'
|
||
|
||
# test new function with older name
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('add function')
|
||
resp.forms[0]['name'] = 'Other Function'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert set(Workflow.get(workflow.id).roles.keys()) == set(
|
||
['_receiver', '_other-function', '_other-function-2']
|
||
)
|
||
assert Workflow.get(workflow.id).roles['_other-function'] == 'Other Renamed Function'
|
||
assert Workflow.get(workflow.id).roles['_other-function-2'] == 'Other Function'
|
||
|
||
# test removal
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('Other Renamed Function')
|
||
resp = resp.forms[0].submit('delete')
|
||
assert set(Workflow.get(workflow.id).roles.keys()) == set(['_receiver', '_other-function-2'])
|
||
|
||
# make sure it's not possible to remove the "_receiver" key
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('Recipient')
|
||
assert 'delete' not in resp.forms[0].fields
|
||
|
||
|
||
def test_workflows_functions_vs_visibility(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
|
||
workflow.store()
|
||
|
||
# restrict visibility
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('Just Submitted')
|
||
resp = resp.click('Change Display Settings')
|
||
resp.forms[0]['hide_status_from_user'].checked = True
|
||
resp = resp.forms[0].submit()
|
||
assert Workflow.get(workflow.id).possible_status[0].visibility == ['_receiver']
|
||
|
||
# add function, make sure visibility follows
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('add function')
|
||
resp.forms[0]['name'] = 'Other Function'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert set(Workflow.get(workflow.id).roles.keys()) == set(['_receiver', '_other-function'])
|
||
assert Workflow.get(workflow.id).roles['_other-function'] == 'Other Function'
|
||
assert set(Workflow.get(workflow.id).possible_status[0].visibility) == set(
|
||
['_receiver', '_other-function']
|
||
)
|
||
|
||
# restrict visibility in a different status, check it gets all the
|
||
# functions
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('Rejected')
|
||
resp = resp.click('Change Display Settings')
|
||
resp.forms[0]['hide_status_from_user'].checked = True
|
||
resp = resp.forms[0].submit()
|
||
assert set(Workflow.get(workflow.id).possible_status[2].visibility) == set(
|
||
['_receiver', '_other-function']
|
||
)
|
||
|
||
|
||
def test_workflows_global_actions(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('add global action')
|
||
resp = resp.forms[0].submit('cancel')
|
||
assert not Workflow.get(workflow.id).global_actions
|
||
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('add global action')
|
||
resp.forms[0]['name'] = 'Global Action'
|
||
resp = resp.forms[0].submit('submit')
|
||
|
||
# test adding action with same name
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('add global action')
|
||
resp.forms[0]['name'] = 'Global Action'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert 'There is already an action with that name.' in resp.text
|
||
|
||
# test rename
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('Global Action')
|
||
resp = resp.click('Change Action Name')
|
||
resp = resp.form.submit('cancel')
|
||
resp = resp.follow()
|
||
resp = resp.click('Change Action Name')
|
||
resp.forms[0]['name'] = 'Renamed Action'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert Workflow.get(workflow.id).global_actions[0].name == 'Renamed Action'
|
||
|
||
# test removal
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('Renamed Action')
|
||
resp = resp.click('Delete')
|
||
resp = resp.form.submit('cancel')
|
||
resp = resp.follow()
|
||
resp = resp.click('Delete')
|
||
resp = resp.form.submit('delete')
|
||
assert not Workflow.get(workflow.id).global_actions
|
||
|
||
|
||
def test_workflows_global_actions_edit(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('add global action')
|
||
resp.forms[0]['name'] = 'Global Action'
|
||
resp = resp.forms[0].submit('submit')
|
||
resp = resp.follow()
|
||
|
||
# test adding all actions
|
||
for category in ('status-change', 'interaction', 'formdata-action', 'user-action'):
|
||
for action in [x[0] for x in resp.forms[0]['action-%s' % category].options if x[0]]:
|
||
resp.forms[0]['action-%s' % category] = action
|
||
resp = resp.forms[0].submit()
|
||
resp = resp.follow()
|
||
|
||
# test visiting
|
||
action_id = Workflow.get(workflow.id).global_actions[0].id
|
||
for item in Workflow.get(workflow.id).global_actions[0].items:
|
||
resp = app.get(
|
||
'/backoffice/workflows/%s/global-actions/%s/items/%s/' % (workflow.id, action_id, item.id)
|
||
)
|
||
|
||
# test modifying a trigger
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('Global Action')
|
||
assert len(Workflow.get(workflow.id).global_actions[0].triggers) == 1
|
||
resp = resp.click(
|
||
href='triggers/%s/' % Workflow.get(workflow.id).global_actions[0].triggers[0].id, index=0
|
||
)
|
||
assert resp.form['roles$element0'].value == 'None'
|
||
resp.form['roles$element0'].value = '_receiver'
|
||
resp = resp.form.submit('submit')
|
||
assert Workflow.get(workflow.id).global_actions[0].triggers[0].roles == ['_receiver']
|
||
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('Global Action')
|
||
resp = resp.click(
|
||
href='triggers/%s/' % Workflow.get(workflow.id).global_actions[0].triggers[0].id, index=0
|
||
)
|
||
assert resp.form['roles$element0'].value == '_receiver'
|
||
resp = resp.form.submit('roles$add_element')
|
||
resp.form['roles$element1'].value = '_submitter'
|
||
resp = resp.form.submit('submit')
|
||
assert Workflow.get(workflow.id).global_actions[0].triggers[0].roles == ['_receiver', '_submitter']
|
||
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('Global Action')
|
||
resp = resp.click(
|
||
href='triggers/%s/' % Workflow.get(workflow.id).global_actions[0].triggers[0].id, index=0
|
||
)
|
||
resp.form['roles$element1'].value = 'None'
|
||
resp = resp.form.submit('submit')
|
||
assert Workflow.get(workflow.id).global_actions[0].triggers[0].roles == ['_receiver']
|
||
|
||
|
||
def test_workflows_global_actions_timeout_triggers(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('add global action')
|
||
resp.forms[0]['name'] = 'Global Action'
|
||
resp = resp.forms[0].submit('submit')
|
||
resp = resp.follow()
|
||
|
||
# test removing the existing manual trigger
|
||
resp = resp.click(href='triggers/%s/delete' % Workflow.get(workflow.id).global_actions[0].triggers[0].id)
|
||
resp = resp.forms[0].submit()
|
||
resp = resp.follow()
|
||
|
||
assert len(Workflow.get(workflow.id).global_actions[0].triggers) == 0
|
||
|
||
# test adding a timeout trigger
|
||
resp.forms[1]['type'] = 'Automatic'
|
||
resp = resp.forms[1].submit()
|
||
resp = resp.follow()
|
||
|
||
assert 'Automatic (not configured)' in resp.text
|
||
|
||
resp = resp.click(
|
||
href='triggers/%s/' % Workflow.get(workflow.id).global_actions[0].triggers[0].id, index=0
|
||
)
|
||
for invalid_value in ('foobar', '-'):
|
||
resp.form['timeout'] = invalid_value
|
||
resp = resp.form.submit('submit')
|
||
assert 'wrong format' in resp.text
|
||
resp.form['timeout'] = ''
|
||
resp = resp.form.submit('submit')
|
||
assert 'required field' in resp.text
|
||
resp.form['timeout'] = '3'
|
||
resp = resp.form.submit('submit').follow()
|
||
|
||
assert Workflow.get(workflow.id).global_actions[0].triggers[0].timeout == '3'
|
||
|
||
resp = resp.click(
|
||
href='triggers/%s/' % Workflow.get(workflow.id).global_actions[0].triggers[0].id, index=0
|
||
)
|
||
resp.form['timeout'] = '-2'
|
||
resp = resp.form.submit('submit').follow()
|
||
assert Workflow.get(workflow.id).global_actions[0].triggers[0].timeout == '-2'
|
||
|
||
|
||
def test_workflows_global_actions_webservice_trigger(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('add global action')
|
||
resp.forms[0]['name'] = 'Global Action'
|
||
resp = resp.forms[0].submit('submit')
|
||
resp = resp.follow()
|
||
|
||
# test adding a timeout trigger
|
||
resp.forms[1]['type'] = 'External call'
|
||
resp = resp.forms[1].submit()
|
||
resp = resp.follow()
|
||
|
||
assert 'External call (not configured)' in resp
|
||
|
||
resp = resp.click(
|
||
href='triggers/%s/' % Workflow.get(workflow.id).global_actions[0].triggers[-1].id, index=0
|
||
)
|
||
resp.form['identifier'] = 'foobar'
|
||
resp = resp.form.submit('submit').follow()
|
||
assert 'External call (foobar)' in resp
|
||
|
||
|
||
def test_workflows_global_actions_external_workflow_action(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
|
||
wf = Workflow(name='external')
|
||
action = wf.add_global_action('Global action')
|
||
trigger = action.append_trigger('webservice')
|
||
action.append_item('remove')
|
||
wf.store()
|
||
|
||
formdef = FormDef()
|
||
formdef.name = 'external'
|
||
formdef.workflow = wf
|
||
formdef.store()
|
||
workflow = Workflow(name='foo')
|
||
st = workflow.add_status('New')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (workflow.id, st.id))
|
||
resp.forms[0]['action-formdata-action'] = 'External workflow'
|
||
resp = resp.forms[0].submit().follow()
|
||
assert 'External workflow (not completed)' in resp.text
|
||
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (workflow.id, st.id))
|
||
assert "No workflow with external triggerable global action." in resp.text
|
||
|
||
trigger.identifier = 'test'
|
||
wf.store()
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (workflow.id, st.id))
|
||
|
||
resp = resp.forms[0].submit('submit')
|
||
assert "required field" in resp.text
|
||
resp.forms[0]['slug'] = 'formdef:%s' % formdef.url_name
|
||
resp = resp.forms[0].submit('submit')
|
||
assert "required field" in resp.text
|
||
resp = resp.forms[0].submit('submit')
|
||
resp.forms[0]['trigger_id'] = 'action:%s' % trigger.identifier
|
||
resp = resp.forms[0].submit('submit').follow().follow()
|
||
assert 'External workflow (action "Global action" on external)' in resp.text
|
||
assert Workflow.get(workflow.id).possible_status[0].items[0].target_mode == 'all'
|
||
assert Workflow.get(workflow.id).possible_status[0].items[0].target_id is None
|
||
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (workflow.id, st.id))
|
||
resp.forms[0]['target_mode'] = 'manual'
|
||
resp.forms[0]['target_id$type'] = 'template'
|
||
resp.forms[0]['target_id$value_template'] = '{{ form_var_plop_id }}'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert Workflow.get(workflow.id).possible_status[0].items[0].target_mode == 'manual'
|
||
assert Workflow.get(workflow.id).possible_status[0].items[0].target_id == '{{ form_var_plop_id }}'
|
||
|
||
trigger.identifier = 'another_test'
|
||
wf.store()
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (workflow.id, st.id))
|
||
assert 'External workflow (not completed)' in resp.text
|
||
|
||
trigger.identifier = 'action:%s' % trigger.identifier
|
||
wf.store()
|
||
formdef.remove_self()
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (workflow.id, st.id))
|
||
assert 'External workflow (not completed)' in resp.text
|
||
resp = resp.click(href=re.compile(r'^items/1/$'), index=0)
|
||
|
||
|
||
def test_workflows_external_workflow_action_config(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
external_wf = Workflow(name='external')
|
||
action = external_wf.add_global_action('Global action')
|
||
trigger = action.append_trigger('webservice')
|
||
trigger.identifier = 'test'
|
||
action.append_item('remove')
|
||
external_wf.store()
|
||
|
||
FormDef.wipe()
|
||
formdef = FormDef()
|
||
formdef.name = 'external'
|
||
formdef.workflow = external_wf
|
||
formdef.store()
|
||
|
||
wf = Workflow(name='foo')
|
||
st = wf.add_status('New')
|
||
external = ExternalWorkflowGlobalAction()
|
||
external.parent = st
|
||
st.items.append(external)
|
||
wf.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (wf.id, st.id))
|
||
# only action error: custom error message
|
||
resp.forms[0]['slug'] = 'formdef:external'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert 'There were errors processing your form. See below for details.' not in resp
|
||
assert 'This action is configured in two steps. See below for details.' in resp
|
||
assert "required field" in resp
|
||
# multiple errors: do as usual
|
||
resp.forms[0]['slug'] = 'formdef:external'
|
||
resp.forms[0]['condition$type'] = 'django'
|
||
resp.forms[0]['condition$value_django'] = '{{ 42 }}'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert 'There were errors processing your form. See below for details.' in resp
|
||
assert 'This action is configured in two steps. See below for details.' not in resp
|
||
assert "required field" in resp
|
||
assert "syntax error: Could not parse the remainder: '{{' from '{{'" in resp
|
||
|
||
|
||
def test_workflows_create_formdata(pub):
|
||
create_superuser(pub)
|
||
|
||
FormDef.wipe()
|
||
target_formdef = FormDef()
|
||
target_formdef.name = 'target form'
|
||
target_formdef.enable_tracking_codes = True
|
||
target_formdef.fields = [
|
||
fields.StringField(id='0', label='string', varname='foo_string'),
|
||
fields.FileField(id='1', label='file', type='file', varname='foo_file'),
|
||
]
|
||
target_formdef.store()
|
||
|
||
Workflow.wipe()
|
||
wf = Workflow(name='create-formdata')
|
||
|
||
st1 = wf.add_status('New')
|
||
st2 = wf.add_status('Resubmit')
|
||
|
||
jump = ChoiceWorkflowStatusItem()
|
||
jump.id = '_resubmit'
|
||
jump.label = 'Resubmit'
|
||
jump.by = ['_submitter']
|
||
jump.status = st2.id
|
||
jump.parent = st1
|
||
st1.items.append(jump)
|
||
|
||
create_formdata = CreateFormdataWorkflowStatusItem()
|
||
create_formdata.id = '_create_formdata'
|
||
create_formdata.formdef_slug = target_formdef.url_name
|
||
create_formdata.mappings = [
|
||
Mapping(field_id='0', expression='=form_var_toto_string'),
|
||
Mapping(field_id='1', expression='=form_var_toto_file_raw'),
|
||
]
|
||
create_formdata.parent = st2
|
||
st2.items.append(create_formdata)
|
||
|
||
redirect = RedirectToUrlWorkflowStatusItem()
|
||
redirect.id = '_redirect'
|
||
redirect.url = '{{ form_links_resubmitted.form_url }}'
|
||
redirect.parent = st2
|
||
st2.items.append(redirect)
|
||
|
||
jump = JumpOnSubmitWorkflowStatusItem()
|
||
jump.id = '_jump'
|
||
jump.status = st1.id
|
||
jump.parent = st2
|
||
st2.items.append(jump)
|
||
|
||
wf.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (wf.id, st2.id))
|
||
pq = resp.pyquery.remove_namespaces()
|
||
assert pq('option[value="New Form Creation"]').text() == 'New Form Creation'
|
||
assert pq('#itemId__create_formdata a')[0].text == 'New Form Creation (target form)'
|
||
|
||
resp = resp.click(
|
||
'Edit',
|
||
href='items/_create_formdata/',
|
||
)
|
||
resp.form.set('varname', 'resubmitted')
|
||
resp = resp.form.submit(name='submit')
|
||
resp = resp.follow()
|
||
|
||
# checks that nothing changed after submit
|
||
wf2 = Workflow.select()[0]
|
||
item = wf2.get_status('2').items[0]
|
||
assert item.varname == 'resubmitted'
|
||
assert isinstance(item, CreateFormdataWorkflowStatusItem)
|
||
wf.get_status('2').items[0].label = 'really resubmit'
|
||
|
||
# duplicate
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/_create_formdata/' % (wf.id, st2.id))
|
||
resp.form.set('mappings$element1$field_id', '0')
|
||
resp = resp.form.submit(name='submit')
|
||
pq = resp.pyquery.remove_namespaces()
|
||
assert pq('.error').text() == 'Some destination fields are duplicated'
|
||
|
||
# check setting map_fields_by_varname on new action
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (wf.id, st2.id))
|
||
resp.forms['new-action-form']['action-formdata-action'] = 'New Form Creation'
|
||
resp = resp.forms['new-action-form'].submit()
|
||
resp = resp.follow()
|
||
resp = resp.click(r'New Form Creation \(not configured\)')
|
||
resp.form['formdef_slug'] = 'target-form' # set target form
|
||
resp = resp.form.submit('submit')
|
||
assert 'Please define new mappings' in resp
|
||
assert resp.form['map_fields_by_varname'].checked is False
|
||
resp.form['map_fields_by_varname'].checked = True
|
||
resp = resp.form.submit('submit')
|
||
resp = resp.follow()
|
||
wf = Workflow.get(wf.id)
|
||
st2 = wf.possible_status[-1]
|
||
assert wf.possible_status[-1].items[-1].map_fields_by_varname is True
|
||
assert wf.possible_status[-1].items[-1].mappings is None
|
||
assert wf.possible_status[-1].items[-1].formdef_slug == 'target-form'
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/%s/' % (wf.id, st2.id, st2.items[-1].id))
|
||
resp.form['formdef_slug'] = '' # unset target form
|
||
resp.form.submit('submit')
|
||
wf = Workflow.get(wf.id)
|
||
assert wf.possible_status[-1].items[-1].formdef_slug is None
|
||
assert wf.possible_status[-1].items[-1].map_fields_by_varname is True
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/%s/' % (wf.id, st2.id, st2.items[-1].id))
|
||
resp.form['formdef_slug'] = 'target-form' # reset target form
|
||
resp.form.submit('submit') # no error
|
||
|
||
|
||
def test_workflows_create_formdata_action_config(pub):
|
||
create_superuser(pub)
|
||
|
||
FormDef.wipe()
|
||
target_formdef = FormDef()
|
||
target_formdef.name = 'target form'
|
||
target_formdef.fields = []
|
||
target_formdef.store()
|
||
|
||
Workflow.wipe()
|
||
wf = Workflow(name='create-formdata')
|
||
st = wf.add_status('New')
|
||
create_formdata = CreateFormdataWorkflowStatusItem()
|
||
create_formdata.parent = st
|
||
st.items.append(create_formdata)
|
||
wf.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (wf.id, st.id))
|
||
# only mapping error: custom error message
|
||
resp.forms[0]['formdef_slug'] = 'target-form'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert 'There were errors processing your form. See below for details.' not in resp
|
||
assert 'This action is configured in two steps. See below for details.' in resp
|
||
assert 'Please define new mappings' in resp
|
||
# multiple errors: do as usual
|
||
resp.forms[0]['formdef_slug'] = 'target-form'
|
||
resp.forms[0]['condition$type'] = 'django'
|
||
resp.forms[0]['condition$value_django'] = '{{ 42 }}'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert 'There were errors processing your form. See below for details.' in resp
|
||
assert 'This action is configured in two steps. See below for details.' not in resp
|
||
assert 'Please define new mappings' in resp
|
||
assert "syntax error: Could not parse the remainder: '{{' from '{{'" in resp
|
||
|
||
|
||
def test_workflows_create_carddata_action_config(pub):
|
||
create_superuser(pub)
|
||
|
||
CardDef.wipe()
|
||
carddef = CardDef()
|
||
carddef.name = 'My card'
|
||
carddef.fields = []
|
||
carddef.store()
|
||
|
||
Workflow.wipe()
|
||
wf = Workflow(name='create-carddata')
|
||
st = wf.add_status('New')
|
||
create_carddata = CreateCarddataWorkflowStatusItem()
|
||
create_carddata.parent = st
|
||
st.items.append(create_carddata)
|
||
wf.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (wf.id, st.id))
|
||
# only mapping error: custom error message
|
||
resp.forms[0]['formdef_slug'] = 'my-card'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert 'There were errors processing your form. See below for details.' not in resp
|
||
assert 'This action is configured in two steps. See below for details.' in resp
|
||
assert 'Please define new mappings' in resp
|
||
# multiple errors: do as usual
|
||
resp.forms[0]['formdef_slug'] = 'my-card'
|
||
resp.forms[0]['condition$type'] = 'django'
|
||
resp.forms[0]['condition$value_django'] = '{{ 42 }}'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert 'There were errors processing your form. See below for details.' in resp
|
||
assert 'This action is configured in two steps. See below for details.' not in resp
|
||
assert 'Please define new mappings' in resp
|
||
assert "syntax error: Could not parse the remainder: '{{' from '{{'" in resp
|
||
|
||
|
||
def test_workflows_edit_carddata_action_config(pub):
|
||
create_superuser(pub)
|
||
|
||
CardDef.wipe()
|
||
carddef = CardDef()
|
||
carddef.name = 'My card'
|
||
carddef.fields = []
|
||
carddef.store()
|
||
|
||
Workflow.wipe()
|
||
wf = Workflow(name='edit-carddata')
|
||
st = wf.add_status('New')
|
||
edit_carddata = EditCarddataWorkflowStatusItem()
|
||
edit_carddata.parent = st
|
||
st.items.append(edit_carddata)
|
||
wf.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (wf.id, st.id))
|
||
# only mapping error: custom error message
|
||
resp.forms[0]['formdef_slug'] = 'my-card'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert 'There were errors processing your form. See below for details.' not in resp
|
||
assert 'This action is configured in two steps. See below for details.' in resp
|
||
assert 'Please define new mappings' in resp
|
||
# multiple errors: do as usual
|
||
resp.forms[0]['formdef_slug'] = 'my-card'
|
||
resp.forms[0]['condition$type'] = 'django'
|
||
resp.forms[0]['condition$value_django'] = '{{ 42 }}'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert 'There were errors processing your form. See below for details.' in resp
|
||
assert 'This action is configured in two steps. See below for details.' not in resp
|
||
assert 'Please define new mappings' in resp
|
||
assert "syntax error: Could not parse the remainder: '{{' from '{{'" in resp
|
||
|
||
|
||
def test_workflows_edit_carddata_action(pub):
|
||
create_superuser(pub)
|
||
Workflow.wipe()
|
||
CardDef.wipe()
|
||
|
||
wf = Workflow(name='edit card')
|
||
st = wf.add_status('Update card', 'st')
|
||
wf.store()
|
||
|
||
carddef = CardDef()
|
||
carddef.name = 'My card'
|
||
carddef.fields = [
|
||
fields.StringField(id='1', label='string'),
|
||
]
|
||
carddef.store()
|
||
|
||
app = login(get_app(pub))
|
||
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (wf.id, st.id))
|
||
assert 'Edit Card Data' in [o[0] for o in resp.forms[0]['action-formdata-action'].options]
|
||
|
||
resp.forms[0]['action-formdata-action'] = 'Edit Card Data'
|
||
resp = resp.forms[0].submit().follow()
|
||
assert 'Edit Card Data (not configured)' in resp.text
|
||
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (wf.id, st.id))
|
||
resp.forms[0]['formdef_slug'] = 'my-card'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert 'Leaving the field blank will empty the value.' in resp.text
|
||
resp.forms[0]['mappings$element0$field_id'] = '1'
|
||
resp = resp.forms[0].submit('submit').follow()
|
||
assert 'Edit Card Data (not configured)' not in resp.text
|
||
assert Workflow.get(wf.id).possible_status[0].items[0].target_mode == 'all'
|
||
assert Workflow.get(wf.id).possible_status[0].items[0].target_id is None
|
||
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (wf.id, st.id))
|
||
resp.forms[0]['target_mode'] = 'manual'
|
||
resp.forms[0]['target_id$type'] = 'template'
|
||
resp.forms[0]['target_id$value_template'] = '{{ form_var_plop_id }}'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert Workflow.get(wf.id).possible_status[0].items[0].target_mode == 'manual'
|
||
assert Workflow.get(wf.id).possible_status[0].items[0].target_id == '{{ form_var_plop_id }}'
|
||
|
||
|
||
def test_workflows_criticality_levels(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('add criticality level')
|
||
resp = resp.forms[0].submit('cancel')
|
||
assert not Workflow.get(workflow.id).criticality_levels
|
||
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('add criticality level')
|
||
resp.forms[0]['name'] = 'vigilance'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert len(Workflow.get(workflow.id).criticality_levels) == 1
|
||
assert Workflow.get(workflow.id).criticality_levels[0].name == 'vigilance'
|
||
|
||
# test rename
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('vigilance')
|
||
resp = resp.forms[0].submit('cancel')
|
||
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('vigilance')
|
||
resp.forms[0]['name'] = 'Vigilance'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert len(Workflow.get(workflow.id).criticality_levels) == 1
|
||
assert Workflow.get(workflow.id).criticality_levels[0].name == 'Vigilance'
|
||
|
||
# add a second level
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('add criticality level')
|
||
resp.forms[0]['name'] = 'Alerte attentat'
|
||
resp = resp.forms[0].submit('submit')
|
||
assert len(Workflow.get(workflow.id).criticality_levels) == 2
|
||
assert Workflow.get(workflow.id).criticality_levels[0].name == 'Vigilance'
|
||
assert Workflow.get(workflow.id).criticality_levels[1].name == 'Alerte attentat'
|
||
|
||
# test reorder
|
||
level1_id = Workflow.get(workflow.id).criticality_levels[0].id
|
||
level2_id = Workflow.get(workflow.id).criticality_levels[1].id
|
||
app.get(
|
||
'/backoffice/workflows/%s/update_criticality_levels_order?order=%s;%s;'
|
||
% (workflow.id, level1_id, level2_id)
|
||
)
|
||
assert Workflow.get(workflow.id).criticality_levels[0].id == level1_id
|
||
assert Workflow.get(workflow.id).criticality_levels[1].id == level2_id
|
||
app.get(
|
||
'/backoffice/workflows/%s/update_criticality_levels_order?order=%s;%s;'
|
||
% (workflow.id, level2_id, level1_id)
|
||
)
|
||
assert Workflow.get(workflow.id).criticality_levels[0].id == level2_id
|
||
assert Workflow.get(workflow.id).criticality_levels[1].id == level1_id
|
||
|
||
# test removal
|
||
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
resp = resp.click('Vigilance')
|
||
resp = resp.forms[0].submit('delete-level')
|
||
assert len(Workflow.get(workflow.id).criticality_levels) == 1
|
||
|
||
|
||
def test_workflows_wscall_label(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
baz_status = workflow.add_status(name='baz')
|
||
wscall = WebserviceCallStatusItem()
|
||
wscall.parent = baz_status
|
||
baz_status.items.append(wscall)
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (workflow.id, baz_status.id))
|
||
assert 'Webservice' in resp.text
|
||
assert 'Webservice (' not in resp.text
|
||
|
||
wscall.label = 'foowscallbar'
|
||
workflow.store()
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/' % (workflow.id, baz_status.id))
|
||
assert 'Webservice (foowscallbar)' in resp.text
|
||
|
||
|
||
@pytest.mark.parametrize('value', [True, False])
|
||
def test_workflows_wscall_options(pub, value):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
baz_status = workflow.add_status(name='baz')
|
||
wscall = WebserviceCallStatusItem()
|
||
wscall.parent = baz_status
|
||
baz_status.items.append(wscall)
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (workflow.id, baz_status.id))
|
||
assert resp.form['notify_on_errors'].value is None
|
||
assert resp.form['record_on_errors'].value == 'yes'
|
||
resp.form['notify_on_errors'] = value
|
||
resp.form['record_on_errors'] = value
|
||
resp = resp.form.submit('submit').follow()
|
||
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (workflow.id, baz_status.id))
|
||
assert resp.form['notify_on_errors'].value == ('yes' if value else None)
|
||
assert resp.form['record_on_errors'].value == ('yes' if value else None)
|
||
resp.form['notify_on_errors'] = not value
|
||
resp.form['record_on_errors'] = not value
|
||
resp = resp.form.submit('submit').follow()
|
||
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (workflow.id, baz_status.id))
|
||
assert resp.form['notify_on_errors'].value == ('yes' if not value else None)
|
||
assert resp.form['record_on_errors'].value == ('yes' if not value else None)
|
||
|
||
|
||
def test_workflows_wscall_status_error(pub):
|
||
if not pub.is_using_postgresql():
|
||
pytest.skip('this requires SQL')
|
||
return
|
||
|
||
create_superuser(pub)
|
||
|
||
pub.loggederror_class.wipe()
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
baz_status = workflow.add_status(name='baz')
|
||
foo_status = workflow.add_status(name='foo')
|
||
wscall = WebserviceCallStatusItem()
|
||
wscall.parent = baz_status
|
||
wscall.action_on_app_error = foo_status.id
|
||
wscall.action_on_4xx = foo_status.id
|
||
wscall.action_on_5xx = foo_status.id
|
||
wscall.action_on_bad_data = foo_status.id
|
||
wscall.action_on_network_errors = foo_status.id
|
||
baz_status.items.append(wscall)
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
assert pub.loggederror_class.count() == 0
|
||
|
||
# delete foo status
|
||
del workflow.possible_status[1]
|
||
workflow.store()
|
||
app.get('/backoffice/workflows/%s/' % workflow.id)
|
||
assert pub.loggederror_class.count() == 1
|
||
error = pub.loggederror_class.select()[0]
|
||
assert (
|
||
error.tech_id
|
||
== '%s-reference-to-invalid-status-in-workflow-foo-status-baz-item-webservice' % workflow.id
|
||
)
|
||
assert error.formdef_id is None
|
||
assert error.formdef_class is None
|
||
assert error.workflow_id == workflow.id
|
||
assert error.summary == 'reference to invalid status in workflow foo, status baz, item Webservice'
|
||
assert error.occurences_count == 5
|
||
|
||
|
||
def test_workflows_form_action_config(pub):
|
||
create_superuser(pub)
|
||
|
||
Workflow.wipe()
|
||
wf = Workflow(name='foo')
|
||
st = wf.add_status('New')
|
||
form = FormWorkflowStatusItem()
|
||
form.parent = st
|
||
st.items.append(form)
|
||
wf.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (wf.id, st.id))
|
||
resp.form['by$element0'] = '_submitter'
|
||
resp.form['varname'] = 'myform'
|
||
resp.form['condition$type'] = 'django'
|
||
resp.form['condition$value_django'] = '42'
|
||
resp = resp.form.submit('submit')
|
||
assert resp.location == 'http://example.net/backoffice/workflows/%s/status/%s/items/1/fields/' % (
|
||
wf.id,
|
||
st.id,
|
||
)
|
||
|
||
wf = Workflow.get(wf.id)
|
||
form = wf.possible_status[0].items[0]
|
||
assert form.by == ['_submitter']
|
||
assert form.varname == 'myform'
|
||
assert form.condition == {'type': 'django', 'value': '42'}
|
||
|
||
|
||
def test_workflows_inspect_view(pub):
|
||
create_superuser(pub)
|
||
pub.role_class.wipe()
|
||
role = pub.role_class(name='foobar')
|
||
role.store()
|
||
|
||
Workflow.wipe()
|
||
workflow = Workflow(name='foo')
|
||
|
||
workflow.criticality_levels = [WorkflowCriticalityLevel(name='green')]
|
||
|
||
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
|
||
workflow.backoffice_fields_formdef.fields = [
|
||
fields.StringField(
|
||
id='bo1', label='1st backoffice field', type='string', varname='backoffice_blah', required=True
|
||
),
|
||
]
|
||
|
||
workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
|
||
workflow.variables_formdef.fields.append(fields.StringField(label='Test', type='string'))
|
||
|
||
foo_status = workflow.add_status(name='foo')
|
||
|
||
baz_status = workflow.add_status(name='baz')
|
||
baz_status.backoffice_info_text = 'Info text'
|
||
wscall = WebserviceCallStatusItem()
|
||
wscall.parent = baz_status
|
||
baz_status.items.append(wscall)
|
||
|
||
dispatch1 = DispatchWorkflowStatusItem()
|
||
dispatch1.dispatch_type = 'automatic'
|
||
dispatch1.rules = [
|
||
{'role_id': role.id, 'value': 'foo'},
|
||
]
|
||
baz_status.items.append(dispatch1)
|
||
dispatch2 = DispatchWorkflowStatusItem()
|
||
dispatch2.dispatch_type = 'manual'
|
||
dispatch2.role_key = '_receiver'
|
||
dispatch2.role_id = role.id
|
||
baz_status.items.append(dispatch2)
|
||
|
||
baz_status.backoffice_info_text = '<p>Hello</p>'
|
||
|
||
display_form = FormWorkflowStatusItem()
|
||
display_form.id = '_x'
|
||
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
|
||
display_form.formdef.fields.append(fields.StringField(label='Test', type='string'))
|
||
display_form.formdef.fields.append(fields.StringField(label='Test2', type='string'))
|
||
display_form.backoffice_info_text = '<p>Foo</p>'
|
||
baz_status.items.append(display_form)
|
||
display_form.parent = baz_status
|
||
|
||
jump = JumpWorkflowStatusItem()
|
||
jump.id = '_jump'
|
||
jump.timeout = 86400
|
||
jump.status = foo_status.id
|
||
baz_status.items.append(jump)
|
||
jump.parent = baz_status
|
||
|
||
invalid_jump = JumpWorkflowStatusItem()
|
||
invalid_jump.id = '_invalid_jump'
|
||
invalid_jump.status = 'xxx'
|
||
baz_status.items.append(invalid_jump)
|
||
invalid_jump.parent = baz_status
|
||
|
||
ac1 = workflow.add_global_action('Action', 'ac1')
|
||
ac1.backoffice_info_text = '<p>Foo</p>'
|
||
|
||
add_to_journal = RegisterCommenterWorkflowStatusItem()
|
||
add_to_journal.id = '_add_to_journal'
|
||
add_to_journal.comment = 'HELLO WORLD'
|
||
ac1.items.append(add_to_journal)
|
||
add_to_journal.parent = ac1
|
||
|
||
trigger = ac1.triggers[0]
|
||
assert trigger.key == 'manual'
|
||
trigger.roles = [role.id]
|
||
|
||
workflow.store()
|
||
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/%s/inspect' % workflow.id)
|
||
assert (
|
||
'<li><span class="parameter">Dispatch Type:</span> Multiple</li>'
|
||
'<li><span class="parameter">Rules:</span> '
|
||
'<ul class="rules"><li>foo → foobar</li></ul>'
|
||
'</li>'
|
||
) in resp.text
|
||
assert (
|
||
'<li><span class="parameter">Dispatch Type:</span> Simple</li>'
|
||
'<li><span class="parameter">Role:</span> foobar</li>'
|
||
) in resp.text
|
||
|
||
|
||
def test_workflows_unused(pub):
|
||
create_superuser(pub)
|
||
FormDef.wipe()
|
||
Workflow.wipe()
|
||
app = login(get_app(pub))
|
||
resp = app.get('/backoffice/workflows/')
|
||
assert 'Unused workflows' not in resp.text
|
||
|
||
workflow = Workflow(name='Workflow One')
|
||
workflow.store()
|
||
resp = app.get('/backoffice/workflows/')
|
||
assert 'Unused workflows' in resp.text
|
||
|
||
formdef = FormDef()
|
||
formdef.name = 'form title'
|
||
formdef.fields = []
|
||
formdef.store()
|
||
resp = app.get('/backoffice/workflows/')
|
||
assert 'Unused workflows' in resp.text
|
||
|
||
formdef.workflow = workflow
|
||
formdef.store()
|
||
resp = app.get('/backoffice/workflows/')
|
||
assert 'Unused workflows' not in resp.text
|
||
|
||
workflow = Workflow(name='Workflow Two')
|
||
workflow.store()
|
||
resp = app.get('/backoffice/workflows/')
|
||
assert 'Unused workflows' in resp.text
|