# wcs/tests/test_formdef.py
#
# 592 lines
# 22 KiB
# Python

import datetime
import glob
import json
import os
import pickle
import sys
import shutil
import time
import pytest
from django.utils import six
from django.utils.encoding import force_bytes
from django.utils.six import BytesIO
from quixote import cleanup
from wcs import fields
from wcs.blocks import BlockDef
from wcs.formdef import FormDef, get_formdefs_of_all_kinds
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.form import PicklableUpload
from wcs.workflows import Workflow, AttachmentEvolutionPart, WorkflowBackofficeFieldsFormDef
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
from wcs.fields import StringField, FileField, DateField, ItemField, PageField
from utilities import create_temporary_pub, clean_temporary_pub
def pytest_generate_tests(metafunc):
    """Parametrize the ``pub`` fixture with the 'pickle' and 'sql' modes.

    Tests that do not request ``pub`` are left untouched.
    """
    if 'pub' not in metafunc.fixturenames:
        return
    metafunc.parametrize('pub', ['pickle', 'sql'], indirect=True)
@pytest.fixture
def pub(request):
    """Create a temporary publisher for the storage mode given in ``request.param``.

    The publisher is bound to an ``example.net`` request and its language
    configuration is set to English and written out before it is returned.
    """
    use_sql = request.param == 'sql'
    publisher = create_temporary_pub(sql_mode=use_sql)
    environ = {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'}
    publisher.set_app_dir(HTTPRequest(None, environ))
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()
    return publisher
def teardown_module(module):
    """Module-level teardown hook: clean up the temporary publisher."""
    clean_temporary_pub()
def test_is_disabled(pub):
    """A new formdef is enabled; setting ``disabled`` to True disables it."""
    form = FormDef()
    assert not form.is_disabled()

    form.disabled = True
    assert form.is_disabled()
def test_is_disabled_publication_date(pub):
    """A publication date of yesterday keeps the form enabled; tomorrow disables it."""
    date_format = '%s-%02d-%02d'
    formdef = FormDef()

    yesterday = datetime.datetime.today() - datetime.timedelta(days=1)
    formdef.publication_date = date_format % yesterday.timetuple()[:3]
    assert not formdef.is_disabled()

    tomorrow = datetime.datetime.today() + datetime.timedelta(days=1)
    formdef.publication_date = date_format % tomorrow.timetuple()[:3]
    assert formdef.is_disabled()
def test_is_disabled_expiration_date(pub):
    """An expiration date of yesterday disables the form; tomorrow keeps it enabled."""
    date_format = '%s-%02d-%02d'
    formdef = FormDef()

    yesterday = datetime.datetime.today() - datetime.timedelta(days=1)
    formdef.expiration_date = date_format % yesterday.timetuple()[:3]
    assert formdef.is_disabled()

    tomorrow = datetime.datetime.today() + datetime.timedelta(days=1)
    formdef.expiration_date = date_format % tomorrow.timetuple()[:3]
    assert not formdef.is_disabled()
def test_is_disabled_publication_datetime(pub):
    """Same as the publication date test, with minute precision and a one hour offset."""
    datetime_format = '%s-%02d-%02d %02d:%02d'
    formdef = FormDef()

    an_hour_ago = datetime.datetime.now() - datetime.timedelta(hours=1)
    formdef.publication_date = datetime_format % an_hour_ago.timetuple()[:5]
    assert not formdef.is_disabled()

    in_an_hour = datetime.datetime.now() + datetime.timedelta(hours=1)
    formdef.publication_date = datetime_format % in_an_hour.timetuple()[:5]
    assert formdef.is_disabled()
def test_is_disabled_expiration_datetime(pub):
    """Same as the expiration date test, with minute precision and a one hour offset."""
    datetime_format = '%s-%02d-%02d %02d:%02d'
    formdef = FormDef()

    an_hour_ago = datetime.datetime.now() - datetime.timedelta(hours=1)
    formdef.expiration_date = datetime_format % an_hour_ago.timetuple()[:5]
    assert formdef.is_disabled()

    in_an_hour = datetime.datetime.now() + datetime.timedelta(hours=1)
    formdef.expiration_date = datetime_format % in_an_hour.timetuple()[:5]
    assert not formdef.is_disabled()
def test_title_change(pub):
    """Check which identifiers follow the name when a formdef is renamed.

    ``url_name`` keeps its first value; ``internal_identifier`` follows the
    name until a form has been submitted, after which it stays unchanged.
    """
    formdef = FormDef()
    formdef.name = 'foo'
    formdef.store()
    stored = FormDef.get(formdef.id)
    assert stored.name == 'foo'
    assert stored.url_name == 'foo'
    assert stored.internal_identifier == 'foo'

    formdef.name = 'bar'
    formdef.store()
    stored = FormDef.get(formdef.id)
    assert stored.name == 'bar'
    assert stored.url_name == 'foo'
    assert stored.internal_identifier == 'bar'

    # once a form has been submitted the internal identifier must not change
    formdef.data_class()().store()
    formdef.name = 'baz'
    formdef.store()
    stored = FormDef.get(formdef.id)
    assert stored.name == 'baz'
    assert stored.internal_identifier == 'bar'  # didn't change
def test_substitution_variables(pub):
    """Check the ``form_name`` and ``form_option_*`` substitution variables.

    Workflow options are exposed as ``form_option_<name>``; when a
    ``<name>_display`` option exists the display value is exposed and the raw
    value moves to ``form_option_<name>_raw``.
    """
    from wcs.workflows import WorkflowVariablesFieldsFormDef

    formdef = FormDef()
    formdef.name = 'foo'
    formdef.store()

    workflow = Workflow(name='variables')
    workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
    workflow.variables_formdef.fields = [
        StringField(id='1', label='Test', type='string', varname='foo'),
        ItemField(id='2', label='Test Liste', type='item', varname='bar'),
    ]
    workflow.store()
    formdef.workflow_id = workflow.id

    variables = formdef.get_substitution_variables()
    assert 'form_name' in variables
    assert variables['form_name'] == 'foo'

    formdef.workflow_options = {'foo': 'bar'}
    variables = formdef.get_substitution_variables()
    assert 'form_option_foo' in variables
    assert variables['form_option_foo'] == 'bar'

    formdef.workflow_options = {'bar': 'bar', 'bar_display': 'Bar'}
    assert formdef.get_substitution_variables()['form_option_bar'] == 'Bar'
    assert formdef.get_substitution_variables()['form_option_bar_raw'] == 'bar'
def test_urls(pub):
    """Check front and back office URLs, with and without configured base URLs."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foo'
    formdef.store()

    front_url = formdef.get_url()
    back_url = formdef.get_url(backoffice=True)
    assert front_url == 'http://example.net/foo/'
    assert back_url == 'http://example.net/backoffice/management/foo/'

    # drop the configured base URLs and check the URLs that are then produced
    misc_cfg = pub.cfg['misc']
    del misc_cfg['frontoffice-url']
    del misc_cfg['backoffice-url']
    front_url = formdef.get_url()
    back_url = formdef.get_url(backoffice=True)
    assert front_url == 'https://example.net/foo/'
    assert back_url == 'https://example.net/backoffice/management/foo/'
def test_schema_with_date_variable(pub):
    """A date workflow option stored as a time tuple is exported to JSON as a date string."""
    from wcs.workflows import WorkflowVariablesFieldsFormDef

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foo'
    formdef.store()

    workflow = Workflow(name='variables')
    workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
    date_field = DateField(label='Test', type='date', varname='foo')
    workflow.variables_formdef.fields.append(date_field)
    workflow.store()

    formdef.workflow_id = workflow.id
    formdef.workflow_options = {'foo': datetime.datetime(2016, 4, 2).timetuple()}

    exported = json.loads(formdef.export_to_json())
    assert exported['options']['foo'].startswith('2016-04-02')
def test_substitution_variables_object(pub):
    """Check the ``form_objects`` substitution variable.

    Its counters reflect formdata stored after it was obtained, unknown
    attributes raise ``AttributeError`` and it points back to the formdef.
    """
    formdef = FormDef()
    formdef.name = 'foo'
    formdef.store()
    formdef.data_class().wipe()

    variables = formdef.get_substitution_variables()
    assert 'form_objects' in variables
    substs = variables['form_objects']
    assert substs.count == 0
    assert substs.count_status_1 == 0

    formdata = formdef.data_class()()
    formdata.status = 'wf-1'
    formdata.store()
    assert substs.count == 1
    assert substs.count_status_1 == 1

    with pytest.raises(AttributeError):
        assert substs.foobar

    assert substs.formdef is formdef
def test_file_field_migration(pub):
    """Legacy ``file_type`` attributes on file fields are migrated to ``document_type``.

    The legacy value is injected straight into the field ``__dict__``; after
    a store/reload cycle it must be gone and replaced by a ``_legacy``
    document type whose label is derived from the configured file types.
    """
    # comma-separated mimetypes, as found in the legacy attribute
    legacy_documents = 'application/pdf,application/vnd.oasis.opendocument.text,application/msword,application/vnd.openxmlformats-officedocument.wordprocessingml.document,application/vnd.oasis.opendocument.spreadsheet,application/vnd.ms-excel,application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'

    pub.cfg['filetypes'] = {
        1: {
            'mimetypes': [
                'application/pdf',
                'application/vnd.oasis.opendocument.text',
                'application/msword',
                'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
                'application/vnd.oasis.opendocument.spreadsheet',
                'application/vnd.ms-excel',
                'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            ],
            'label': 'Documents',
        }
    }

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foo'
    images_and_docs = FileField(type='file', id='1', label='images & docs')
    images_only = FileField(type='file', id='2', label='images')
    formdef.fields = [images_and_docs, images_only]
    images_and_docs.__dict__['file_type'] = ['image/*', legacy_documents]
    images_only.__dict__['file_type'] = ['image/*']
    formdef.store()

    formdef = FormDef.get(1)
    first_field, second_field = formdef.fields[0], formdef.fields[1]
    assert 'file_type' not in first_field.__dict__
    assert first_field.document_type
    assert first_field.document_type['id'] == '_legacy'
    assert first_field.document_type['mimetypes'] == ['image/*', legacy_documents]
    assert second_field.document_type['label'] == 'Image files'
    assert first_field.document_type['label'] == 'Image files, Documents'
def test_internal_identifier_migration(pub):
    """A stored formdef without ``internal_identifier`` gets one on migration.

    The attribute is removed directly from the pickled object on disk. Loading
    with ``ignore_migration=True`` must still report ``None``; a normal load
    must restore the identifier from the name.
    """
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foo'
    formdef.fields = []
    formdef.store()

    object_filename = formdef.get_object_filename()

    # Rewrite the stored pickle without the attribute. Files are opened in
    # ``with`` blocks so the written data is flushed and closed before it is
    # read back, instead of relying on garbage collection.
    with open(object_filename, 'rb') as fd:
        obj = pickle.load(fd)
    del obj.internal_identifier
    with open(object_filename, 'wb') as fd:
        pickle.dump(obj, fd)

    with open(object_filename, 'rb') as fd:
        assert pickle.load(fd).internal_identifier is None
    assert FormDef.get(formdef.id, ignore_migration=True).internal_identifier is None

    formdef = FormDef.get(formdef.id)
    assert formdef.internal_identifier == 'foo'
def test_page_field_migration(pub):
    """String conditions on a page field are migrated to ``{'type', 'value'}`` dicts."""
    # Snapshots are disabled because the old post_conditions structure
    # cannot be serialized to XML.
    pub.snapshot_class = None
    FormDef.wipe()

    # old post_conditions structure
    legacy_post_conditions = [{'condition': 'blah', 'error_message': 'blah'}]
    page = PageField(id='1', type='page', condition='foo', post_conditions=legacy_post_conditions)

    formdef = FormDef()
    formdef.name = 'foo'
    formdef.fields = [page]
    formdef.store()

    migrated_page = FormDef.get(formdef.id).fields[0]
    assert migrated_page.condition == {'type': 'python', 'value': 'foo'}
    assert migrated_page.post_conditions == [
        {
            'condition': {'type': 'python', 'value': 'blah'},
            'error_message': 'blah',
        }
    ]
def test_unused_file_removal_job(pub):
    """Exercise ``clean_unused_files`` for every place a file can be referenced.

    The scenario is run three times: without any ``unused-files-behaviour``
    option, then with 'move', then with 'remove'. Each run covers files in
    formdata, formdef workflow options, block fields, remote storage, workflow
    attachments and user profiles. Finally an unknown behaviour value is
    checked to leave files in place.
    """
    from wcs.admin.settings import UserFieldsFormDef
    from wcs.formdef import clean_unused_files
    from wcs.workflows import WorkflowVariablesFieldsFormDef

    uploads_dir = os.path.join(pub.app_dir, 'uploads')
    attachments_pattern = os.path.join(pub.app_dir, 'attachments', '*/*')

    def uploads():
        # current content of the uploads directory
        return os.listdir(uploads_dir)

    def attachments():
        # current attachment files
        return glob.glob(attachments_pattern)

    def new_formdata():
        # fresh formdata for the test formdef, not stored yet
        new = formdef.data_class()()
        new.just_created()
        return new

    def store_formdata_with_upload(filename='test.txt'):
        # stored formdata with a "hello world" upload in file field '5'
        new = new_formdata()
        new.data = {
            '5': PicklableUpload(filename, 'text/plain'),
        }
        new.data['5'].receive([b'hello world'])
        new.store()
        return new

    BlockDef.wipe()
    FormDef.wipe()

    block = BlockDef()
    block.name = 'foobar'
    block.fields = [
        fields.FileField(id='234', required=True, label='Test2', type='file'),
    ]
    block.store()

    formdef = FormDef()
    formdef.name = 'removal job'
    formdef.fields = [
        fields.FileField(id='5', label='file', varname='filefield'),
        fields.BlockField(id='6', label='test', type='block:foobar', max_items=3),
    ]
    formdef.store()
    formdef.data_class().wipe()

    if not pub.site_options.has_section('options'):
        pub.site_options.add_section('options')
    pub.site_options.add_section('storage-remote')
    remote_storage_options = (
        ('label', 'remote'),
        ('class', 'wcs.qommon.upload_storage.RemoteOpaqueUploadStorage'),
        ('ws', 'https://crypto.example.net/'),
    )
    for option, value in remote_storage_options:
        pub.site_options.set('storage-remote', option, value)

    for behaviour in (None, 'move', 'remove'):
        if behaviour:
            pub.site_options.set('options', 'unused-files-behaviour', behaviour)

        # single formdata: file is kept while referenced, gone once anonymised
        formdata = store_formdata_with_upload()
        assert formdata.data['5'].qfilename in uploads()
        clean_unused_files(pub)
        assert uploads() == [formdata.data['5'].qfilename]
        formdata.anonymise()
        clean_unused_files(pub)
        assert uploads() == []

        for _ in range(5):
            formdata = store_formdata_with_upload()

        # same file, deduplicated
        assert uploads() == [formdata.data['5'].qfilename]
        formdata.anonymise()
        clean_unused_files(pub)
        assert len(uploads()) == 1

        for formdata in formdef.data_class().select():
            formdata.anonymise()

        assert len(uploads()) == 1
        clean_unused_files(pub)
        assert uploads() == []

        # file referenced in formdef option
        workflow = Workflow(name='variables')
        workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
        workflow.variables_formdef.fields.append(fields.FileField(id='1', label='Test', type='file'))
        workflow.add_status('Status1', 'st1')
        workflow.store()
        formdef.workflow = workflow
        formdef.workflow_options = {'1': PicklableUpload('test.txt', 'text/plain')}
        formdef.workflow_options['1'].receive([b'hello world'])
        formdef.store()

        formdata = store_formdata_with_upload()
        assert len(uploads()) == 1
        clean_unused_files(pub)
        formdata.remove_self()
        clean_unused_files(pub)
        assert len(uploads()) == 1

        formdef.workflow_options = {}
        formdef.store()
        clean_unused_files(pub)
        assert len(uploads()) == 0

        # file in block field
        formdata = new_formdata()
        formdata.data = {
            '6': {
                'data': [
                    {'234': PicklableUpload('test.txt', 'text/plain')},
                    {'234': PicklableUpload('test2.txt', 'text/plain')},
                ],
                'schema': {'234': 'file'},
            },
        }
        block_rows = formdata.data['6']['data']
        block_rows[0]['234'].receive([b'hello world'])
        block_rows[1]['234'].receive([b'hello world block'])
        formdata.workflow_data = {'wscall': {'data': ['not', 'a', 'block'], 'err': 0}}
        formdata.store()
        assert len(uploads()) == 2
        clean_unused_files(pub)
        assert len(uploads()) == 2
        formdata.remove_self()
        clean_unused_files(pub)
        assert len(uploads()) == 0

        # non local storage: nothing happens
        formdata = new_formdata()
        formdata.data = {
            '5': PicklableUpload('test.txt', 'text/plain'),
        }
        remote_upload = formdata.data['5']
        remote_upload.receive([b'hello world'])
        remote_upload.storage = 'remote'
        remote_upload.storage_attrs = {'redirect_url': 'https://crypto.example.net/1234'}
        formdata.store()
        assert len(uploads()) == 0
        clean_unused_files(pub)
        assert len(uploads()) == 0

        # workflow attachment
        formdata = new_formdata()
        formdata.data = {}
        formdata.store()
        attachment = AttachmentEvolutionPart('hello.txt', fp=BytesIO(b'hello world'), varname='testfile')
        formdata.evolution[-1].parts = [attachment]
        formdata.store()
        assert len(attachments()) == 1
        clean_unused_files(pub)
        assert len(attachments()) == 1
        formdata.anonymise()
        clean_unused_files(pub)
        assert len(attachments()) == 0

        # files in user profile
        user_formdef = UserFieldsFormDef(pub)
        user_formdef.fields.append(fields.FileField(id='3', label='test', type='file'))
        user_formdef.store()
        user = pub.user_class()
        user.email = 'bar@localhost'
        user.form_data = {'3': PicklableUpload('test.txt', 'text/plain')}
        user.form_data['3'].receive([b'hello world 2'])
        user.store()
        assert len(uploads()) == 1
        clean_unused_files(pub)
        assert len(uploads()) == 1
        user.remove_self()
        assert len(uploads()) == 1
        clean_unused_files(pub)
        assert len(uploads()) == 0

        if behaviour == 'move':
            # 3 files ("hello world" + "hello world 2" + "hello world block")
            assert len(os.listdir(os.path.join(pub.app_dir, 'unused-files/uploads/'))) == 3
            # 1 attachment
            assert len(glob.glob(os.path.join(pub.app_dir, 'unused-files/attachments/*/*'))) == 1

    # unknown unused-files-behaviour: do nothing
    pub.site_options.set('options', 'unused-files-behaviour', 'foo')
    formdata = store_formdata_with_upload('test-no-remove.txt')
    assert formdata.data['5'].qfilename in uploads()
    clean_unused_files(pub)
    assert uploads() == [formdata.data['5'].qfilename]
    formdata.anonymise()
    clean_unused_files(pub)
    assert len(uploads()) == 1  # file is not removed
def test_get_formdefs_of_all_kinds(pub):
    """``get_formdefs_of_all_kinds`` returns one entry per kind of formdef.

    With everything wiped only the user fields formdef is returned. After
    creating a form, a card and workflows carrying form, variables and
    backoffice fields formdefs, six entries are expected.
    """
    from wcs.admin.settings import UserFieldsFormDef
    from wcs.carddef import CardDef
    from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
    from wcs.workflows import WorkflowBackofficeFieldsFormDef, WorkflowVariablesFieldsFormDef

    for kind in (BlockDef, FormDef, Workflow):
        kind.wipe()

    initial = get_formdefs_of_all_kinds()
    assert len(initial) == 1
    assert initial[0].__class__ == UserFieldsFormDef

    formdef = FormDef()
    formdef.name = 'basic formdef'
    formdef.store()

    carddef = CardDef()
    carddef.name = 'carddef'
    carddef.store()

    form_workflow = Workflow(name='workflow with form fields formdef')
    status = form_workflow.add_status('Status1', 'st1')
    item_with_formdef = FormWorkflowStatusItem()
    item_with_formdef.formdef = WorkflowFormFieldsFormDef(item=item_with_formdef)
    item_with_formdef.parent = status
    item_without_formdef = FormWorkflowStatusItem()
    item_without_formdef.parent = status
    status.items.append(item_with_formdef)
    status.items.append(item_without_formdef)  # empty formdef
    form_workflow.store()

    variables_workflow = Workflow(name='workflow with variables fields formdef')
    variables_workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=variables_workflow)
    variables_workflow.store()

    backoffice_workflow = Workflow(name='workflow with backoffice fields formdef')
    backoffice_workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(backoffice_workflow)
    backoffice_workflow.store()

    formdefs = get_formdefs_of_all_kinds()
    assert len(formdefs) == 6
    found = sorted((f.name, f.__class__) for f in formdefs)
    assert found == [
        ('Backoffice fields of workflow "workflow with backoffice fields formdef"', WorkflowBackofficeFieldsFormDef),
        ('Form action in workflow "workflow with form fields formdef"', WorkflowFormFieldsFormDef),
        ('Options of workflow "workflow with variables fields formdef"', WorkflowVariablesFieldsFormDef),
        ('User Fields', UserFieldsFormDef),
        ('basic formdef', FormDef),
        ('carddef', CardDef),
    ]
def test_pickle_2to3_conversion(pub):
    """Formdefs and workflows pickled with bytes values load back with str values.

    A workflow (with backoffice fields and a form action) and a formdef are
    stored, then their files are overwritten with protocol 2 pickles in which
    every str has been converted to bytes. Loading the formdef through
    ``FormDef.get`` must give str attributes again, including on the
    workflow objects reached through it.
    """
    FormDef.wipe()
    Workflow.wipe()

    workflow = Workflow(name='blah')
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    workflow.backoffice_fields_formdef.fields = [
        fields.StringField(
            id='bo0', varname='foo_bovar', type='string', label='bo variable'),
    ]
    status = workflow.add_status('Status1')
    display_form = FormWorkflowStatusItem()
    display_form.id = '_display_form'
    display_form.by = []
    display_form.varname = 'blah'
    display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
    display_form.formdef.fields.append(
        fields.StringField(
            id='1', label='Test', varname='str', type='string', required=True))
    status.items.append(display_form)
    display_form.parent = status
    workflow.store()

    formdef = FormDef()
    formdef.name = 'basic formdef'
    formdef.workflow_id = workflow.id
    formdef.workflow_options = {'bo0': 'whatever'}
    formdef.workflow_roles = {'_receiver': '1'}
    formdef.fields = [
        StringField(id='1', label='Test', type='string', varname='foo'),
    ]
    # NOTE(review): bare attribute access kept from the original; its purpose
    # is not visible from this file -- confirm whether it can be dropped.
    formdef.roles
    formdef.store()

    formdef_id = formdef.id
    formdef_filename = os.path.join(formdef.get_objects_dir(), str(formdef.id))
    workflow_filename = os.path.join(workflow.get_objects_dir(), str(workflow.id))

    # turn pickle to bytes
    def deep_str2bytes(obj, seen=None):
        # reverse deep_bytes2str
        # ``seen`` records ids of containers/objects already visited so that
        # reference cycles do not recurse forever.
        if seen is None:
            seen = {}
        if obj is None or isinstance(obj, (int, float, bytes, time.struct_time, type(Ellipsis))):
            return obj
        if id(obj) in seen:
            return obj
        if isinstance(obj, str):
            return force_bytes(obj)
        seen[id(obj)] = True
        if isinstance(obj, dict):
            new_d = {}
            for k, v in obj.items():
                new_d[force_bytes(k)] = deep_str2bytes(v, seen)
            return new_d
        if isinstance(obj, list):
            return [deep_str2bytes(x, seen) for x in obj]
        if hasattr(obj, '__class__') and obj.__class__.__module__.startswith(('wcs.', 'qommon.', 'modules.')):
            # application objects are converted in place through their __dict__
            obj.__dict__ = deep_str2bytes(obj.__dict__, seen)
            return obj
        return obj

    # Files are written inside ``with`` blocks so they are flushed and closed
    # before FormDef.get() reads them back.
    formdef.__dict__ = deep_str2bytes(formdef.__dict__)
    with open(formdef_filename, 'wb') as fd:
        pickle.dump(formdef, fd, protocol=2)
    workflow.__dict__ = deep_str2bytes(workflow.__dict__)
    with open(workflow_filename, 'wb') as fd:
        pickle.dump(workflow, fd, protocol=2)

    formdef = FormDef.get(formdef_id)
    assert formdef.fields[0].label == 'Test'
    form_item = formdef.workflow.possible_status[0].items[0]
    assert form_item.varname == 'blah'
    assert form_item.formdef.fields[0].varname == 'str'