general: introduce a new "computed data" field (#52110)

This commit is contained in:
Frédéric Péters 2021-04-06 08:35:48 +02:00
parent 24383fab9b
commit f78d10fd8f
14 changed files with 604 additions and 56 deletions

View File

@ -2650,3 +2650,27 @@ def test_form_comment_with_error_in_wscall(http_requests, pub):
app = login(get_app(pub))
resp = app.get('/backoffice/forms/%s/' % formdef.id)
assert 'x [webservice.xxx.foobar] x' in resp.text
def test_form_new_computed_field(pub):
create_superuser(pub)
create_role(pub)
FormDef.wipe()
formdef = FormDef()
formdef.name = 'form title'
formdef.fields = []
formdef.store()
app = login(get_app(pub))
resp = app.get('/backoffice/forms/1/')
resp = resp.click(href='fields/')
resp.forms[0]['label'] = 'foobar'
resp.forms[0]['type'] = 'computed'
resp = resp.forms[0].submit().follow()
assert len(FormDef.get(1).fields) == 1
assert FormDef.get(1).fields[0].key == 'computed'
assert FormDef.get(1).fields[0].label == 'foobar'
assert FormDef.get(1).fields[0].varname == 'foobar'

View File

@ -0,0 +1,355 @@
import datetime
import pytest
from django.utils.timezone import make_aware
from wcs import fields
from wcs.formdef import FormDef
from wcs.wscalls import NamedWsCall
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
from .test_all import create_user
def pytest_generate_tests(metafunc):
if 'pub' in metafunc.fixturenames:
metafunc.parametrize('pub', ['pickle', 'sql'], indirect=True)
@pytest.fixture
def pub(request, emails):
pub = create_temporary_pub(
sql_mode=bool('sql' in request.param),
templates_mode=bool('templates' in request.param),
lazy_mode=bool('lazy' in request.param),
)
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
return pub
def teardown_module(module):
clean_temporary_pub()
def test_computed_field_simple(pub):
create_user(pub)
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.ComputedField(id='1', label='computed', varname='computed', value_template='{{ "xxx" }}'),
]
formdef.store()
formdef.data_class().wipe()
resp = login(get_app(pub), username='foo', password='foo').get('/test/')
resp = resp.forms[0].submit('submit') # -> validation
resp = resp.forms[0].submit('submit').follow() # -> submit
assert 'The form has been recorded' in resp.text
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.data == {'1': 'xxx'}
def test_computed_field_used_in_prefill(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.ComputedField(id='1', label='computed', varname='computed', value_template='xxx'),
fields.StringField(
id='2', label='string', prefill={'type': 'string', 'value': '{{ form_var_computed }}'}
),
]
formdef.store()
formdef.data_class().wipe()
resp = get_app(pub).get('/test/')
assert resp.forms[0]['f2'].value == 'xxx'
resp = resp.forms[0].submit('submit') # -> validation
resp = resp.forms[0].submit('submit').follow() # -> submit
assert 'The form has been recorded' in resp.text
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.data == {'1': 'xxx', '2': 'xxx'}
def test_computed_field_used_in_comment(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.ComputedField(id='1', label='computed', varname='computed', value_template='xxx'),
fields.CommentField(id='2', label='X{{ form_var_computed }}Y', type='comment'),
]
formdef.store()
formdef.data_class().wipe()
resp = get_app(pub).get('/test/')
assert 'XxxxY' in resp.text
resp = resp.forms[0].submit('submit') # -> validation
resp = resp.forms[0].submit('submit').follow() # -> submit
assert 'The form has been recorded' in resp.text
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.data == {'1': 'xxx'}
def test_computed_field_freeze(pub, freezer):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.ComputedField(
id='1',
label='computed',
varname='computed',
value_template='{% now "H:i" %}',
freeze_on_initial_value=False,
),
]
formdef.store()
formdef.data_class().wipe()
freezer.move_to(make_aware(datetime.datetime(2021, 4, 6, 10, 0)))
resp = get_app(pub).get('/test/')
freezer.move_to(make_aware(datetime.datetime(2021, 4, 6, 10, 5)))
resp = resp.forms[0].submit('submit') # -> validation
resp = resp.forms[0].submit('submit').follow() # -> submit
assert 'The form has been recorded' in resp.text
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.data == {'1': '10:05'}
formdef.data_class().wipe()
formdef.fields[0].freeze_on_initial_value = True
formdef.store()
freezer.move_to(make_aware(datetime.datetime(2021, 4, 6, 10, 0)))
resp = get_app(pub).get('/test/')
freezer.move_to(make_aware(datetime.datetime(2021, 4, 6, 10, 5)))
resp = resp.forms[0].submit('submit') # -> validation
resp = resp.forms[0].submit('submit').follow() # -> submit
assert 'The form has been recorded' in resp.text
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.data == {'1': '10:00'}
def test_computed_field_from_request_get(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.ComputedField(
id='1',
label='computed',
varname='computed',
value_template='{{ request.GET.param }}',
freeze_on_initial_value=True,
),
]
formdef.store()
formdef.data_class().wipe()
resp = get_app(pub).get('/test/?param=value')
resp = resp.forms[0].submit('submit') # -> validation
resp = resp.forms[0].submit('submit').follow() # -> submit
assert 'The form has been recorded' in resp.text
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.data == {'1': 'value'}
def test_computed_field_usage_in_post_condition(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.PageField(
id='0',
label='1st page',
type='page',
post_conditions=[
{
'condition': {
'type': 'django',
'value': 'form_var_computed == "xxx"',
},
'error_message': 'You shall not pass.',
}
],
),
fields.ComputedField(
id='1',
label='computed',
varname='computed',
value_template='{{ request.GET.param }}',
freeze_on_initial_value=True,
),
]
formdef.store()
formdef.data_class().wipe()
resp = get_app(pub).get('/test/?param=test')
resp = resp.forms[0].submit('submit') # -> validation
assert 'You shall not pass.' in resp.text
resp = get_app(pub).get('/test/?param=xxx')
resp = resp.forms[0].submit('submit') # -> validation
resp = resp.forms[0].submit('submit').follow() # -> submit
assert 'The form has been recorded' in resp.text
def test_computed_field_usage_updated_in_post_condition(pub):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.PageField(
id='0',
label='1st page',
type='page',
post_conditions=[
{
'condition': {
'type': 'django',
'value': 'form_var_computed == "xxx"',
},
'error_message': 'You shall not pass.',
}
],
),
fields.ComputedField(
id='1',
label='computed',
varname='computed',
value_template='{{ form_var_field }}',
),
fields.StringField(id='2', label='string', varname='field'),
]
formdef.store()
formdef.data_class().wipe()
resp = get_app(pub).get('/test/')
resp.forms[0]['f2'].value = 'test'
resp = resp.forms[0].submit('submit') # -> validation
assert 'You shall not pass.' in resp.text
resp.forms[0]['f2'].value = 'xxx'
resp = resp.forms[0].submit('submit') # -> validation
resp = resp.forms[0].submit('submit').follow() # -> submit
assert 'The form has been recorded' in resp.text
def test_computed_field_recall_draft(pub):
create_user(pub)
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.ComputedField(
id='1',
label='computed',
varname='computed',
value_template='{{ request.GET.param }}',
freeze_on_initial_value=True,
),
]
formdef.store()
formdef.data_class().wipe()
resp = login(get_app(pub), username='foo', password='foo').get('/test/?param=value')
resp = resp.forms[0].submit('submit') # -> validation
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.is_draft()
# recall draft
resp = login(get_app(pub), username='foo', password='foo').get(formdata.get_url()).follow()
assert 'form-validation' in resp.text
resp = resp.forms[1].submit('submit').follow() # -> submit
assert 'The form has been recorded' in resp.text
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.data == {'1': 'value'}
# retry, moving back to first page
formdef.data_class().wipe()
resp = login(get_app(pub), username='foo', password='foo').get('/test/?param=value')
resp = resp.forms[0].submit('submit') # -> validation
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.is_draft()
resp = login(get_app(pub), username='foo', password='foo').get(formdata.get_url()).follow()
assert 'form-validation' in resp.text
resp = resp.forms[1].submit('previous') # -> first page
resp = resp.forms[1].submit('submit') # -> validation
resp = resp.forms[1].submit('submit').follow() # -> submit
assert 'The form has been recorded' in resp.text
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.data == {'1': 'value'}
def test_computed_field_complex_data(pub, http_requests):
FormDef.wipe()
wscall = NamedWsCall()
wscall.name = 'Hello world'
wscall.request = {'url': 'http://remote.example.net/json'}
wscall.store()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.ComputedField(
id='1',
label='computed',
varname='computed',
value_template='{{ webservice.hello_world }}',
freeze_on_initial_value=True,
),
fields.CommentField(id='2', label='X{{form_var_computed_foo}}Y', type='comment'),
]
formdef.store()
formdef.data_class().wipe()
resp = get_app(pub).get('/test/')
assert 'XbarY' in resp.text
resp = resp.forms[0].submit('submit') # -> validation
resp = resp.forms[0].submit('submit').follow() # -> submit
assert 'The form has been recorded' in resp.text
assert formdef.data_class().count() == 1
formdata = formdef.data_class().select()[0]
assert formdata.data['1'] == {'foo': 'bar'}
def test_computed_field_usage_in_live_data(pub, http_requests):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'test'
formdef.fields = [
fields.ComputedField(
id='1',
label='computed',
varname='computed',
value_template='{{ "xxx" }}',
),
fields.StringField(id='0', label='string', varname='string'),
fields.CommentField(id='2', label='X{{form_var_computed}}Y{{form_var_string}}Z', type='comment'),
]
formdef.store()
formdef.data_class().wipe()
app = get_app(pub)
resp = app.get('/test/')
assert 'XxxxYNoneZ' in resp.text
resp.form['f0'] = 'hello'
live_resp = app.post('/test/live', params=resp.form.submit_fields())
assert live_resp.json['result']['2']['content'] == '<p>XxxxYhelloZ</p>'

View File

@ -50,6 +50,9 @@ def test_add_to_form():
if klass is fields.PageField:
with pytest.raises(AttributeError):
klass(label='foo').add_to_form(form)
elif klass is fields.ComputedField:
# no ui
continue
else:
klass(label='foo').add_to_form(form)

View File

@ -46,7 +46,7 @@ class BlockDirectory(FieldsDirectory):
('history', 'snapshots_dir'),
]
field_def_page_class = BlockFieldDefPage
blacklisted_types = ['page', 'table', 'table-select', 'tablerows', 'ranked-items', 'blocks']
blacklisted_types = ['page', 'table', 'table-select', 'tablerows', 'ranked-items', 'blocks', 'computed']
support_import = False
readonly_message = N_('This block of fields is readonly.')

View File

@ -1141,7 +1141,7 @@ class FormDefPage(Directory):
on_page = 0
for i, field in enumerate(self.formdef.fields):
field.id = i
if hasattr(field, str('add_to_form')):
if getattr(field, 'add_to_form', None):
try:
field.add_to_form(form)
except Exception as e:

View File

@ -141,7 +141,7 @@ class UserFieldsDirectory(FieldsDirectory):
section = 'settings'
field_def_page_class = UserFieldDefPage
support_import = False
blacklisted_types = ['page']
blacklisted_types = ['page', 'computed']
field_var_prefix = '..._user_var_'
def index_bottom(self):

View File

@ -1026,7 +1026,7 @@ class WorkflowVariablesFieldsDirectory(FieldsDirectory):
section = 'workflows'
field_def_page_class = WorkflowVariablesFieldDefPage
support_import = False
blacklisted_types = ['page', 'blocks']
blacklisted_types = ['page', 'blocks', 'computed']
field_var_prefix = 'form_option_'
readonly_message = N_('This workflow is readonly.')
@ -1048,7 +1048,7 @@ class WorkflowBackofficeFieldsDirectory(FieldsDirectory):
section = 'workflows'
field_def_page_class = WorkflowBackofficeFieldDefPage
support_import = False
blacklisted_types = ['page', 'blocks']
blacklisted_types = ['page', 'blocks', 'computed']
blacklisted_attributes = ['condition']
field_var_prefix = 'form_var_'
readonly_message = N_('This workflow is readonly.')

View File

@ -3389,6 +3389,58 @@ class BlockField(WidgetField):
return node
class ComputedField(Field):
key = 'computed'
description = _('Computed Data')
value_template = None
freeze_on_initial_value = False
add_to_form = None
add_to_view_form = None
get_opendocument_node_value = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not self.varname:
self.varname = misc.simplify(self.label, space='_')
def get_admin_attributes(self):
attributes = super().get_admin_attributes()
attributes.remove('condition')
return attributes + ['varname', 'value_template', 'freeze_on_initial_value']
def fill_admin_form(self, form):
form.add(StringWidget, 'label', title=_('Label'), value=self.label, required=True, size=50)
form.add(
VarnameWidget,
'varname',
title=_('Identifier'),
required=True,
value=self.varname,
size=30,
hint=_('This is used as suffix for variable names.'),
)
form.add(
StringWidget,
'value_template',
title=_('Value Template'),
required=True,
size=80,
value=self.value_template,
validation_function=ComputedExpressionWidget.validate_template,
)
form.add(
CheckboxWidget,
'freeze_on_initial_value',
title=_('Freeze on initial value'),
value=self.freeze_on_initial_value,
)
register_field_class(ComputedField)
def get_field_class_by_type(type):
for k in field_classes:
if k.key == type:
@ -3407,6 +3459,8 @@ def get_field_types():
def get_field_options(blacklisted_types):
widgets, non_widgets = [], []
for klass in field_classes:
if klass is ComputedField:
continue
if klass.key in blacklisted_types:
continue
if issubclass(klass, WidgetField):
@ -3414,6 +3468,10 @@ def get_field_options(blacklisted_types):
else:
non_widgets.append((klass.key, _(klass.description), klass.key))
options = widgets + [('', '', '')] + non_widgets
# add computed field in its own "section"
options.extend([('', '', ''), (ComputedField.key, _(ComputedField.description), ComputedField.key)])
if get_publisher().has_site_option('fields-blocks') and (
not blacklisted_types or 'blocks' not in blacklisted_types
):

View File

@ -612,6 +612,20 @@ class FormDef(StorableObject):
)
return form
def get_computed_fields_from_page(self, page):
on_page = page is None
for field in self.fields:
if field.key == 'page':
if on_page:
break
if page.id == field.id:
on_page = True
continue
if not on_page:
continue
if field.key == 'computed':
yield field
def add_fields_to_form(
self,
form,
@ -641,6 +655,8 @@ class FormDef(StorableObject):
value = None
if form_data:
value = form_data.get(field.id)
if not field.add_to_form:
continue
widget = field.add_to_form(form, value)
widget.is_hidden = not (visible)
widget.field = field
@ -710,6 +726,8 @@ class FormDef(StorableObject):
for field in page['fields']:
value = dict.get(field.id)
if not field.add_to_view_form:
continue
if not field.include_in_validation_page:
form.widgets.append(HtmlWidget(htmltext('<div style="display: none;">')))
field.add_to_view_form(form, value)
@ -1297,7 +1315,14 @@ class FormDef(StorableObject):
data = formdata.data
for field in self.fields:
if isinstance(
field, (fields.SubtitleField, fields.TitleField, fields.CommentField, fields.PageField)
field,
(
fields.SubtitleField,
fields.TitleField,
fields.CommentField,
fields.PageField,
fields.ComputedField,
),
):
continue
if data is None:

View File

@ -310,6 +310,17 @@ class FormStatusPage(Directory, FormTemplateMixin):
form_data['draft_formdata_id'] = filled.id
form_data['page_no'] = filled.page_no
session.add_magictoken(magictoken, form_data)
# restore computed fields data
computed_data = {}
for field in self.formdef.fields:
if field.key != 'computed':
continue
if field.id in form_data:
computed_data[field.id] = form_data[field.id]
if computed_data:
session.add_magictoken('%s-computed' % magictoken, computed_data)
return redirect('../?mt=%s' % magictoken)
def get_workflow_form(self, user):

View File

@ -41,11 +41,12 @@ from wcs.qommon.admin.texts import TextsDirectory
from wcs.qommon.form import get_selection_error_text
from wcs.roles import logged_users_role
from wcs.variables import LazyFormDef
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowStatusItem
from ..qommon import N_, _, emails, errors, get_cfg, get_logger, misc, template
from ..qommon.admin.emails import EmailsDirectory
from ..qommon.form import CheckboxWidget, EmailWidget, Form, HiddenErrorWidget, HtmlWidget, StringWidget
from ..qommon.template import TemplateError
class SubmittedDraftException(Exception):
@ -320,6 +321,8 @@ class FormPage(Directory, FormTemplateMixin):
from wcs.blocks import BlockSubWidget
for field in fields:
if field.key == 'computed':
continue
field_key = '%s' % field.id
widget = form.get_widget('f%s' % field_key) if form else None
yield field, field_key, widget, None, None
@ -458,8 +461,25 @@ class FormPage(Directory, FormTemplateMixin):
session = get_session()
magictoken = get_request().form.get('magictoken')
if page and self.pages.index(page) > 0:
magictoken = get_request().form['magictoken']
self.feed_current_data(magictoken)
has_new_magictoken = False
if magictoken:
form_data = session.get_by_magictoken(magictoken, {})
else:
form_data = {}
if page == self.pages[0] and 'magictoken' not in get_request().form:
magictoken = randbytes(8)
has_new_magictoken = True
computed_data = self.handle_computed_fields(
magictoken, self.formdef.get_computed_fields_from_page(page)
)
if computed_data:
form_data.update(computed_data)
self.feed_current_data(magictoken)
with get_publisher().substitutions.temporary_feed(transient_formdata, force_mode='lazy'):
@ -478,18 +498,9 @@ class FormPage(Directory, FormTemplateMixin):
form.action = self.action_url
# include a data-has-draft attribute on the <form> element when a draft
# already exists for the form; this will activate the autosave.
magictoken = get_request().form.get('magictoken')
if magictoken:
form_data = session.get_by_magictoken(magictoken, {})
if self.has_draft_support():
form.attrs['data-has-draft'] = 'yes'
else:
form_data = {}
if not has_new_magictoken and self.has_draft_support():
form.attrs['data-has-draft'] = 'yes'
if page == self.pages[0] and 'magictoken' not in get_request().form:
magictoken = randbytes(8)
else:
magictoken = get_request().form['magictoken']
form.add_hidden('magictoken', magictoken)
data = session.get_by_magictoken(magictoken, {})
@ -530,7 +541,7 @@ class FormPage(Directory, FormTemplateMixin):
if had_prefill:
# include prefilled data
transient_formdata = self.get_transient_formdata()
transient_formdata = self.get_transient_formdata(magictoken)
transient_formdata.data.update(self.formdef.get_data(form))
if self.has_draft_support() and not (req.is_from_application() or req.is_from_bot()):
# save to get prefilling data in database
@ -620,6 +631,26 @@ class FormPage(Directory, FormTemplateMixin):
templates=list(self.get_formdef_template_variants(self.filling_templates)), context=context
)
def handle_computed_fields(self, magictoken, fields):
fields = [x for x in fields if x.key == 'computed' and x.value_template]
computed_values = get_session().get_by_magictoken('%s-computed' % magictoken, {})
if not fields:
return computed_values
if not computed_values:
get_session().add_magictoken('%s-computed' % magictoken, computed_values)
for field in fields:
if field.freeze_on_initial_value and field.id in computed_values:
continue
with get_publisher().complex_data():
try:
value = WorkflowStatusItem.compute(field.value_template, raises=True, allow_complex=True)
except TemplateError:
continue
else:
value = get_publisher().get_cached_complex_data(value)
computed_values[field.id] = value
return computed_values
def modify_filling_context(self, context, page, data):
pass
@ -687,6 +718,8 @@ class FormPage(Directory, FormTemplateMixin):
formdata.user = get_request().user
formdata.data = get_session().get_by_magictoken(magictoken, {})
formdata.prefilling_data = formdata.data.get('prefilling_data', {})
computed_values = get_session().get_by_magictoken('%s-computed' % magictoken) or {}
formdata.data.update(computed_values)
if self.edit_mode:
if magictoken is None:
@ -886,14 +919,17 @@ class FormPage(Directory, FormTemplateMixin):
if 'mt' in get_request().form:
magictoken = get_request().form['mt']
data = session.get_by_magictoken(magictoken, {})
computed_values = session.get_by_magictoken('%s-computed' % magictoken, {})
if not get_request().is_in_backoffice():
# don't remove magictoken as the backoffice agent may get
# the page reloaded.
session.remove_magictoken(magictoken)
if data:
if data or computed_values:
# create a new one since the other has been exposed in a url
magictoken = randbytes(8)
session.add_magictoken(magictoken, data)
session.add_magictoken(magictoken, data or {})
session.add_magictoken('%s-computed' % magictoken, computed_values)
get_request().form['magictoken'] = magictoken
self.feed_current_data(magictoken)
if 'page_no' in data and int(data['page_no']) != 0:
@ -985,10 +1021,12 @@ class FormPage(Directory, FormTemplateMixin):
# for templates referencing fields from the sampe page.
self.reset_locked_data(form)
data = self.formdef.get_data(form)
computed_data = self.handle_computed_fields(magictoken, submitted_fields)
form_data.update(data)
if self.has_draft_support() and form.get_submit() == 'savedraft':
form_data.update(computed_data)
filled = self.save_draft(form_data, page_no)
return redirect(filled.get_url())
@ -1005,6 +1043,7 @@ class FormPage(Directory, FormTemplateMixin):
form_data = copy.copy(session.get_by_magictoken(magictoken, {}))
data = self.formdef.get_data(form)
form_data.update(data)
form_data.update(computed_data)
for i, post_condition in enumerate(post_conditions):
condition = post_condition.get('condition')
error_message = post_condition.get('error_message')
@ -1042,6 +1081,7 @@ class FormPage(Directory, FormTemplateMixin):
with get_publisher().substitutions.temporary_feed(transient_formdata, force_mode='lazy'):
data = self.formdef.get_data(form)
form_data.update(data)
form_data.update(computed_data)
session.add_magictoken(magictoken, form_data)
@ -1363,6 +1403,9 @@ class FormPage(Directory, FormTemplateMixin):
filled.just_created()
filled.data = self.formdef.get_data(form)
magictoken = get_request().form['magictoken']
computed_values = get_session().get_by_magictoken('%s-computed' % magictoken, {})
filled.data.update(computed_values)
session = get_session()
filled.user = get_request().user

View File

@ -80,6 +80,8 @@ SQL_TYPE_MAPPING = {
'password': 'text[][]',
# field block
'block': 'jsonb',
# computed data field
'computed': 'jsonb',
}
@ -1694,6 +1696,11 @@ class SqlMixin:
if field.key in ('ranked-items', 'password'):
# turn {'poire': 2, 'abricot': 1, 'pomme': 3} into an array
value = [[force_str(x), force_str(y)] for x, y in value.items()]
elif field.key == 'computed':
if value is not None:
# embed value in a dict, so it's never necessary to cast the
# value for postgresql
value = {'data': value, '@type': 'computed-data'}
elif sql_type == 'varchar':
assert isinstance(value, str)
elif sql_type == 'date':
@ -1750,11 +1757,14 @@ class SqlMixin:
for fmt, val in value:
d[fmt] = force_str(val)
value = d
elif field.key == 'computed':
if isinstance(value, dict) and value.get('@type') == 'computed-data':
value = value.get('data')
if sql_type == 'date':
value = value.timetuple()
elif sql_type == 'bytea':
value = pickle_loads(value)
elif sql_type == 'jsonb' and value.get('schema'):
elif sql_type == 'jsonb' and isinstance(value, dict) and value.get('schema'):
# block field, adapt date/field values
for field_id, field_type in value.get('schema').items():
if field_type not in ('date', 'file'):

View File

@ -687,6 +687,7 @@ class LazyFormDataVar:
'file': LazyFieldVarFile,
'block': LazyFieldVarBlock,
'bool': LazyFieldVarBool,
'computed': LazyFieldVarComputed,
}.get(field.key, klass)
return klass(**self.get_field_kwargs(field))
@ -783,23 +784,10 @@ class LazyFieldVar:
raise AssertionError('lazy cannot be pickled')
class LazyFieldVarStructured(LazyFieldVar):
class LazyFieldVarComplex(LazyFieldVar):
def inspect_keys(self):
structured_value = self._field.get_structured_value(self._data)
if not structured_value:
if not self._data.get(self._field.id):
return []
return ['raw']
keys = ['raw', 'structured']
if self._field.data_source and self._field.data_source.get('type', '').startswith('carddef:'):
try:
self.live
except AttributeError:
# don't advertise "live" if linked data is missing
pass
else:
keys.append('live')
keys = []
structured_value = self.get_field_var_value()
def walk(base, value):
if isinstance(value, dict):
@ -817,6 +805,50 @@ class LazyFieldVarStructured(LazyFieldVar):
return keys
def __getitem__(self, key):
try:
return super().__getitem__(key)
except KeyError:
pass
structured_value = self.get_field_var_value()
if not structured_value:
raise KeyError(key)
if isinstance(structured_value, dict):
return structured_value[key]
if isinstance(structured_value, list):
for i, struct_value in enumerate(structured_value):
if str(key) == str(i):
return struct_value
raise KeyError(key)
class LazyFieldVarComputed(LazyFieldVarComplex):
def get_field_var_value(self):
return self.get_value()
class LazyFieldVarStructured(LazyFieldVarComplex):
def inspect_keys(self):
structured_value = self._field.get_structured_value(self._data)
if not structured_value:
if not self._data.get(self._field.id):
return []
return ['raw']
keys = ['raw', 'structured']
if self._field.data_source and self._field.data_source.get('type', '').startswith('carddef:'):
try:
self.live
except AttributeError:
# don't advertise "live" if linked data is missing
pass
else:
keys.append('live')
keys.extend(super().inspect_keys())
return keys
@property
def structured_raw(self):
# backward compatibility, _structured should be use.
@ -826,6 +858,9 @@ class LazyFieldVarStructured(LazyFieldVar):
def structured(self):
return self._field.get_structured_value(self._data)
def get_field_var_value(self):
return self.structured
@property
def live(self):
if not (self._field.data_source and self._field.data_source.get('type', '').startswith('carddef:')):
@ -857,22 +892,6 @@ class LazyFieldVarStructured(LazyFieldVar):
raise AttributeError('live')
return LazyFormData(carddata)
def __getitem__(self, key):
try:
return super().__getitem__(key)
except KeyError:
pass
structured_value = self._field.get_structured_value(self._data)
if not structured_value:
raise KeyError(key)
if isinstance(structured_value, dict):
return structured_value[key]
if isinstance(structured_value, list):
for i, struct_value in enumerate(structured_value):
if str(key) == str(i):
return struct_value
raise KeyError(key)
class DateOperatorsMixin:
def __eq__(self, other):

View File

@ -80,7 +80,7 @@ class WorkflowFormFieldDefPage(FieldDefPage):
class WorkflowFormFieldsDirectory(FieldsDirectory):
section = 'workflows'
support_import = False
blacklisted_types = ['page']
blacklisted_types = ['page', 'computed']
field_def_page_class = WorkflowFormFieldDefPage