649 lines
23 KiB
Python
649 lines
23 KiB
Python
import datetime
|
|
import glob
|
|
import io
|
|
import json
|
|
import os
|
|
import pickle
|
|
import time
|
|
|
|
import pytest
|
|
from django.utils.encoding import force_bytes
|
|
|
|
from wcs import fields
|
|
from wcs.admin.settings import UserFieldsFormDef
|
|
from wcs.blocks import BlockDef
|
|
from wcs.carddef import CardDef
|
|
from wcs.fields import DateField, FileField, ItemField, PageField, StringField
|
|
from wcs.formdef import FormDef, get_formdefs_of_all_kinds
|
|
from wcs.qommon.http_request import HTTPRequest
|
|
from wcs.qommon.upload_storage import PicklableUpload
|
|
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormEvolutionPart, WorkflowFormFieldsFormDef
|
|
from wcs.workflows import (
|
|
AttachmentEvolutionPart,
|
|
Workflow,
|
|
WorkflowBackofficeFieldsFormDef,
|
|
WorkflowVariablesFieldsFormDef,
|
|
)
|
|
|
|
from .utilities import clean_temporary_pub, create_temporary_pub
|
|
|
|
|
|
@pytest.fixture
def pub():
    """Provide a temporary publisher configured for ``example.net`` with 'en' as language.

    The publisher's app dir is set from a fake request and the configuration
    is written out before the publisher is handed to the test.
    """
    publisher = create_temporary_pub()

    environ = {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'}
    publisher.set_app_dir(HTTPRequest(None, environ))

    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()
    return publisher
|
|
|
|
|
|
def teardown_module(module):
    """Module-level teardown: clean up the temporary publisher."""
    # ``module`` is part of the hook signature and is not used here.
    clean_temporary_pub()
|
|
|
|
|
|
def test_is_disabled(pub):
    """A new FormDef is not disabled; setting ``disabled`` to True disables it."""
    form = FormDef()
    assert not form.is_disabled()

    form.disabled = True
    assert form.is_disabled()
|
|
|
|
|
|
def test_is_disabled_publication_date(pub):
    """A publication date of yesterday keeps the form enabled; tomorrow disables it."""
    one_day = datetime.timedelta(1)
    form = FormDef()

    # dates are given as 'YYYY-MM-DD' strings
    yesterday = datetime.datetime.today() - one_day
    form.publication_date = yesterday.strftime('%Y-%m-%d')
    assert not form.is_disabled()

    tomorrow = datetime.datetime.today() + one_day
    form.publication_date = tomorrow.strftime('%Y-%m-%d')
    assert form.is_disabled()
|
|
|
|
|
|
def test_is_disabled_expiration_date(pub):
    """An expiration date of yesterday disables the form; tomorrow keeps it enabled."""
    one_day = datetime.timedelta(1)
    form = FormDef()

    # dates are given as 'YYYY-MM-DD' strings
    yesterday = datetime.datetime.today() - one_day
    form.expiration_date = yesterday.strftime('%Y-%m-%d')
    assert form.is_disabled()

    tomorrow = datetime.datetime.today() + one_day
    form.expiration_date = tomorrow.strftime('%Y-%m-%d')
    assert not form.is_disabled()
|
|
|
|
|
|
def test_is_disabled_publication_datetime(pub):
    """Same as the publication date test, with 'YYYY-MM-DD HH:MM' values one hour away."""
    one_hour = datetime.timedelta(hours=1)
    form = FormDef()

    an_hour_ago = datetime.datetime.now() - one_hour
    form.publication_date = an_hour_ago.strftime('%Y-%m-%d %H:%M')
    assert not form.is_disabled()

    in_an_hour = datetime.datetime.now() + one_hour
    form.publication_date = in_an_hour.strftime('%Y-%m-%d %H:%M')
    assert form.is_disabled()
|
|
|
|
|
|
def test_is_disabled_expiration_datetime(pub):
    """Same as the expiration date test, with 'YYYY-MM-DD HH:MM' values one hour away."""
    one_hour = datetime.timedelta(hours=1)
    form = FormDef()

    an_hour_ago = datetime.datetime.now() - one_hour
    form.expiration_date = an_hour_ago.strftime('%Y-%m-%d %H:%M')
    assert form.is_disabled()

    in_an_hour = datetime.datetime.now() + one_hour
    form.expiration_date = in_an_hour.strftime('%Y-%m-%d %H:%M')
    assert not form.is_disabled()
|
|
|
|
|
|
def test_title_change(pub):
    """Check how url_name and internal_identifier react to renaming a formdef.

    After a rename the url_name is expected to stay as first generated, while
    the internal_identifier follows the new name, but only as long as no form
    has been stored for the formdef.
    """
    formdef = FormDef()
    formdef.name = 'foo'
    formdef.store()
    stored = FormDef.get(formdef.id)
    assert stored.name == 'foo'
    assert stored.url_name == 'foo'
    assert stored.internal_identifier == 'foo'

    formdef.name = 'bar'
    formdef.store()
    stored = FormDef.get(formdef.id)
    assert stored.name == 'bar'
    assert stored.url_name == 'foo'
    assert stored.internal_identifier == 'bar'

    # make sure the internal identifier doesn't change once there are submitted forms
    formdef.data_class()().store()
    formdef.name = 'baz'
    formdef.store()
    stored = FormDef.get(formdef.id)
    assert stored.name == 'baz'
    assert stored.internal_identifier == 'bar'  # didn't change
|
|
|
|
|
|
def test_overlong_slug(pub):
    """Smoke test: storing a formdef with a 503-character name must not raise."""
    overlong_name = 'foo' + 'a' * 500

    formdef = FormDef()
    formdef.name = overlong_name
    formdef.store()
|
|
|
|
|
|
def test_substitution_variables(pub):
    """Check ``form_name`` and ``form_option_*`` entries of the substitution variables."""
    formdef = FormDef()
    formdef.name = 'foo'
    formdef.store()

    # workflow declaring two option variables: a string ('foo') and an item ('bar')
    workflow = Workflow(name='variables')
    workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
    workflow.variables_formdef.fields = [
        StringField(id='1', label='Test', type='string', varname='foo'),
        ItemField(id='2', label='Test Liste', type='item', varname='bar'),
    ]
    workflow.store()
    formdef.workflow_id = workflow.id

    variables = formdef.get_substitution_variables()
    assert 'form_name' in variables
    assert variables['form_name'] == 'foo'

    formdef.workflow_options = {'foo': 'bar'}
    variables = formdef.get_substitution_variables()
    assert 'form_option_foo' in variables
    assert variables['form_option_foo'] == 'bar'

    # with a "_display" companion value, the plain variable is expected to hold
    # the display value and the "_raw" variable the stored one
    formdef.workflow_options = {'bar': 'bar', 'bar_display': 'Bar'}
    variables = formdef.get_substitution_variables()
    assert variables['form_option_bar'] == 'Bar'
    assert variables['form_option_bar_raw'] == 'bar'
|
|
|
|
|
|
def test_urls(pub):
    """Check front and back office URLs, with and without the 'frontoffice-url' setting."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foo'
    formdef.store()

    front_url = formdef.get_url()
    back_url = formdef.get_url(backoffice=True)
    assert front_url == 'http://example.net/foo/'
    assert back_url == 'http://example.net/backoffice/management/foo/'

    # once the configured frontoffice URL is removed, https URLs are expected
    del pub.cfg['misc']['frontoffice-url']
    front_url = formdef.get_url()
    back_url = formdef.get_url(backoffice=True)
    assert front_url == 'https://example.net/foo/'
    assert back_url == 'https://example.net/backoffice/management/foo/'
|
|
|
|
|
|
def test_schema_with_date_variable(pub):
    """A date workflow option given as a time tuple is exported to JSON as a '2016-04-02…' string."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foo'
    formdef.store()

    workflow = Workflow(name='variables')
    workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
    date_field = DateField(label='Test', type='date', varname='foo')
    workflow.variables_formdef.fields.append(date_field)
    workflow.store()

    formdef.workflow_id = workflow.id
    formdef.workflow_options = {'foo': datetime.datetime(2016, 4, 2).timetuple()}

    exported = json.loads(formdef.export_to_json())
    assert exported['options']['foo'].startswith('2016-04-02')
|
|
|
|
|
|
def test_substitution_variables_object(pub):
    """Check the ``form_objects`` substitution variable: counts, unknown attributes, formdef."""
    formdef = FormDef()
    formdef.name = 'foo'
    formdef.store()
    formdef.data_class().wipe()

    variables = formdef.get_substitution_variables()
    assert 'form_objects' in variables

    # no form data yet: every count is zero
    form_objects = formdef.get_substitution_variables().get('form_objects')
    assert form_objects.count == 0
    assert form_objects.count_status_1 == 0

    # one form data in status 'wf-1' shows up in both counts
    formdata = formdef.data_class()()
    formdata.status = 'wf-1'
    formdata.store()
    form_objects = formdef.get_substitution_variables().get('form_objects')
    assert form_objects.count == 1
    assert form_objects.count_status_1 == 1

    # unknown attributes are reported with an AttributeError
    with pytest.raises(AttributeError):
        assert form_objects.foobar

    assert form_objects.formdef is formdef
|
|
|
|
|
|
def test_file_field_migration(pub):
    """Legacy ``file_type`` attributes of file fields are migrated to ``document_type``.

    The legacy attribute is injected straight into the fields' ``__dict__``; after
    a store/reload cycle it must be gone, replaced by a ``document_type`` with
    the '_legacy' id, the same mimetypes and a label built from the known types.
    """
    document_mimetypes = [
        'application/pdf',
        'application/vnd.oasis.opendocument.text',
        'application/msword',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'application/vnd.oasis.opendocument.spreadsheet',
        'application/vnd.ms-excel',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    ]
    # legacy values hold all the mimetypes of a type as a single comma-separated string
    legacy_file_type = ['image/*', ','.join(document_mimetypes)]

    pub.cfg['filetypes'] = {
        1: {
            'mimetypes': list(document_mimetypes),
            'label': 'Documents',
        }
    }

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foo'
    formdef.fields = [
        FileField(type='file', id='1', label='images & docs'),
        FileField(type='file', id='2', label='images'),
    ]
    docs_field, images_field = formdef.fields
    docs_field.__dict__['file_type'] = list(legacy_file_type)
    images_field.__dict__['file_type'] = ['image/*']
    formdef.store()

    formdef = FormDef.get(1)
    docs_field, images_field = formdef.fields
    assert 'file_type' not in docs_field.__dict__
    assert docs_field.document_type
    assert docs_field.document_type['id'] == '_legacy'
    assert docs_field.document_type['mimetypes'] == legacy_file_type
    assert images_field.document_type['label'] == 'Image files'
    assert docs_field.document_type['label'] == 'Image files, Documents'
|
|
|
|
|
|
def test_internal_identifier_migration(pub):
    """A stored formdef lacking ``internal_identifier`` gets it back when loaded with migration."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'foo'
    formdef.fields = []
    formdef.store()

    object_filename = formdef.get_object_filename()

    # rewrite the pickle without the internal_identifier attribute
    with open(object_filename, 'rb') as fd:
        stored = pickle.load(fd)
    del stored.internal_identifier
    with open(object_filename, 'wb') as fd:
        pickle.dump(stored, fd)

    # both the raw pickle and a load skipping migrations report None
    with open(object_filename, 'rb') as fd:
        assert pickle.load(fd).internal_identifier is None
    unmigrated = FormDef.get(formdef.id, ignore_migration=True)
    assert unmigrated.internal_identifier is None

    # a regular load restores the identifier
    formdef = FormDef.get(formdef.id)
    assert formdef.internal_identifier == 'foo'
|
|
|
|
|
|
def test_page_field_migration(pub):
    """String conditions of a page field are migrated to ``{'type': 'python', 'value': …}`` dicts."""
    # snapshots are disabled as the old post_conditions structure cannot be
    # serialized to XML.
    pub.snapshot_class = None

    FormDef.wipe()
    # old post_conditions structure
    legacy_post_conditions = [{'condition': 'blah', 'error_message': 'blah'}]
    page = PageField(
        id='1',
        type='page',
        condition='foo',
        post_conditions=legacy_post_conditions,
    )
    formdef = FormDef()
    formdef.name = 'foo'
    formdef.fields = [page]
    formdef.store()

    migrated_page = FormDef.get(formdef.id).fields[0]
    assert migrated_page.condition == {'type': 'python', 'value': 'foo'}
    assert migrated_page.post_conditions == [
        {'condition': {'type': 'python', 'value': 'blah'}, 'error_message': 'blah'}
    ]
|
|
|
|
|
|
def test_unused_file_removal_job(pub):
    """Exercise ``clean_unused_files`` with files referenced from various places.

    The whole scenario is run for each 'unused-files-behaviour' value (unset,
    'move', 'remove'): files are expected to stay while something references
    them and to leave the uploads/attachments directories once nothing does.
    A final part checks that an unknown behaviour value leaves files in place.
    """
    from wcs.formdef import clean_unused_files

    BlockDef.wipe()
    FormDef.wipe()

    block = BlockDef()
    block.name = 'foobar'
    block.fields = [
        fields.FileField(id='234', required=True, label='Test2', type='file'),
    ]
    block.store()

    formdef = FormDef()
    formdef.name = 'removal job'
    formdef.fields = [
        fields.FileField(id='5', label='file', varname='filefield'),
        fields.BlockField(id='6', label='test', type='block:foobar', max_items=3),
    ]
    formdef.store()
    formdef.data_class().wipe()

    if not pub.site_options.has_section('options'):
        pub.site_options.add_section('options')
    pub.site_options.add_section('storage-remote')
    pub.site_options.set('storage-remote', 'label', 'remote')
    pub.site_options.set('storage-remote', 'class', 'wcs.qommon.upload_storage.RemoteOpaqueUploadStorage')
    pub.site_options.set('storage-remote', 'ws', 'https://crypto.example.net/')

    uploads_dir = os.path.join(pub.app_dir, 'uploads')
    attachments_pattern = os.path.join(pub.app_dir, 'attachments', '*/*')

    def list_uploads():
        # current content of the uploads directory
        return os.listdir(uploads_dir)

    def list_attachments():
        # current attachment files, whatever their subdirectory
        return glob.glob(attachments_pattern)

    def text_upload(content, filename='test.txt'):
        # text/plain upload that already received its content
        upload = PicklableUpload(filename, 'text/plain')
        upload.receive([content])
        return upload

    def new_formdata(data):
        # just created, not yet stored, form data holding the given data
        formdata = formdef.data_class()()
        formdata.just_created()
        formdata.data = data
        return formdata

    for behaviour in (None, 'move', 'remove'):
        if behaviour:
            pub.site_options.set('options', 'unused-files-behaviour', behaviour)

        # file in a single form data
        formdata = new_formdata({'5': text_upload(b'hello world')})
        formdata.store()

        assert formdata.data['5'].qfilename in list_uploads()
        clean_unused_files(pub)
        assert list_uploads() == [formdata.data['5'].qfilename]
        formdata.anonymise()
        clean_unused_files(pub)
        assert list_uploads() == []

        # same content in several form data
        for _ in range(5):
            formdata = new_formdata({'5': text_upload(b'hello world')})
            formdata.store()

        # same file, deduplicated
        assert list_uploads() == [formdata.data['5'].qfilename]
        formdata.anonymise()
        clean_unused_files(pub)
        assert len(list_uploads()) == 1
        for formdata in formdef.data_class().select():
            formdata.anonymise()
        assert len(list_uploads()) == 1
        clean_unused_files(pub)
        assert list_uploads() == []

        # file referenced in formdef option
        workflow = Workflow(name='variables')
        workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
        workflow.variables_formdef.fields.append(fields.FileField(id='1', label='Test', type='file'))
        workflow.add_status('Status1', 'st1')
        workflow.store()
        formdef.workflow = workflow
        formdef.workflow_options = {'1': text_upload(b'hello world')}
        formdef.store()

        formdata = new_formdata({'5': text_upload(b'hello world')})
        formdata.store()

        assert len(list_uploads()) == 1
        clean_unused_files(pub)
        formdata.remove_self()
        clean_unused_files(pub)
        # still referenced by the formdef option
        assert len(list_uploads()) == 1

        formdef.workflow_options = {}
        formdef.store()
        clean_unused_files(pub)
        assert len(list_uploads()) == 0

        # file in block field
        formdata = new_formdata(
            {
                '6': {
                    'data': [
                        {'234': text_upload(b'hello world')},
                        {'234': text_upload(b'hello world block', filename='test2.txt')},
                    ],
                    'schema': {'234': 'file'},
                },
            }
        )
        # workflow data with a 'data' key that is not block data
        formdata.workflow_data = {'wscall': {'data': ['not', 'a', 'block'], 'err': 0}}
        formdata.store()
        assert len(list_uploads()) == 2
        clean_unused_files(pub)
        assert len(list_uploads()) == 2
        formdata.remove_self()
        clean_unused_files(pub)
        assert len(list_uploads()) == 0

        # non local storage: nothing happens
        remote_upload = text_upload(b'hello world')
        remote_upload.storage = 'remote'
        remote_upload.storage_attrs = {'redirect_url': 'https://crypto.example.net/1234'}
        formdata = new_formdata({'5': remote_upload})
        formdata.store()
        assert len(list_uploads()) == 0
        clean_unused_files(pub)
        assert len(list_uploads()) == 0

        # workflow attachment
        formdata = new_formdata({})
        formdata.store()

        formdata.evolution[-1].parts = [
            AttachmentEvolutionPart('hello.txt', fp=io.BytesIO(b'hello world'), varname='testfile')
        ]
        formdata.store()
        assert len(list_attachments()) == 1
        clean_unused_files(pub)
        assert len(list_attachments()) == 1
        formdata.anonymise()
        clean_unused_files(pub)
        assert len(list_attachments()) == 0

        # files in user profile
        user_formdef = UserFieldsFormDef(pub)
        user_formdef.fields.append(fields.FileField(id='3', label='test', type='file'))
        user_formdef.store()

        user = pub.user_class()
        user.email = 'bar@localhost'
        user.form_data = {'3': text_upload(b'hello world 2')}
        user.store()

        assert len(list_uploads()) == 1
        clean_unused_files(pub)
        assert len(list_uploads()) == 1
        user.remove_self()
        # removing the user alone does not remove the file
        assert len(list_uploads()) == 1
        clean_unused_files(pub)
        assert len(list_uploads()) == 0

        # file from workflow form
        formdata = new_formdata({})
        formdata.store()

        display_form = FormWorkflowStatusItem()
        display_form.varname = 'blah'
        display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
        display_form.formdef.fields = []

        data = {'blah_1': text_upload(b'hello world wf form')}
        formdata.evolution[-1].parts = [
            WorkflowFormEvolutionPart(display_form, data),
        ]

        count = len(list_uploads())
        formdata.store()
        assert len(list_uploads()) == count + 1
        clean_unused_files(pub)
        assert len(list_uploads()) == count + 1

        formdata.evolution[-1].parts = []
        formdata.store()
        clean_unused_files(pub)
        assert len(list_uploads()) == count

        if behaviour == 'move':
            # 4 files ("hello world" + "hello world 2" + "hello world block" + "hello world wf form")
            assert len(os.listdir(os.path.join(pub.app_dir, 'unused-files/uploads/'))) == 4
            # 1 attachment
            assert len(glob.glob(os.path.join(pub.app_dir, 'unused-files/attachments/*/*'))) == 1

    # unknown unused-files-behaviour: do nothing
    pub.site_options.set('options', 'unused-files-behaviour', 'foo')
    formdata = new_formdata({'5': text_upload(b'hello world', filename='test-no-remove.txt')})
    formdata.store()

    assert formdata.data['5'].qfilename in list_uploads()
    clean_unused_files(pub)
    assert list_uploads() == [formdata.data['5'].qfilename]
    formdata.anonymise()
    clean_unused_files(pub)
    assert len(list_uploads()) == 1  # file is not removed
|
|
|
|
|
|
def test_get_formdefs_of_all_kinds(pub):
    """``get_formdefs_of_all_kinds`` lists forms, cards, user fields and workflow formdefs."""
    BlockDef.wipe()
    FormDef.wipe()
    Workflow.wipe()

    # with everything wiped only the user fields formdef is reported
    found = get_formdefs_of_all_kinds()
    assert len(found) == 1
    assert found[0].__class__ == UserFieldsFormDef

    formdef = FormDef()
    formdef.name = 'basic formdef'
    formdef.store()

    carddef = CardDef()
    carddef.name = 'carddef'
    carddef.store()

    form_workflow = Workflow(name='workflow with form fields formdef')
    status = form_workflow.add_status('Status1', 'st1')
    form_action = status.add_action('form')
    form_action.formdef = WorkflowFormFieldsFormDef(item=form_action)
    status.add_action('form')  # empty formdef
    form_workflow.store()

    variables_workflow = Workflow(name='workflow with variables fields formdef')
    variables_workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=variables_workflow)
    variables_workflow.store()

    backoffice_workflow = Workflow(name='workflow with backoffice fields formdef')
    backoffice_workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(backoffice_workflow)
    backoffice_workflow.store()

    # the form action without formdef is not part of the six results
    expected = [
        (
            'Backoffice fields of workflow "workflow with backoffice fields formdef"',
            WorkflowBackofficeFieldsFormDef,
        ),
        ('Form action in workflow "workflow with form fields formdef"', WorkflowFormFieldsFormDef),
        ('Options of workflow "workflow with variables fields formdef"', WorkflowVariablesFieldsFormDef),
        ('User Fields', UserFieldsFormDef),
        ('basic formdef', FormDef),
        ('carddef', CardDef),
    ]
    found = get_formdefs_of_all_kinds()
    assert len(found) == 6
    assert sorted((f.name, f.__class__) for f in found) == expected
|
|
|
|
|
|
def test_pickle_2to3_conversion(pub):
    """Formdef and workflow pickles holding bytes instead of str can still be loaded.

    Both objects are stored normally, then their pickles are overwritten with
    protocol 2 pickles in which every str (dictionary keys included) has been
    turned into bytes; loading the formdef must give back str attributes.
    """
    FormDef.wipe()
    Workflow.wipe()

    workflow = Workflow(name='blah')
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
    workflow.backoffice_fields_formdef.fields = [
        fields.StringField(id='bo0', varname='foo_bovar', type='string', label='bo variable'),
    ]
    status = workflow.add_status('Status1')
    display_form = status.add_action('form', id='_display_form')
    display_form.by = []
    display_form.varname = 'blah'
    display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
    display_form.formdef.fields.append(
        fields.StringField(id='1', label='Test', varname='str', type='string', required=True)
    )
    workflow.store()

    formdef = FormDef()
    formdef.name = 'basic formdef'
    formdef.workflow_id = workflow.id
    formdef.workflow_options = {'bo0': 'whatever'}
    formdef.workflow_roles = {'_receiver': '1'}
    formdef.fields = [
        StringField(id='1', label='Test', type='string', varname='foo'),
    ]
    formdef.store()

    formdef_id = formdef.id
    formdef_filename = os.path.join(formdef.get_objects_dir(), str(formdef.id))
    workflow_filename = os.path.join(workflow.get_objects_dir(), str(workflow.id))

    # turn pickle to bytes

    untouched_types = (int, float, bytes, time.struct_time, type(Ellipsis))
    project_modules = ('wcs.', 'qommon.', 'modules.')

    def deep_str2bytes(obj, seen=None):
        # reverse deep_bytes2str: recursively turn str into bytes in dicts,
        # lists and project objects; ``seen`` holds the ids of the containers
        # already visited so reference cycles do not recurse forever.
        if seen is None:
            seen = set()
        if obj is None or isinstance(obj, untouched_types):
            return obj
        if id(obj) in seen:
            return obj
        if isinstance(obj, str):
            return force_bytes(obj)
        seen.add(id(obj))
        if isinstance(obj, dict):
            return {force_bytes(key): deep_str2bytes(value, seen) for key, value in obj.items()}
        if isinstance(obj, list):
            return [deep_str2bytes(item, seen) for item in obj]
        if hasattr(obj, '__class__') and obj.__class__.__module__.startswith(project_modules):
            # project objects are converted in place, through their __dict__
            obj.__dict__ = deep_str2bytes(obj.__dict__, seen)
        return obj

    for target, filename in ((formdef, formdef_filename), (workflow, workflow_filename)):
        target.__dict__ = deep_str2bytes(target.__dict__)
        with open(filename, 'wb') as fd:
            pickle.dump(target, fd, protocol=2)

    formdef = FormDef.get(formdef_id)
    assert formdef.fields[0].label == 'Test'
    form_action = formdef.workflow.possible_status[0].items[0]
    assert form_action.varname == 'blah'
    assert form_action.formdef.fields[0].varname == 'str'
|