wcs/tests/test_formdata.py

545 lines
19 KiB
Python

import collections
import pytest
import sys
import shutil
import time
from quixote import cleanup
from quixote.http_request import Upload
from wcs.qommon.http_request import HTTPRequest
from wcs import fields, formdef
from wcs.formdef import FormDef
from wcs.formdata import Evolution
from wcs.roles import Role
from wcs.workflows import Workflow, WorkflowCriticalityLevel, WorkflowBackofficeFieldsFormDef
from wcs.wf.anonymise import AnonymiseWorkflowStatusItem
from wcs.wf.wscall import JournalWsCallErrorPart
from wcs.wf.register_comment import JournalEvolutionPart
from utilities import create_temporary_pub, clean_temporary_pub
from test_api import local_user
def pytest_generate_tests(metafunc):
    """Parametrize the ``pub`` fixture so each test using it runs once per backend.

    Tests that do not request ``pub`` are left untouched.
    """
    if 'pub' not in metafunc.fixturenames:
        return
    storage_backends = ['pickle', 'sql']
    metafunc.parametrize('pub', storage_backends, indirect=True)
@pytest.fixture
def pub(request):
    """Build a temporary publisher and reset the module-level ``formdef``.

    ``request.param`` selects the storage mode: ``'sql'`` enables SQL mode,
    any other value leaves it disabled.  After the publisher is configured,
    all form definitions are wiped and a fresh, field-less 'foobar' formdef
    is stored in the module global ``formdef`` used by the tests.
    """
    global formdef

    publisher = create_temporary_pub(sql_mode=(request.param == 'sql'))

    # attach a minimal HTTP request to the publisher
    environ = {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'}
    http_request = HTTPRequest(None, environ)
    publisher.set_app_dir(http_request)
    publisher._set_request(http_request)

    publisher.cfg['identification'] = {'methods': ['password']}
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()

    # start every test from a single, empty form definition
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.url_name = 'foobar'
    formdef.fields = []
    formdef.store()

    return publisher
def teardown_module(module):
    """Module-level teardown hook: clean up the temporary publisher."""
    clean_temporary_pub()
def test_basic(pub):
    """A new, unsaved formdata exposes status 'Unknown' plus the form name and slug."""
    variables = formdef.data_class()().get_substitution_variables()
    expectations = (
        ('form_status', 'Unknown'),
        ('form_name', 'foobar'),
        ('form_slug', 'foobar'),
    )
    for key, expected in expectations:
        assert variables.get(key) == expected
def test_saved(pub):
    """Once stored, the first formdata gets number 1 and the matching URLs."""
    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.store()

    variables = formdata.get_substitution_variables()
    assert variables.get('form_number') == '%s-1' % formdef.id
    assert variables.get('form_number_raw') == '1'

    url_suffixes = (
        ('form_url', '/foobar/1/'),
        ('form_url_backoffice', '/backoffice/management/foobar/1/'),
        ('form_status_url', '/foobar/1/status'),
    )
    for key, suffix in url_suffixes:
        assert variables.get(key).endswith(suffix)
def test_auto_display_id(pub):
    """Without a manual display id, form_number is '<formdef id>-<formdata id>'."""
    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.store()

    variables = formdata.get_substitution_variables()
    expected_number = '%s-%s' % (formdef.id, formdata.id)
    expected_raw = str(formdata.id)
    assert variables.get('form_number') == expected_number
    assert variables.get('form_number_raw') == expected_raw
def test_manual_display_id(pub):
    """A manually set ``id_display`` is used as form_number; the raw number is still the id."""
    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.id_display = 'bar'
    formdata.store()

    variables = formdata.get_substitution_variables()
    raw_number = str(formdata.id)
    assert variables.get('form_number') == 'bar'
    assert variables.get('form_number_raw') == raw_number
def test_submission_context(pub):
    """Submission channel, backoffice flag and context show up in substitution variables."""
    formdef.data_class().wipe()

    # formdata flagged as a backoffice submission received through the mail channel
    mail_formdata = formdef.data_class()()
    mail_formdata.backoffice_submission = True
    mail_formdata.submission_channel = 'mail'
    mail_formdata.submission_context = {'mail_url': 'http://www.example.com/test.pdf'}
    variables = mail_formdata.get_substitution_variables()
    assert variables.get('form_submission_backoffice') is True
    assert variables.get('form_submission_channel') == 'mail'
    assert variables.get('form_submission_channel_label') == 'Mail'
    assert (variables.get('form_submission_context_mail_url')
            == 'http://www.example.com/test.pdf')

    # formdata with no submission attributes set
    plain_formdata = formdef.data_class()()
    variables = plain_formdata.get_substitution_variables()
    assert variables.get('form_submission_backoffice') is False
    assert variables.get('form_submission_channel') is None
    assert variables.get('form_submission_channel_label') == 'Web'
def test_just_created(pub):
    """After ``just_created()`` the status, receipt and evolution variables are set."""
    formdef.data_class().wipe()
    formdata = formdef.data_class()()
    formdata.just_created()
    formdata.store()

    variables = formdata.get_substitution_variables()
    assert variables.get('form_status') == 'Just Submitted'
    # equality (not identity) comparison kept as in the original check
    assert variables.get('form_status_is_endpoint') == False

    non_empty_keys = (
        'form_receipt_date',
        'form_receipt_time',
        'form_receipt_datetime',
        'form_evolution',
    )
    for key in non_empty_keys:
        assert variables.get(key)
def test_field(pub):
    """A string field is exposed as ``form_f<id>`` and ``form_field_<label>``."""
    formdef.data_class().wipe()
    string_field = fields.StringField(id='0', label='string')
    formdef.fields = [string_field]
    formdef.store()

    formdata = formdef.data_class()()

    # the variable exists, with a falsy value, before any data is set
    variables = formdata.get_substitution_variables()
    assert 'form_f0' in variables
    assert not variables.get('form_f0')

    formdata.data = {'0': 'test'}
    variables = formdata.get_substitution_variables()
    for key in ('form_f0', 'form_field_string'):
        assert variables.get(key) == 'test'
def test_field_varname(pub):
    """A field with a varname is also exposed as ``form_var_<varname>``."""
    formdef.data_class().wipe()
    named_field = fields.StringField(id='0', label='string', varname='foo')
    formdef.fields = [named_field]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'0': 'test'}

    variables = formdata.get_substitution_variables()
    for key in ('form_f0', 'form_var_foo'):
        assert variables.get(key) == 'test'
def test_file_field(pub):
    """File fields expose a name, a download URL and the raw upload; all None when empty."""
    formdef.data_class().wipe()
    formdef.fields = [fields.FileField(id='0', label='file', varname='foo')]
    formdef.store()

    formdata = formdef.data_class()()
    uploaded = Upload('test.txt', 'text/plain', 'ascii')
    uploaded.receive(['first line', 'second line'])
    formdata.data = {'0': uploaded}
    formdata.id = 1

    variables = formdata.get_substitution_variables()
    assert variables.get('form_var_foo') == 'test.txt'
    assert variables.get('form_var_foo_url').endswith('/foobar/1/download?f=0')
    assert isinstance(variables.get('form_var_foo_raw'), Upload)

    # an explicit None value and a missing value must behave the same way
    file_keys = ('form_var_foo', 'form_var_foo_raw', 'form_var_foo_url')
    for empty_data in ({'0': None}, {}):
        formdata.data = empty_data
        variables = formdata.get_substitution_variables()
        for key in file_keys:
            assert variables[key] is None
def test_get_submitter(pub):
    """Submitter email: the user-prefilled email field wins, else the user's email."""
    formdef.data_class().wipe()
    email_field = fields.StringField(
        id='0', label='email', varname='foo',
        prefill={'type': 'user', 'value': 'email'})
    formdef.fields = [email_field]
    formdef.store()

    formdata = formdef.data_class()()
    # neither field data nor an associated user
    assert formdef.get_submitter_email(formdata) is None

    formdata.data = {'0': 'foo@localhost'}
    assert formdef.get_submitter_email(formdata) == 'foo@localhost'

    account = pub.user_class()
    account.email = 'bar@localhost'
    account.store()
    formdata.user_id = account.id
    # the field value still takes precedence over the user's email
    assert formdef.get_submitter_email(formdata) == 'foo@localhost'

    formdata.data = {}
    assert formdef.get_submitter_email(formdata) == 'bar@localhost'
def test_get_last_update_time(pub):
    """``last_update_time`` follows the latest evolution, else the receipt time.

    Checks, in order: None before creation; equal to the last evolution's
    time after ``just_created()``; equal to a newly appended evolution's
    time (and different from the receipt time); and equal to the receipt
    time when ``evolution`` is None.
    """
    formdef.data_class().wipe()
    formdef.store()
    formdata = formdef.data_class()()
    assert formdata.last_update_time is None

    formdata.just_created()
    assert formdata.last_update_time == formdata.evolution[-1].time

    evo = Evolution()
    # Use a timestamp one second ahead of now instead of sleeping for a
    # second: this yields a time distinct from the receipt time without
    # slowing the test suite down.
    evo.time = time.localtime(time.time() + 1)
    evo.status = formdata.status
    evo.comment = 'hello world'
    formdata.evolution.append(evo)
    assert formdata.last_update_time != formdata.receipt_time
    assert formdata.last_update_time == formdata.evolution[-1].time

    # check with missing 'evolution' values
    formdata.evolution = None
    assert formdata.last_update_time == formdata.receipt_time
def test_password_field(pub):
    """Password field data survives a store/reload round trip unchanged."""
    formdef.data_class().wipe()
    formdef.fields = [fields.PasswordField(id='0', label='pwd')]
    formdef.store()

    original = formdef.data_class()()
    original.data = {'0': {'cleartext': 'foo'}}
    original.store()

    reloaded = original.get(original.id)
    assert reloaded.data == {'0': {'cleartext': 'foo'}}
def test_date_field(pub):
    """Date field data round-trips and is formatted according to the language.

    The stored ``struct_time`` is reloaded unchanged; the substitution
    variable is rendered as '2015-05-12' in English and '12/05/2015' in
    French.
    """
    formdef.data_class().wipe()
    formdef.fields = [fields.DateField(id='0', label='date')]
    formdef.store()

    formdata = formdef.data_class()()
    value = time.strptime('2015-05-12', '%Y-%m-%d')
    formdata.data = {'0': value}
    formdata.store()

    formdata2 = formdata.get(formdata.id)
    assert formdata2.data == {'0': value}
    assert formdata2.get_substitution_variables()['form_field_date'] == '2015-05-12'

    pub.cfg['language'] = {'language': 'fr'}
    try:
        assert formdata2.get_substitution_variables()['form_field_date'] == '12/05/2015'
    finally:
        # always switch back to English, even if the French check fails, so
        # the language setting cannot leak out of this test
        pub.cfg['language'] = {'language': 'en'}
def test_clean_drafts(pub):
    """``clean_drafts`` removes the draft dated at the epoch and keeps the recent one."""
    drafts_formdef = FormDef()
    drafts_formdef.name = 'foo'
    drafts_formdef.fields = []
    drafts_formdef.store()
    drafts_formdef.data_class().wipe()

    recent_draft = drafts_formdef.data_class()()
    recent_draft.status = 'draft'
    recent_draft.receipt_time = time.localtime()
    recent_draft.store()
    recent_draft_id = recent_draft.id

    old_draft = drafts_formdef.data_class()()
    old_draft.status = 'draft'
    old_draft.receipt_time = time.localtime(0)  # epoch, 1970-01-01
    old_draft.store()

    assert drafts_formdef.data_class().count() == 2

    from wcs.formdef import clean_drafts
    clean_drafts(pub)

    assert drafts_formdef.data_class().count() == 1
    remaining = drafts_formdef.data_class().select()
    assert remaining[0].id == recent_draft_id
def test_criticality_levels(pub):
    """Criticality level changes are clamped to the levels the workflow defines.

    Exercises increase/decrease/set on a three-level workflow, then reloads
    the formdef after shrinking the workflow to one and two levels.
    """
    def make_levels(*names):
        return [WorkflowCriticalityLevel(name=name) for name in names]

    def level_name(data):
        return data.get_criticality_level_object().name

    workflow = Workflow(name='criticality')
    workflow.criticality_levels = make_levels('green', 'yellow', 'red')
    workflow.store()

    crit_formdef = FormDef()
    crit_formdef.name = 'foo'
    crit_formdef.fields = []
    crit_formdef.workflow_id = workflow.id
    crit_formdef.store()
    crit_formdef.data_class().wipe()

    formdata = crit_formdef.data_class()()
    assert level_name(formdata) == 'green'

    # going up stops at the highest level
    for expected in ('yellow', 'red', 'red'):
        formdata.increase_criticality_level()
        assert level_name(formdata) == expected

    # going down stops at the lowest level
    for expected in ('yellow', 'green', 'green'):
        formdata.decrease_criticality_level()
        assert level_name(formdata) == expected

    # explicit values; 4 is out of range and ends up on the last level
    for level, expected in ((1, 'yellow'), (2, 'red'), (4, 'red')):
        formdata.set_criticality_level(level)
        assert level_name(formdata) == expected

    # workflow reduced to a single level
    workflow.criticality_levels = make_levels('green')
    workflow.store()
    crit_formdef = FormDef.get(id=crit_formdef.id)  # reload formdef
    formdata = crit_formdef.data_class()()
    assert level_name(formdata) == 'green'
    formdata.increase_criticality_level()
    assert level_name(formdata) == 'green'

    # workflow with two levels
    workflow.criticality_levels = make_levels('green', 'yellow')
    workflow.store()
    crit_formdef = FormDef.get(id=crit_formdef.id)  # reload formdef
    formdata = crit_formdef.data_class()()
    formdata.criticality_level = 104
    # set too high, this simulates a workflow being changed to have less
    # levels than before.
    assert level_name(formdata) == 'yellow'
    formdata.increase_criticality_level()
    assert level_name(formdata) == 'yellow'
    formdata.decrease_criticality_level()
    assert level_name(formdata) == 'green'

    formdata.criticality_level = 104
    formdata.decrease_criticality_level()
    assert level_name(formdata) == 'green'
def test_field_item_substvars(pub):
    """Item fields expose the display value, and the raw value under ``_raw``."""
    choices = [('1', 'un'), ('2', 'deux')]
    data_source = {
        'type': 'formula',
        'value': repr(choices),
    }

    item_formdef = FormDef()
    item_formdef.name = 'foobar'
    item_field = fields.ItemField(
        id='0', label='string', data_source=data_source, varname='xxx')
    item_formdef.fields = [item_field]
    item_formdef.store()

    formdata = item_formdef.data_class()()
    formdata.data = {'0': '1', '0_display': 'un'}

    variables = formdata.get_substitution_variables()
    assert variables.get('form_var_xxx') == 'un'
    assert variables.get('form_var_xxx_raw') == '1'
def test_get_json_export_dict_evolution(pub, local_user):
    """Check the 'evolution' section of ``get_json_export_dict()``.

    Builds a formdata with two evolution entries (the first one carrying a
    workflow comment part and a wscall error part) and checks both the
    regular export and the anonymised export, where 'who' and the details
    of each part must be left out.
    """
    Workflow.wipe()
    workflow = Workflow(name='test')
    st_new = workflow.add_status('New')
    st_finished = workflow.add_status('Finished')
    workflow.store()

    formdef = FormDef()
    formdef.workflow_id = workflow.id
    formdef.name = 'foo'
    formdef.fields = []
    formdef.store()
    formdef.data_class().wipe()

    d = formdef.data_class()()
    d.status = 'wf-%s' % st_new.id
    d.user_id = local_user.id
    d.receipt_time = time.localtime()

    # first evolution entry: "New" status, with two parts attached after the
    # formdata has been stored
    evo = Evolution()
    evo.time = time.localtime()
    evo.status = 'wf-%s' % st_new.id
    evo.who = '_submitter'
    d.evolution = [evo]
    d.store()
    evo.add_part(JournalEvolutionPart(d, "ok"))
    evo.add_part(JournalWsCallErrorPart("summary", "label", "data"))

    # second evolution entry: "Finished" status, without parts
    evo = Evolution()
    evo.time = time.localtime()
    evo.status = 'wf-%s' % st_finished.id
    evo.who = '_submitter'
    d.evolution.append(evo)
    d.store()

    # regular export
    export = d.get_json_export_dict()
    assert 'evolution' in export
    assert len(export['evolution']) == 2
    assert export['evolution'][0]['status'] == st_new.id
    assert 'time' in export['evolution'][0]
    assert export['evolution'][0]['who']['id'] == local_user.id
    assert export['evolution'][0]['who']['email'] == local_user.email
    assert export['evolution'][0]['who']['NameID'] == local_user.name_identifiers
    assert 'parts' in export['evolution'][0]
    assert len(export['evolution'][0]['parts']) == 2
    assert export['evolution'][0]['parts'][0]['type'] == 'workflow-comment'
    assert export['evolution'][0]['parts'][0]['content'] == 'ok'
    assert export['evolution'][0]['parts'][1]['type'] == 'wscall-error'
    assert export['evolution'][0]['parts'][1]['summary'] == 'summary'
    assert export['evolution'][0]['parts'][1]['label'] == 'label'
    assert export['evolution'][0]['parts'][1]['data'] == 'data'
    assert export['evolution'][1]['status'] == st_finished.id
    assert 'time' in export['evolution'][1]
    assert export['evolution'][1]['who']['id'] == local_user.id
    assert export['evolution'][1]['who']['email'] == local_user.email
    assert export['evolution'][1]['who']['NameID'] == local_user.name_identifiers
    assert 'parts' not in export['evolution'][1]

    # anonymised export: no 'who', and parts reduced to their 'type' key
    export = d.get_json_export_dict(anonymise=True)
    assert 'evolution' in export
    assert len(export['evolution']) == 2
    assert export['evolution'][0]['status'] == st_new.id
    assert 'time' in export['evolution'][0]
    assert 'who' not in export['evolution'][0]
    assert 'parts' in export['evolution'][0]
    assert len(export['evolution'][0]['parts']) == 2
    assert len(export['evolution'][0]['parts'][0]) == 1
    assert export['evolution'][0]['parts'][0]['type'] == 'workflow-comment'
    assert len(export['evolution'][0]['parts'][1]) == 1
    assert export['evolution'][0]['parts'][1]['type'] == 'wscall-error'
    assert export['evolution'][1]['status'] == st_finished.id
    assert 'time' in export['evolution'][1]
    # the second entry must be anonymised too (this check previously
    # re-tested index 0 by mistake)
    assert 'who' not in export['evolution'][1]
    assert 'parts' not in export['evolution'][1]
def test_field_bool_substvars(pub):
    """Boolean fields expose 'True'/'False' strings, and the real boolean under ``_raw``."""
    bool_formdef = FormDef()
    bool_formdef.name = 'foobar'
    bool_formdef.fields = [fields.BoolField(id='0', label='checkbox', varname='xxx')]
    bool_formdef.store()

    formdata = bool_formdef.data_class()()
    for raw_value, text_value in ((False, 'False'), (True, 'True')):
        formdata.data = {'0': raw_value}
        variables = formdata.get_substitution_variables()
        assert variables.get('form_var_xxx') == text_value
        assert variables.get('form_var_xxx_raw') is raw_value
def test_backoffice_field_varname(pub):
    """A workflow backoffice field with a varname is exposed as ``form_var_<varname>``."""
    workflow = Workflow(name='bo fields')
    bo_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    workflow.backoffice_fields_formdef = bo_formdef
    bo_formdef.fields = [
        fields.StringField(
            id='bo1', label='1st backoffice field',
            type='string', varname='backoffice_blah'),
    ]
    workflow.add_status('Status1')
    workflow.store()

    formdef.workflow_id = workflow.id
    formdef.data_class().wipe()
    formdef.fields = [fields.StringField(id='0', label='string', varname='foo')]
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = {'bo1': 'test'}
    variables = formdata.get_substitution_variables()
    assert variables.get('form_var_backoffice_blah') == 'test'
def test_private_history(pub, local_user):
    """With private status and history, only admins and '_receiver' role holders may read."""
    formdef.data_class().wipe()
    formdef.private_status_and_history = True
    formdef.store()
    formdata = formdef.data_class()()
    formdata.store()

    def can_read(user):
        return formdef.is_user_allowed_read_status_and_history(user, formdata=formdata)

    # no user, then a plain user
    assert can_read(None) is False
    assert can_read(local_user) is False

    # admin flag grants access
    local_user.is_admin = True
    assert can_read(local_user) is True
    local_user.is_admin = False

    # a '_receiver' role grants access only once the user holds it
    receiver_role = Role(name='foobar')
    receiver_role.store()
    formdef.workflow_roles['_receiver'] = receiver_role.id
    assert can_read(local_user) is False
    local_user.roles = [receiver_role.id]
    assert can_read(local_user) is True
def test_workflow_data_file_url(pub):
    """A ``*_url`` entry in workflow_data is filled in even when it is iterated last.

    Regression test for #17233.
    """
    upload = Upload('test.txt', 'text/plain', 'ascii')
    upload.receive(['first line', 'second line'])

    formdata = formdef.data_class()()
    formdata.store()

    # create workflow_data as ordered dict to be sure _url comes last, to
    # trigger #17233.  The dict is built from a list of pairs because the
    # order of keyword arguments is only preserved from Python 3.6 onwards.
    formdata.workflow_data = collections.OrderedDict([
        ('foo_var_file', 'test.txt'),
        ('foo_var_file_raw', upload),
        ('foo_var_file_url', None),
    ])

    substvars = formdata.get_substitution_variables()
    assert substvars['foo_var_file_url']
def test_evolution_get_status(pub):
    """Evolution entries without a status report the most recent earlier status."""
    Workflow.wipe()
    workflow = Workflow(name='test')
    st_new = workflow.add_status('New')
    st_finished = workflow.add_status('Finished')
    workflow.store()

    status_formdef = FormDef()
    status_formdef.workflow_id = workflow.id
    status_formdef.name = 'foo'
    status_formdef.fields = []
    status_formdef.store()
    status_formdef.data_class().wipe()

    formdata = status_formdef.data_class()()
    formdata.evolution = []
    # None marks an evolution entry created without any status of its own
    for workflow_status in (st_new, None, None, st_finished, None):
        entry = Evolution()
        entry.time = time.localtime()
        if workflow_status is not None:
            entry.status = 'wf-%s' % workflow_status.id
        formdata.evolution.append(entry)
    formdata.store()

    formdata = status_formdef.data_class().get(formdata.id)
    status_ids = [x.get_status().id for x in formdata.evolution]
    assert status_ids == ['1', '1', '1', '2', '2']