441 lines
17 KiB
Python
441 lines
17 KiB
Python
import datetime
|
|
import glob
|
|
import json
|
|
import os
|
|
import pickle
|
|
import sys
|
|
import shutil
|
|
import time
|
|
|
|
import pytest
|
|
|
|
from django.utils.six import BytesIO
|
|
from quixote import cleanup
|
|
from wcs import fields
|
|
from wcs.formdef import FormDef, get_formdefs_of_all_kinds
|
|
from wcs.qommon.http_request import HTTPRequest
|
|
from wcs.qommon.form import PicklableUpload
|
|
from wcs.workflows import Workflow, AttachmentEvolutionPart
|
|
from wcs.fields import StringField, FileField, DateField, ItemField, PageField
|
|
|
|
from utilities import create_temporary_pub, clean_temporary_pub
|
|
|
|
def pytest_generate_tests(metafunc):
|
|
if 'pub' in metafunc.fixturenames:
|
|
metafunc.parametrize('pub', ['pickle', 'sql'], indirect=True)
|
|
|
|
@pytest.fixture
|
|
def pub(request):
|
|
pub = create_temporary_pub(sql_mode=(request.param == 'sql'))
|
|
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
|
|
pub.set_app_dir(req)
|
|
pub.cfg['language'] = {'language': 'en'}
|
|
pub.write_cfg()
|
|
|
|
return pub
|
|
|
|
def teardown_module(module):
|
|
clean_temporary_pub()
|
|
|
|
|
|
def test_is_disabled(pub):
|
|
formdef = FormDef()
|
|
assert not formdef.is_disabled()
|
|
|
|
formdef.disabled = True
|
|
assert formdef.is_disabled()
|
|
|
|
|
|
def test_is_disabled_publication_date(pub):
|
|
formdef = FormDef()
|
|
|
|
formdef.publication_date = '%s-%02d-%02d' % (datetime.datetime.today() - datetime.timedelta(1)).timetuple()[:3]
|
|
assert not formdef.is_disabled()
|
|
|
|
formdef.publication_date = '%s-%02d-%02d' % (datetime.datetime.today() + datetime.timedelta(1)).timetuple()[:3]
|
|
assert formdef.is_disabled()
|
|
|
|
|
|
def test_is_disabled_expiration_date(pub):
|
|
formdef = FormDef()
|
|
|
|
formdef.expiration_date = '%s-%02d-%02d' % (datetime.datetime.today() - datetime.timedelta(1)).timetuple()[:3]
|
|
assert formdef.is_disabled()
|
|
|
|
formdef.expiration_date = '%s-%02d-%02d' % (datetime.datetime.today() + datetime.timedelta(1)).timetuple()[:3]
|
|
assert not formdef.is_disabled()
|
|
|
|
|
|
def test_is_disabled_publication_datetime(pub):
|
|
formdef = FormDef()
|
|
|
|
formdef.publication_date = '%s-%02d-%02d %02d:%02d' % (
|
|
datetime.datetime.now() - datetime.timedelta(hours=1)).timetuple()[:5]
|
|
assert not formdef.is_disabled()
|
|
|
|
formdef.publication_date = '%s-%02d-%02d %02d:%02d' % (
|
|
datetime.datetime.now() + datetime.timedelta(hours=1)).timetuple()[:5]
|
|
assert formdef.is_disabled()
|
|
|
|
|
|
def test_is_disabled_expiration_datetime(pub):
|
|
formdef = FormDef()
|
|
|
|
formdef.expiration_date = '%s-%02d-%02d %02d:%02d' % (
|
|
datetime.datetime.now() - datetime.timedelta(hours=1)).timetuple()[:5]
|
|
assert formdef.is_disabled()
|
|
|
|
formdef.expiration_date = '%s-%02d-%02d %02d:%02d' % (
|
|
datetime.datetime.now() + datetime.timedelta(hours=1)).timetuple()[:5]
|
|
assert not formdef.is_disabled()
|
|
|
|
def test_title_change(pub):
|
|
formdef = FormDef()
|
|
formdef.name = 'foo'
|
|
formdef.store()
|
|
assert FormDef.get(formdef.id).name == 'foo'
|
|
assert FormDef.get(formdef.id).url_name == 'foo'
|
|
assert FormDef.get(formdef.id).internal_identifier == 'foo'
|
|
|
|
formdef.name = 'bar'
|
|
formdef.store()
|
|
assert FormDef.get(formdef.id).name == 'bar'
|
|
assert FormDef.get(formdef.id).url_name == 'foo'
|
|
assert FormDef.get(formdef.id).internal_identifier == 'bar'
|
|
|
|
# makes sure the internal_name doesn't change if there are submitted forms
|
|
formdef.data_class()().store()
|
|
formdef.name = 'baz'
|
|
formdef.store()
|
|
assert FormDef.get(formdef.id).name == 'baz'
|
|
assert FormDef.get(formdef.id).internal_identifier == 'bar' # didn't change
|
|
|
|
def test_substitution_variables(pub):
|
|
formdef = FormDef()
|
|
formdef.name = 'foo'
|
|
formdef.store()
|
|
|
|
from wcs.workflows import WorkflowVariablesFieldsFormDef
|
|
wf = Workflow(name='variables')
|
|
wf.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=wf)
|
|
wf.variables_formdef.fields = [
|
|
StringField(id='1', label='Test', type='string', varname='foo'),
|
|
ItemField(id='2', label='Test Liste', type='item', varname='bar'),
|
|
]
|
|
wf.store()
|
|
formdef.workflow_id = wf.id
|
|
|
|
assert 'form_name' in formdef.get_substitution_variables()
|
|
assert formdef.get_substitution_variables()['form_name'] == 'foo'
|
|
formdef.workflow_options = {'foo': 'bar'}
|
|
assert 'form_option_foo' in formdef.get_substitution_variables()
|
|
assert formdef.get_substitution_variables()['form_option_foo'] == 'bar'
|
|
|
|
formdef.workflow_options = {'bar': 'bar', 'bar_display': 'Bar'}
|
|
assert formdef.get_substitution_variables()['form_option_bar'] == 'Bar'
|
|
assert formdef.get_substitution_variables()['form_option_bar_raw'] == 'bar'
|
|
|
|
def test_urls(pub):
|
|
FormDef.wipe()
|
|
formdef = FormDef()
|
|
formdef.name = 'foo'
|
|
formdef.store()
|
|
assert formdef.get_url() == 'http://example.net/foo/'
|
|
assert formdef.get_url(backoffice=True) == 'http://example.net/backoffice/management/foo/'
|
|
del pub.cfg['misc']['frontoffice-url']
|
|
del pub.cfg['misc']['backoffice-url']
|
|
assert formdef.get_url() == 'https://example.net/foo/'
|
|
assert formdef.get_url(backoffice=True) == 'https://example.net/backoffice/management/foo/'
|
|
|
|
def test_schema_with_date_variable(pub):
|
|
FormDef.wipe()
|
|
formdef = FormDef()
|
|
formdef.name = 'foo'
|
|
formdef.store()
|
|
|
|
from wcs.workflows import WorkflowVariablesFieldsFormDef
|
|
wf = Workflow(name='variables')
|
|
wf.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=wf)
|
|
wf.variables_formdef.fields.append(
|
|
DateField(label='Test', type='date', varname='foo'))
|
|
wf.store()
|
|
formdef.workflow_id = wf.id
|
|
formdef.workflow_options = {'foo': datetime.datetime(2016, 4, 2).timetuple()}
|
|
assert json.loads(formdef.export_to_json())['options']['foo'].startswith('2016-04-02')
|
|
|
|
def test_substitution_variables_object(pub):
|
|
formdef = FormDef()
|
|
formdef.name = 'foo'
|
|
formdef.store()
|
|
formdef.data_class().wipe()
|
|
|
|
assert 'form_objects' in formdef.get_substitution_variables()
|
|
substs = formdef.get_substitution_variables().get('form_objects')
|
|
assert substs.count == 0
|
|
assert substs.count_status_1 == 0
|
|
|
|
d = formdef.data_class()()
|
|
d.status = 'wf-1'
|
|
d.store()
|
|
assert substs.count == 1
|
|
assert substs.count_status_1 == 1
|
|
|
|
with pytest.raises(AttributeError):
|
|
assert substs.foobar
|
|
|
|
assert substs.formdef is formdef
|
|
|
|
def test_file_field_migration(pub):
|
|
pub.cfg['filetypes'] = {1:
|
|
{'mimetypes': [
|
|
'application/pdf',
|
|
'application/vnd.oasis.opendocument.text',
|
|
'application/msword',
|
|
'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
|
|
'application/vnd.oasis.opendocument.spreadsheet',
|
|
'application/vnd.ms-excel',
|
|
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'],
|
|
'label': 'Documents'}}
|
|
|
|
FormDef.wipe()
|
|
formdef = FormDef()
|
|
formdef.name = 'foo'
|
|
formdef.fields = [
|
|
FileField(type='file', id='1', label='images & docs'),
|
|
FileField(type='file', id='2', label='images')]
|
|
formdef.fields[0].__dict__['file_type'] = ['image/*', 'application/pdf,application/vnd.oasis.opendocument.text,application/msword,application/vnd.openxmlformats-officedocument.wordprocessingml.document,application/vnd.oasis.opendocument.spreadsheet,application/vnd.ms-excel,application/vnd.openxmlformats-officedocument.spreadsheetml.sheet']
|
|
formdef.fields[1].__dict__['file_type'] = ['image/*']
|
|
formdef.store()
|
|
formdef = FormDef.get(1)
|
|
assert 'file_type' not in formdef.fields[0].__dict__
|
|
assert formdef.fields[0].document_type
|
|
assert formdef.fields[0].document_type['id'] == '_legacy'
|
|
assert formdef.fields[0].document_type['mimetypes'] == ['image/*', 'application/pdf,application/vnd.oasis.opendocument.text,application/msword,application/vnd.openxmlformats-officedocument.wordprocessingml.document,application/vnd.oasis.opendocument.spreadsheet,application/vnd.ms-excel,application/vnd.openxmlformats-officedocument.spreadsheetml.sheet']
|
|
assert formdef.fields[1].document_type['label'] == 'Image files'
|
|
assert formdef.fields[0].document_type['label'] == 'Image files, Documents'
|
|
|
|
def test_internal_identifier_migration(pub):
|
|
FormDef.wipe()
|
|
formdef = FormDef()
|
|
formdef.name = 'foo'
|
|
formdef.fields = []
|
|
formdef.store()
|
|
|
|
obj = pickle.load(open(formdef.get_object_filename(), 'rb'))
|
|
del obj.internal_identifier
|
|
pickle.dump(obj, open(formdef.get_object_filename(), 'wb'))
|
|
assert pickle.load(open(formdef.get_object_filename(), 'rb')).internal_identifier is None
|
|
assert FormDef.get(formdef.id, ignore_migration=True).internal_identifier is None
|
|
|
|
formdef = FormDef.get(formdef.id)
|
|
assert formdef.internal_identifier == 'foo'
|
|
|
|
def test_page_field_migration(pub):
|
|
FormDef.wipe()
|
|
formdef = FormDef()
|
|
formdef.name = 'foo'
|
|
formdef.fields = [
|
|
PageField(id='1', type='page',
|
|
condition='foo',
|
|
post_conditions=[{'condition': 'blah', 'error_message': 'blah'}]),
|
|
]
|
|
formdef.store()
|
|
formdef = FormDef.get(formdef.id)
|
|
assert formdef.fields[0].condition == {'type': 'python', 'value': 'foo'}
|
|
assert formdef.fields[0].post_conditions == [
|
|
{'condition': {'type': 'python', 'value': 'blah'},
|
|
'error_message': 'blah'}]
|
|
|
|
|
|
def test_unused_file_removal_job(pub):
|
|
from wcs.formdef import clean_unused_files
|
|
|
|
FormDef.wipe()
|
|
formdef = FormDef()
|
|
formdef.name = 'removal job'
|
|
formdef.fields = [
|
|
fields.FileField(id='5', label='file', varname='filefield'),
|
|
]
|
|
formdef.store()
|
|
formdef.data_class().wipe()
|
|
|
|
formdata = formdef.data_class()()
|
|
formdata.just_created()
|
|
formdata.data = {
|
|
'5': PicklableUpload('test.txt', 'text/plain'),
|
|
}
|
|
formdata.data['5'].receive([b'hello world'])
|
|
formdata.store()
|
|
|
|
assert formdata.data['5'].qfilename in os.listdir(os.path.join(pub.app_dir, 'uploads'))
|
|
clean_unused_files(pub)
|
|
assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == [formdata.data['5'].qfilename]
|
|
formdata.anonymise()
|
|
clean_unused_files(pub)
|
|
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 1 # file is not removed
|
|
|
|
if not pub.site_options.has_section('options'):
|
|
pub.site_options.add_section('options')
|
|
|
|
for behaviour in ('move', 'remove'):
|
|
pub.site_options.set('options', 'unused-files-behaviour', behaviour)
|
|
|
|
formdata = formdef.data_class()()
|
|
formdata.just_created()
|
|
formdata.data = {
|
|
'5': PicklableUpload('test.txt', 'text/plain'),
|
|
}
|
|
formdata.data['5'].receive([b'hello world'])
|
|
formdata.store()
|
|
|
|
assert formdata.data['5'].qfilename in os.listdir(os.path.join(pub.app_dir, 'uploads'))
|
|
clean_unused_files(pub)
|
|
assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == [formdata.data['5'].qfilename]
|
|
formdata.anonymise()
|
|
clean_unused_files(pub)
|
|
assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == []
|
|
|
|
for i in range(5):
|
|
formdata = formdef.data_class()()
|
|
formdata.just_created()
|
|
formdata.data = {
|
|
'5': PicklableUpload('test.txt', 'text/plain'),
|
|
}
|
|
formdata.data['5'].receive([b'hello world'])
|
|
formdata.store()
|
|
|
|
# same file, deduplicated
|
|
assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == [formdata.data['5'].qfilename]
|
|
formdata.anonymise()
|
|
clean_unused_files(pub)
|
|
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 1
|
|
for formdata in formdef.data_class().select():
|
|
formdata.anonymise()
|
|
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 1
|
|
clean_unused_files(pub)
|
|
assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == []
|
|
|
|
# file referenced in formdef option
|
|
workflow = Workflow(name='variables')
|
|
from wcs.workflows import WorkflowVariablesFieldsFormDef
|
|
workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
|
|
workflow.variables_formdef.fields.append(fields.FileField(id='1', label='Test', type='file'))
|
|
workflow.add_status('Status1', 'st1')
|
|
workflow.store()
|
|
formdef.workflow = workflow
|
|
formdef.workflow_options = {'1': PicklableUpload('test.txt', 'text/plain')}
|
|
formdef.workflow_options['1'].receive([b'hello world'])
|
|
formdef.store()
|
|
|
|
formdata = formdef.data_class()()
|
|
formdata.just_created()
|
|
formdata.data = {
|
|
'5': PicklableUpload('test.txt', 'text/plain'),
|
|
}
|
|
formdata.data['5'].receive([b'hello world'])
|
|
formdata.store()
|
|
|
|
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 1
|
|
clean_unused_files(pub)
|
|
formdata.remove_self()
|
|
clean_unused_files(pub)
|
|
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 1
|
|
|
|
formdef.workflow_options = {}
|
|
formdef.store()
|
|
clean_unused_files(pub)
|
|
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 0
|
|
|
|
# workflow attachment
|
|
formdata = formdef.data_class()()
|
|
formdata.just_created()
|
|
formdata.data = {}
|
|
formdata.store()
|
|
|
|
formdata.evolution[-1].parts = [AttachmentEvolutionPart('hello.txt',
|
|
fp=BytesIO(b'hello world'), varname='testfile')]
|
|
formdata.store()
|
|
assert len(glob.glob(os.path.join(pub.app_dir, 'attachments', '*/*'))) == 1
|
|
clean_unused_files(pub)
|
|
assert len(glob.glob(os.path.join(pub.app_dir, 'attachments', '*/*'))) == 1
|
|
formdata.anonymise()
|
|
clean_unused_files(pub)
|
|
assert len(glob.glob(os.path.join(pub.app_dir, 'attachments', '*/*'))) == 0
|
|
|
|
# files in user profile
|
|
|
|
from wcs.admin.settings import UserFieldsFormDef
|
|
user_formdef = UserFieldsFormDef(pub)
|
|
user_formdef.fields.append(fields.FileField(id='3', label='test', type='file'))
|
|
user_formdef.store()
|
|
|
|
user = pub.user_class()
|
|
user.email = 'bar@localhost'
|
|
user.form_data = {'3': PicklableUpload('test.txt', 'text/plain')}
|
|
user.form_data['3'].receive([b'hello world 2'])
|
|
user.store()
|
|
|
|
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 1
|
|
clean_unused_files(pub)
|
|
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 1
|
|
user.remove_self()
|
|
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 1
|
|
clean_unused_files(pub)
|
|
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 0
|
|
|
|
if behaviour == 'move':
|
|
# 2 files ("hello world" + "hello world 2")
|
|
assert len(os.listdir(os.path.join(pub.app_dir, 'unused-files/uploads/'))) == 2
|
|
# 1 attachment
|
|
assert len(glob.glob(os.path.join(pub.app_dir, 'unused-files/attachments/*/*'))) == 1
|
|
|
|
def test_get_formdefs_of_all_kinds(pub):
|
|
from wcs.admin.settings import UserFieldsFormDef
|
|
from wcs.carddef import CardDef
|
|
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
|
|
from wcs.workflows import WorkflowBackofficeFieldsFormDef, WorkflowVariablesFieldsFormDef
|
|
|
|
FormDef.wipe()
|
|
Workflow.wipe()
|
|
formdefs = get_formdefs_of_all_kinds()
|
|
assert len(formdefs) == 1
|
|
assert formdefs[0].__class__ == UserFieldsFormDef
|
|
|
|
formdef = FormDef()
|
|
formdef.name = 'basic formdef'
|
|
formdef.store()
|
|
|
|
carddef = CardDef()
|
|
carddef.name = 'carddef'
|
|
carddef.store()
|
|
|
|
wf1 = Workflow(name='workflow with form fields formdef')
|
|
st1 = wf1.add_status('Status1', 'st1')
|
|
display_form1 = FormWorkflowStatusItem()
|
|
display_form1.formdef = WorkflowFormFieldsFormDef(item=display_form1)
|
|
display_form1.parent = st1
|
|
display_form2 = FormWorkflowStatusItem()
|
|
display_form2.parent = st1
|
|
st1.items.append(display_form1)
|
|
st1.items.append(display_form2) # empty formdef
|
|
wf1.store()
|
|
|
|
wf2 = Workflow(name='workflow with variables fields formdef')
|
|
wf2.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=wf2)
|
|
wf2.store()
|
|
|
|
wf3 = Workflow(name='workflow with backoffice fields formdef')
|
|
wf3.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf3)
|
|
wf3.store()
|
|
|
|
formdefs = get_formdefs_of_all_kinds()
|
|
assert len(formdefs) == 6
|
|
assert sorted([(f.name, f.__class__) for f in formdefs]) == [
|
|
('Backoffice fields of workflow "workflow with backoffice fields formdef"', WorkflowBackofficeFieldsFormDef),
|
|
('Form action in workflow "workflow with form fields formdef"', WorkflowFormFieldsFormDef),
|
|
('Options of workflow "workflow with variables fields formdef"', WorkflowVariablesFieldsFormDef),
|
|
('User Fields', UserFieldsFormDef),
|
|
('basic formdef', FormDef),
|
|
('carddef', CardDef),
|
|
]
|