testdef: add support for computed fields (#73330)

This commit is contained in:
Valentin Deniaud 2023-01-09 17:55:44 +01:00 committed by Gitea
parent 7ee68eefbe
commit 90f85518f8
2 changed files with 199 additions and 1 deletions

View File

@ -8,6 +8,7 @@ from wcs.formdef import FormDef
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.upload_storage import PicklableUpload
from wcs.testdef import TestDef, TestError
from wcs.wscalls import NamedWsCall
from .utilities import clean_temporary_pub, create_temporary_pub
@ -437,3 +438,174 @@ def test_validation_file_field(pub):
with pytest.raises(TestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Invalid value "hop.pdf" for field "File": forbidden file type.'
def test_computed_field_support(pub):
formdef = FormDef()
formdef.name = 'test title'
formdef.fields = [
fields.PageField(
id='0',
label='1st page',
type='page',
post_conditions=[
{
'condition': {
'type': 'django',
'value': 'form_var_foo == "xxx" and form_var_bar == "hop"',
},
'error_message': '',
}
],
),
fields.ComputedField(
id='1',
label='Computed (frozen)',
varname='foo',
value_template='{% firstof form_var_text "xxx" %}',
freeze_on_initial_value=True,
),
fields.ComputedField(
id='2', label='Computed (live)', varname='bar', value_template='{% firstof form_var_text "xxx" %}'
),
fields.StringField(id='3', label='Text', varname='text'),
]
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.receipt_time = datetime.datetime(2021, 1, 1, 0, 0).timetuple()
formdata.data['1'] = 'zzz'
formdata.data['3'] = 'hop'
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.run(formdef)
formdef.fields[1].value_template = '{% for %}'
testdef = TestDef.create_from_formdata(formdef, formdata)
with pytest.raises(TestError) as excinfo:
testdef.run(formdef)
assert (
str(excinfo.value)
== 'Page 1 post condition was not met (form_var_foo == "xxx" and form_var_bar == "hop").'
)
def test_computed_field_support_complex_data(pub):
formdef = FormDef()
formdef.name = 'test title'
formdef.fields = [
fields.PageField(
id='0',
label='1st page',
type='page',
post_conditions=[
{'condition': {'type': 'django', 'value': 'form_var_foo|length == 3'}, 'error_message': ''}
],
),
fields.ComputedField(
id='1',
label='Computed',
varname='foo',
value_template='{{form_objects|first|get:"form_var_items_raw"}}',
),
fields.ItemsField(id='2', label='Items', varname='items', required=False),
]
formdef.store()
submitted_formdata = formdef.data_class()()
submitted_formdata.just_created()
submitted_formdata.receipt_time = datetime.datetime(2021, 1, 1, 0, 0).timetuple()
submitted_formdata.data['2'] = ['a', 'bc']
submitted_formdata.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.receipt_time = datetime.datetime(2021, 1, 1, 0, 0).timetuple()
testdef = TestDef.create_from_formdata(formdef, formdata)
with pytest.raises(TestError) as excinfo:
testdef.run(formdef)
assert str(excinfo.value) == 'Page 1 post condition was not met (form_var_foo|length == 3).'
submitted_formdata.data['2'] = ['a', 'b', 'c']
submitted_formdata.store()
testdef.run(formdef)
def test_computed_field_support_webservice(pub, http_requests):
wscall = NamedWsCall()
wscall.name = 'Hello world'
wscall.request = {'url': 'http://remote.example.net/json'}
wscall.store()
formdef = FormDef()
formdef.name = 'test title'
formdef.fields = [
fields.PageField(
id='0',
label='1st page',
type='page',
post_conditions=[
{
'condition': {'type': 'django', 'value': 'form_var_computed_foo == "bar"'},
'error_message': '',
}
],
),
fields.ComputedField(
id='1',
label='Computed',
varname='computed',
value_template='{{ webservice.hello_world }}',
freeze_on_initial_value=True,
),
]
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.receipt_time = datetime.datetime(2021, 1, 1, 0, 0).timetuple()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.run(formdef)
def test_computed_field_value_too_long(pub):
formdef = FormDef()
formdef.name = 'test title'
formdef.fields = [
fields.PageField(
id='0',
label='1st page',
type='page',
post_conditions=[
{
'condition': {'type': 'django', 'value': 'not form_var_computed'},
'error_message': '',
}
],
),
fields.ComputedField(
id='1',
label='Computed',
varname='computed',
value_template='{% token_decimal length=9999 %}',
),
]
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.receipt_time = datetime.datetime(2021, 1, 1, 0, 0).timetuple()
testdef = TestDef.create_from_formdata(formdef, formdata)
with pytest.raises(TestError):
testdef.run(formdef)
formdef.fields[1].value_template = '{% token_decimal length=100000 %}'
formdef.store()
testdef = TestDef.create_from_formdata(formdef, formdata)
testdef.run(formdef)

View File

@ -24,6 +24,8 @@ from wcs import sql
from wcs.compat import CompatHTTPRequest
from wcs.fields import Field, PageField
from wcs.qommon.form import FileWithPreviewWidget, Form, get_selection_error_text
from wcs.qommon.template import TemplateError
from wcs.workflows import WorkflowStatusItem
from .qommon import _, misc
from .qommon.storage import Equal
@ -158,8 +160,9 @@ class TestDef(sql.TestDef):
self.evaluate_page_conditions(previous_page, formdata, objectdef)
def fill_page_fields(self, fields, page, formdata, objectdef):
self.handle_computed_fields(fields, formdata)
for field in fields:
if field.type in ('subtitle', 'title', 'comment', 'computed'):
if field.key in ('subtitle', 'title', 'comment', 'computed'):
continue
if not field.is_visible(formdata.data, objectdef):
@ -177,6 +180,8 @@ class TestDef(sql.TestDef):
get_publisher().substitutions.invalidate_cache()
self.handle_computed_fields(fields, formdata, exclude_frozen=True)
def evaluate_page_conditions(self, page, formdata, objectdef):
for post_condition in page.post_conditions or []:
condition = post_condition.get('condition', {})
@ -222,6 +227,27 @@ class TestDef(sql.TestDef):
}
)
def handle_computed_fields(self, fields, formdata, exclude_frozen=False):
for field in fields:
if field.key != 'computed':
continue
if exclude_frozen and field.freeze_on_initial_value:
continue
with get_publisher().complex_data():
try:
value = WorkflowStatusItem.compute(field.value_template, raises=True, allow_complex=True)
except TemplateError:
continue
else:
value = get_publisher().get_cached_complex_data(value)
if isinstance(value, str) and len(value) > 10000:
value = None
formdata.data[field.id] = value
get_publisher().substitutions.invalidate_cache()
def export_to_json(self):
return {
'name': self.name,