545 lines
19 KiB
Python
545 lines
19 KiB
Python
import collections
|
|
import pytest
|
|
import sys
|
|
import shutil
|
|
import time
|
|
|
|
from quixote import cleanup
|
|
from quixote.http_request import Upload
|
|
from wcs.qommon.http_request import HTTPRequest
|
|
from wcs import fields, formdef
|
|
from wcs.formdef import FormDef
|
|
from wcs.formdata import Evolution
|
|
from wcs.roles import Role
|
|
from wcs.workflows import Workflow, WorkflowCriticalityLevel, WorkflowBackofficeFieldsFormDef
|
|
from wcs.wf.anonymise import AnonymiseWorkflowStatusItem
|
|
from wcs.wf.wscall import JournalWsCallErrorPart
|
|
from wcs.wf.register_comment import JournalEvolutionPart
|
|
|
|
from utilities import create_temporary_pub, clean_temporary_pub
|
|
|
|
from test_api import local_user
|
|
|
|
def pytest_generate_tests(metafunc):
    """Parametrize the ``pub`` fixture over both storage backends.

    Tests that do not request ``pub`` are left untouched.  The backend name
    is passed indirectly, i.e. it reaches the fixture as ``request.param``.
    """
    if 'pub' not in metafunc.fixturenames:
        return
    backends = ['pickle', 'sql']
    metafunc.parametrize('pub', backends, indirect=True)
|
|
|
|
@pytest.fixture
def pub(request):
    """Build a temporary publisher and reset the module-level ``formdef``.

    ``request.param`` selects the storage backend ('sql' enables SQL mode).
    All existing form definitions are wiped and a new empty one named
    'foobar' is stored in the ``formdef`` global, which the tests of this
    module use directly.
    """
    # NOTE: this global deliberately replaces the ``formdef`` module imported
    # at the top of the file with a FormDef instance.
    global formdef

    use_sql = (request.param == 'sql')
    publisher = create_temporary_pub(sql_mode=use_sql)

    http_request = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
    publisher.set_app_dir(http_request)
    publisher._set_request(http_request)

    publisher.cfg['identification'] = {'methods': ['password']}
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()

    FormDef.wipe()

    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.url_name = 'foobar'
    formdef.fields = []
    formdef.store()

    return publisher
|
|
|
|
|
|
def teardown_module(module):
    """Module-level teardown hook: clean up the temporary publisher."""
    clean_temporary_pub()
|
|
|
|
|
|
def test_basic(pub):
    """Check status, name and slug variables of a formdata that was never stored."""
    variables = formdef.data_class()().get_substitution_variables()
    expectations = (
        ('form_status', 'Unknown'),
        ('form_name', 'foobar'),
        ('form_slug', 'foobar'),
    )
    for key, expected in expectations:
        assert variables.get(key) == expected
|
|
|
|
def test_saved(pub):
    """Check the number and URL variables of the first stored formdata."""
    formdef.data_class().wipe()
    stored = formdef.data_class()()
    stored.store()

    variables = stored.get_substitution_variables()
    assert variables.get('form_number') == '%s-1' % formdef.id
    assert variables.get('form_number_raw') == '1'

    # every URL variable must end with the path of formdata number 1
    url_suffixes = (
        ('form_url', '/foobar/1/'),
        ('form_url_backoffice', '/backoffice/management/foobar/1/'),
        ('form_status_url', '/foobar/1/status'),
    )
    for key, suffix in url_suffixes:
        assert variables.get(key).endswith(suffix)
|
|
|
|
def test_auto_display_id(pub):
    """Without an explicit display id, ``form_number`` is '<formdef id>-<formdata id>'."""
    formdef.data_class().wipe()
    stored = formdef.data_class()()
    stored.store()

    variables = stored.get_substitution_variables()
    expected_number = '%s-%s' % (formdef.id, stored.id)
    assert variables.get('form_number') == expected_number
    assert variables.get('form_number_raw') == str(stored.id)
|
|
|
|
def test_manual_display_id(pub):
    """A manually set ``id_display`` is used as ``form_number``; the raw number stays the id."""
    formdef.data_class().wipe()
    stored = formdef.data_class()()
    stored.id_display = 'bar'
    stored.store()

    variables = stored.get_substitution_variables()
    assert variables.get('form_number') == 'bar'
    assert variables.get('form_number_raw') == str(stored.id)
|
|
|
|
def test_submission_context(pub):
    """Check the submission variables with and without an explicit context."""
    formdef.data_class().wipe()

    # formdata submitted from the backoffice, through the 'mail' channel
    mailed = formdef.data_class()()
    mailed.backoffice_submission = True
    mailed.submission_channel = 'mail'
    mailed.submission_context = {'mail_url': 'http://www.example.com/test.pdf'}
    mailed_vars = mailed.get_substitution_variables()
    assert mailed_vars.get('form_submission_backoffice') is True
    assert mailed_vars.get('form_submission_channel') == 'mail'
    assert mailed_vars.get('form_submission_channel_label') == 'Mail'
    assert mailed_vars.get('form_submission_context_mail_url') == 'http://www.example.com/test.pdf'

    # formdata with no submission information at all
    plain = formdef.data_class()()
    plain_vars = plain.get_substitution_variables()
    assert plain_vars.get('form_submission_backoffice') is False
    assert plain_vars.get('form_submission_channel') is None
    assert plain_vars.get('form_submission_channel_label') == 'Web'
|
|
|
|
def test_just_created(pub):
    """After ``just_created()`` the status, receipt and evolution variables are set."""
    formdef.data_class().wipe()
    created = formdef.data_class()()
    created.just_created()
    created.store()

    variables = created.get_substitution_variables()
    assert variables.get('form_status') == 'Just Submitted'
    assert variables.get('form_status_is_endpoint') == False

    # these variables only have to be present with a truthy value
    for key in ('form_receipt_date', 'form_receipt_time',
                'form_receipt_datetime', 'form_evolution'):
        assert variables.get(key)
|
|
|
|
def test_field(pub):
    """A string field is exposed as ``form_f<id>`` and ``form_field_<label>``."""
    formdef.data_class().wipe()
    string_field = fields.StringField(id='0', label='string')
    formdef.fields = [string_field]
    formdef.store()

    formdata = formdef.data_class()()

    # without data the variable exists but holds a falsy value
    empty_vars = formdata.get_substitution_variables()
    assert 'form_f0' in empty_vars
    assert not empty_vars.get('form_f0')

    formdata.data = {'0': 'test'}
    filled_vars = formdata.get_substitution_variables()
    for key in ('form_f0', 'form_field_string'):
        assert filled_vars.get(key) == 'test'
|
|
|
|
def test_field_varname(pub):
    """A field with a varname is also exposed as ``form_var_<varname>``."""
    formdef.data_class().wipe()
    named_field = fields.StringField(id='0', label='string', varname='foo')
    formdef.fields = [named_field]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'0': 'test'}

    variables = formdata.get_substitution_variables()
    for key in ('form_f0', 'form_var_foo'):
        assert variables.get(key) == 'test'
|
|
|
|
def test_file_field(pub):
    """Check the name, URL and raw variables of a file field, filled and empty."""
    formdef.data_class().wipe()
    formdef.fields = [fields.FileField(id='0', label='file', varname='foo')]
    formdef.store()

    formdata = formdef.data_class()()
    attachment = Upload('test.txt', 'text/plain', 'ascii')
    attachment.receive(['first line', 'second line'])
    formdata.data = {'0': attachment}
    formdata.id = 1

    variables = formdata.get_substitution_variables()
    assert variables.get('form_var_foo') == 'test.txt'
    assert variables.get('form_var_foo_url').endswith('/foobar/1/download?f=0')
    assert isinstance(variables.get('form_var_foo_raw'), Upload)

    # an explicit None value and a missing value must both give None variables
    for empty_data in ({'0': None}, {}):
        formdata.data = empty_data
        variables = formdata.get_substitution_variables()
        for key in ('form_var_foo', 'form_var_foo_raw', 'form_var_foo_url'):
            assert variables[key] is None
|
|
|
|
def test_get_submitter(pub):
    """The submitter email comes from the prefilled field first, then from the user."""
    formdef.data_class().wipe()
    email_field = fields.StringField(
        id='0', label='email', varname='foo',
        prefill={'type': 'user', 'value': 'email'})
    formdef.fields = [email_field]
    formdef.store()

    formdata = formdef.data_class()()
    assert formdef.get_submitter_email(formdata) is None

    formdata.data = {'0': 'foo@localhost'}
    assert formdef.get_submitter_email(formdata) == 'foo@localhost'

    submitter = pub.user_class()
    submitter.email = 'bar@localhost'
    submitter.store()

    # with both a field value and a user, the field value is expected
    formdata.user_id = submitter.id
    assert formdef.get_submitter_email(formdata) == 'foo@localhost'

    # once the field value is gone, the user email is expected
    formdata.data = {}
    assert formdef.get_submitter_email(formdata) == 'bar@localhost'
|
|
|
|
def test_get_last_update_time(pub):
    """``last_update_time`` tracks the latest evolution, else the receipt time."""
    formdef.data_class().wipe()
    formdef.store()

    formdata = formdef.data_class()()
    assert formdata.last_update_time is None

    formdata.just_created()
    assert formdata.last_update_time == formdata.evolution[-1].time

    # presumably needed so the new evolution gets a timestamp that differs
    # from the receipt time
    time.sleep(1)
    later = Evolution()
    later.time = time.localtime()
    later.status = formdata.status
    later.comment = 'hello world'
    formdata.evolution.append(later)
    assert formdata.last_update_time != formdata.receipt_time
    assert formdata.last_update_time == formdata.evolution[-1].time

    # check with missing 'evolution' values
    formdata.evolution = None
    assert formdata.last_update_time == formdata.receipt_time
|
|
|
|
def test_password_field(pub):
    """A password field value is unchanged after storing and reloading."""
    formdef.data_class().wipe()
    formdef.fields = [fields.PasswordField(id='0', label='pwd')]
    formdef.store()

    original = formdef.data_class()()
    original.data = {'0': {'cleartext': 'foo'}}
    original.store()

    reloaded = original.get(original.id)
    assert reloaded.data == {'0': {'cleartext': 'foo'}}
|
|
|
|
def test_date_field(pub):
    """A date field is unchanged after reload and is formatted per site language.

    The language configuration is switched to French for one assertion and
    restored in a ``finally`` clause, so a failing assertion does not leave
    the publisher configured in French.
    """
    formdef.data_class().wipe()
    formdef.fields = [fields.DateField(id='0', label='date')]
    formdef.store()
    formdata = formdef.data_class()()
    value = time.strptime('2015-05-12', '%Y-%m-%d')
    formdata.data = {'0': value}
    formdata.store()

    formdata2 = formdata.get(formdata.id)
    assert formdata2.data == {'0': value}

    assert formdata2.get_substitution_variables()['form_field_date'] == '2015-05-12'
    pub.cfg['language'] = {'language': 'fr'}
    try:
        assert formdata2.get_substitution_variables()['form_field_date'] == '12/05/2015'
    finally:
        # always restore the language, even when the assertion above fails
        pub.cfg['language'] = {'language': 'en'}
|
|
|
|
def test_clean_drafts(pub):
    """``clean_drafts`` removes the draft dated at the epoch and keeps the current one."""
    formdef = FormDef()
    formdef.name = 'foo'
    formdef.fields = []
    formdef.store()
    formdef.data_class().wipe()

    def store_draft(receipt_time):
        # store a draft with the given receipt time and return its id
        draft = formdef.data_class()()
        draft.status = 'draft'
        draft.receipt_time = receipt_time
        draft.store()
        return draft.id

    recent_id = store_draft(time.localtime())
    store_draft(time.localtime(0))  # epoch, 1970-01-01

    assert formdef.data_class().count() == 2
    from wcs.formdef import clean_drafts
    clean_drafts(pub)

    remaining = formdef.data_class()
    assert remaining.count() == 1
    assert formdef.data_class().select()[0].id == recent_id
|
|
|
|
def test_criticality_levels(pub):
    """Criticality level changes stay within the levels defined by the workflow."""

    def make_levels(*names):
        # one WorkflowCriticalityLevel per given name, in order
        return [WorkflowCriticalityLevel(name=name) for name in names]

    def level_name(formdata):
        return formdata.get_criticality_level_object().name

    workflow = Workflow(name='criticality')
    workflow.criticality_levels = make_levels('green', 'yellow', 'red')
    workflow.store()

    formdef = FormDef()
    formdef.name = 'foo'
    formdef.fields = []
    formdef.workflow_id = workflow.id
    formdef.store()
    formdef.data_class().wipe()

    d = formdef.data_class()()
    assert level_name(d) == 'green'

    # going up stops at the last level
    for expected in ('yellow', 'red', 'red'):
        d.increase_criticality_level()
        assert level_name(d) == expected

    # going down stops at the first level
    for expected in ('yellow', 'green', 'green'):
        d.decrease_criticality_level()
        assert level_name(d) == expected

    # setting an explicit level; an out-of-range value gives the last level
    for level, expected in ((1, 'yellow'), (2, 'red'), (4, 'red')):
        d.set_criticality_level(level)
        assert level_name(d) == expected

    workflow.criticality_levels = make_levels('green')
    workflow.store()
    formdef = FormDef.get(id=formdef.id)  # reload formdef
    d = formdef.data_class()()
    assert level_name(d) == 'green'
    d.increase_criticality_level()
    assert level_name(d) == 'green'

    workflow.criticality_levels = make_levels('green', 'yellow')
    workflow.store()
    formdef = FormDef.get(id=formdef.id)  # reload formdef
    d = formdef.data_class()()
    # set too high, this simulates a workflow being changed to have less
    # levels than before.
    d.criticality_level = 104
    assert level_name(d) == 'yellow'
    d.increase_criticality_level()
    assert level_name(d) == 'yellow'
    d.decrease_criticality_level()
    assert level_name(d) == 'green'

    d.criticality_level = 104
    d.decrease_criticality_level()
    assert level_name(d) == 'green'
|
|
|
|
def test_field_item_substvars(pub):
    """An item field exposes its display value, and its raw value as ``_raw``."""
    choices = [('1', 'un'), ('2', 'deux')]
    data_source = {'type': 'formula', 'value': repr(choices)}

    formdef = FormDef()
    formdef.name = 'foobar'
    item_field = fields.ItemField(
        id='0', label='string', data_source=data_source, varname='xxx')
    formdef.fields = [item_field]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'0': '1', '0_display': 'un'}

    substvars = formdata.get_substitution_variables()
    assert substvars.get('form_var_xxx') == 'un'
    assert substvars.get('form_var_xxx_raw') == '1'
|
|
|
|
def test_get_json_export_dict_evolution(pub, local_user):
    """Check the 'evolution' entries of the JSON export, plain and anonymised.

    Two evolutions are created: the first carries a workflow comment part and
    a webservice call error part, the second has no parts.  The plain export
    must describe the user and the full parts; the anonymised export must
    omit 'who' from *both* evolutions and reduce each part to its type.
    """
    Workflow.wipe()
    workflow = Workflow(name='test')
    st_new = workflow.add_status('New')
    st_finished = workflow.add_status('Finished')
    workflow.store()

    formdef = FormDef()
    formdef.workflow_id = workflow.id
    formdef.name = 'foo'
    formdef.fields = []
    formdef.store()
    formdef.data_class().wipe()

    d = formdef.data_class()()
    d.status = 'wf-%s' % st_new.id
    d.user_id = local_user.id
    d.receipt_time = time.localtime()
    evo = Evolution()
    evo.time = time.localtime()
    evo.status = 'wf-%s' % st_new.id
    evo.who = '_submitter'
    d.evolution = [evo]
    d.store()
    # the parts are attached after the first store, as in the original test
    evo.add_part(JournalEvolutionPart(d, "ok"))
    evo.add_part(JournalWsCallErrorPart("summary", "label", "data"))
    evo = Evolution()
    evo.time = time.localtime()
    evo.status = 'wf-%s' % st_finished.id
    evo.who = '_submitter'
    d.evolution.append(evo)
    d.store()

    export = d.get_json_export_dict()
    assert 'evolution' in export
    assert len(export['evolution']) == 2
    first, second = export['evolution']

    assert first['status'] == st_new.id
    assert 'time' in first
    assert first['who']['id'] == local_user.id
    assert first['who']['email'] == local_user.email
    assert first['who']['NameID'] == local_user.name_identifiers
    assert 'parts' in first
    assert len(first['parts']) == 2
    assert first['parts'][0]['type'] == 'workflow-comment'
    assert first['parts'][0]['content'] == 'ok'
    assert first['parts'][1]['type'] == 'wscall-error'
    assert first['parts'][1]['summary'] == 'summary'
    assert first['parts'][1]['label'] == 'label'
    assert first['parts'][1]['data'] == 'data'

    assert second['status'] == st_finished.id
    assert 'time' in second
    assert second['who']['id'] == local_user.id
    assert second['who']['email'] == local_user.email
    assert second['who']['NameID'] == local_user.name_identifiers
    assert 'parts' not in second

    export = d.get_json_export_dict(anonymise=True)
    assert 'evolution' in export
    assert len(export['evolution']) == 2
    first, second = export['evolution']

    assert first['status'] == st_new.id
    assert 'time' in first
    assert 'who' not in first
    assert 'parts' in first
    assert len(first['parts']) == 2
    # anonymised parts only keep their 'type' key
    assert len(first['parts'][0]) == 1
    assert first['parts'][0]['type'] == 'workflow-comment'
    assert len(first['parts'][1]) == 1
    assert first['parts'][1]['type'] == 'wscall-error'

    assert second['status'] == st_finished.id
    assert 'time' in second
    # fixed: the original re-checked evolution[0] here, so the second
    # evolution was never verified to be anonymised
    assert 'who' not in second
    assert 'parts' not in second
|
|
|
|
def test_field_bool_substvars(pub):
    """A bool field exposes its string form, and the boolean itself as ``_raw``."""
    formdef = FormDef()
    formdef.name = 'foobar'
    checkbox = fields.BoolField(id='0', label='checkbox', varname='xxx')
    formdef.fields = [checkbox]
    formdef.store()

    formdata = formdef.data_class()()

    for raw_value, displayed in ((False, 'False'), (True, 'True')):
        formdata.data = {'0': raw_value}
        substvars = formdata.get_substitution_variables()
        assert substvars.get('form_var_xxx') == displayed
        assert substvars.get('form_var_xxx_raw') is raw_value
|
|
|
|
def test_backoffice_field_varname(pub):
    """A backoffice field with a varname is exposed as ``form_var_<varname>``."""
    wf = Workflow(name='bo fields')
    backoffice_formdef = WorkflowBackofficeFieldsFormDef(wf)
    wf.backoffice_fields_formdef = backoffice_formdef
    backoffice_formdef.fields = [
        fields.StringField(
            id='bo1',
            label='1st backoffice field',
            type='string',
            varname='backoffice_blah',
        ),
    ]
    wf.add_status('Status1')
    wf.store()

    formdef.workflow_id = wf.id
    formdef.data_class().wipe()
    formdef.fields = [fields.StringField(id='0', label='string', varname='foo')]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'bo1': 'test'}
    variables = formdata.get_substitution_variables()
    assert variables.get('form_var_backoffice_blah') == 'test'
|
|
|
|
def test_private_history(pub, local_user):
    """With private status and history, only admins and receivers may read them."""
    formdef.data_class().wipe()
    formdef.private_status_and_history = True
    formdef.store()
    formdata = formdef.data_class()()
    formdata.store()

    def can_read(user):
        return formdef.is_user_allowed_read_status_and_history(user, formdata=formdata)

    # neither an anonymous user nor a plain user is allowed
    assert can_read(None) is False
    assert can_read(local_user) is False

    # admin flag set, then reset for the following checks
    local_user.is_admin = True
    assert can_read(local_user) is True
    local_user.is_admin = False

    receiver_role = Role(name='foobar')
    receiver_role.store()

    # defining a receiver role is not enough, the user must hold it
    formdef.workflow_roles['_receiver'] = receiver_role.id
    assert can_read(local_user) is False

    local_user.roles = [receiver_role.id]
    assert can_read(local_user) is True
|
|
|
|
def test_workflow_data_file_url(pub):
    """A file stored in workflow data gets a non-empty ``_url`` variable.

    The workflow data holds a ``_url`` key set to None next to the ``_raw``
    upload; the substitution variables must still provide a truthy URL.
    """
    upload = Upload('test.txt', 'text/plain', 'ascii')
    upload.receive(['first line', 'second line'])

    formdata = formdef.data_class()()
    formdata.store()
    # create workflow_data as ordered dict to be sure _url comes last, to
    # trigger #17233.
    # The dict is built from a list of pairs: keyword arguments only keep
    # their order from Python 3.6 on, so OrderedDict(**kwargs) would not
    # guarantee the intended ordering on older interpreters.
    formdata.workflow_data = collections.OrderedDict([
        ('foo_var_file', 'test.txt'),
        ('foo_var_file_raw', upload),
        ('foo_var_file_url', None),
    ])
    substvars = formdata.get_substitution_variables()
    assert substvars['foo_var_file_url']
|
|
|
|
def test_evolution_get_status(pub):
    """Evolutions without their own status are expected to report the previous one.

    Five evolutions are stored; only the first and the fourth set a status.
    After reloading, ``get_status()`` must give ids '1', '1', '1', '2', '2'.
    """
    Workflow.wipe()
    workflow = Workflow(name='test')
    st_new = workflow.add_status('New')
    st_finished = workflow.add_status('Finished')
    workflow.store()

    formdef = FormDef()
    formdef.workflow_id = workflow.id
    formdef.name = 'foo'
    formdef.fields = []
    formdef.store()
    formdef.data_class().wipe()

    d = formdef.data_class()()
    d.evolution = []

    # None means the evolution is appended without setting a status on it
    status_sequence = (
        'wf-%s' % st_new.id,
        None,
        None,
        'wf-%s' % st_finished.id,
        None,
    )
    for status in status_sequence:
        step = Evolution()
        step.time = time.localtime()
        if status is not None:
            step.status = status
        d.evolution.append(step)

    d.store()
    d = formdef.data_class().get(d.id)

    status_ids = [x.get_status().id for x in d.evolution]
    assert status_ids == ['1', '1', '1', '2', '2']
|