wcs/wcs/testdef.py

306 lines
10 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2022 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import io
from contextlib import contextmanager
from django.core.handlers.wsgi import WSGIRequest
from quixote import get_publisher
from wcs import sql
from wcs.compat import CompatHTTPRequest
from wcs.fields import Field, PageField
from wcs.qommon.form import FileWithPreviewWidget, Form, get_selection_error_text
from wcs.qommon.template import TemplateError
from wcs.workflows import WorkflowStatusItem
from .qommon import _, misc
from .qommon.storage import Equal
class TestError(Exception):
    """Failure raised while running a test; the message describes what went wrong."""
class TestDef(sql.TestDef):
    """Recorded test case attached to a form or card definition.

    A test keeps the field values (and the user, if any) exported from an
    existing formdata so that they can be replayed against the definition
    with ``run()``.
    """

    # NOTE(review): presumably the storage name expected by sql.TestDef — confirm.
    _names = 'testdef'

    name = ''  # label of the test; the slug is derived from it in store()
    slug = None  # generated on first store() when empty
    object_type = None  # (formdef, carddef, etc.)
    object_id = None  # id of the definition the test belongs to
    data = None  # (json export of formdata, carddata, etc.), dict with 'fields' and 'user' keys

    def __str__(self):
        """Return the test name."""
        return self.name
def store(self, *args, comment=None, snapshot_store_user=True, **kwargs):
    """Save the test, generating a slug from its name when it has none.

    The slug is made unique among the tests sharing the same
    ``object_type`` and ``object_id`` by appending ``-2``, ``-3``, ... to
    the simplified name.

    ``comment`` and ``snapshot_store_user`` are accepted but not forwarded
    to the parent ``store()``; only ``*args`` and ``**kwargs`` are.
    """
    if not self.slug:
        siblings = self.select(
            [Equal('object_type', self.object_type), Equal('object_id', self.object_id)]
        )
        taken_slugs = {sibling.slug for sibling in siblings}

        stem = misc.simplify(self.name)
        candidate = stem
        suffix = 2
        while candidate in taken_slugs:
            candidate = '%s-%s' % (stem, suffix)
            suffix += 1
        self.slug = candidate

    super().store(*args, **kwargs)
@classmethod
def create_from_formdata(cls, formdef, formdata):
    """Build a new (unsaved) test from an existing formdata.

    Values are taken for every field of *formdef* that has an entry in
    ``formdata.data``; they go through the field's ``get_json_value()``
    when the field provides one. The formdata user, if any, is exported
    with ``get_json_export_dict()``.
    """
    testdef = cls()
    testdef.object_type = formdef.get_table_name()
    testdef.object_id = formdef.id

    exported_fields = {}
    for field in formdef.fields:
        if field.id not in formdata.data:
            continue
        raw_value = formdata.data[field.id]
        exported_fields[field.id] = (
            field.get_json_value(raw_value) if hasattr(field, 'get_json_value') else raw_value
        )

    if formdata.user:
        user_export = formdata.user.get_json_export_dict()
    else:
        user_export = None

    testdef.data = {'fields': exported_fields, 'user': user_export}
    return testdef
def build_formdata(self, objectdef, include_fields=False):
    """Create a fresh formdata of *objectdef* from the recorded test data.

    The recorded user, if any, is always restored. Field values are only
    restored (through each field's ``from_json_value()``) when
    *include_fields* is true.
    """
    formdata = objectdef.data_class()()

    user_export = self.data['user']
    if user_export:
        formdata.set_user_from_json(user_export)

    if not include_fields:
        return formdata

    restored = {}
    for field in objectdef.fields:
        if field.id in self.data['fields']:
            restored[field.id] = field.from_json_value(self.data['fields'][field.id])
    formdata.data = restored
    return formdata
@contextmanager
def fake_request(self):
    """Temporarily replace the publisher request with an empty POST request.

    The request that was active on entry is put back when the context
    exits, even if the body raised.
    """
    true_request = get_publisher().get_request()
    # wsgi.input is a stream of bytes (PEP 3333), hence BytesIO and not StringIO
    wsgi_request = WSGIRequest({'REQUEST_METHOD': 'POST', 'wsgi.input': io.BytesIO()})
    fake_request = CompatHTTPRequest(wsgi_request)
    try:
        get_publisher()._set_request(fake_request)
        yield
    finally:
        get_publisher()._set_request(true_request)
def run(self, objectdef):
    """Run the test against *objectdef* inside the ``fake_request()`` context."""
    request_swap = self.fake_request()
    with request_swap:
        self._run(objectdef)
def _run(self, objectdef):
    """Replay the recorded values on *objectdef*, one page after the other.

    Fields are grouped under the page field that precedes them (a
    definition without any page is handled as a single implicit page).
    For every visible page the fields are filled and validated, then the
    page post conditions are evaluated. Hidden pages are skipped entirely.

    Raises:
        TestError: if the test has data for a field located on a hidden
            page, or if filling a page or evaluating its post conditions
            fails.
    """
    formdata = self.build_formdata(objectdef)

    # conditions and templates are evaluated against these variables
    get_publisher().substitutions.reset()
    get_publisher().substitutions.feed(get_publisher())
    get_publisher().substitutions.feed(objectdef)
    get_publisher().substitutions.feed(formdata)

    self.form = Form(action='#')

    fields = []
    fields_by_page = {}
    for field in objectdef.fields:
        if field.type == 'page':
            fields = fields_by_page[field] = []
            continue
        fields.append(field)

    if not fields_by_page:  # form without pages
        fields_by_page[PageField()] = fields

    for i, (page, fields) in enumerate(fields_by_page.items(), 1):
        page.index = i

        if page and not page.is_visible(formdata.data, objectdef):
            fields_with_data = [
                field for field in fields if self.data['fields'].get(field.id) is not None
            ]
            if fields_with_data:
                raise TestError(
                    _('Tried to fill field "%(label)s" on page %(no)d but page was not shown.')
                    % {'label': fields_with_data[0].label, 'no': page.index}
                )
            # a hidden page is never displayed: its fields must not be
            # validated (required fields would be reported as empty) and
            # its post conditions must not be evaluated.
            continue

        self.fill_page_fields(fields, page, formdata, objectdef)
        # post conditions are checked once the page has been filled
        self.evaluate_page_conditions(page, formdata, objectdef)
def fill_page_fields(self, fields, page, formdata, objectdef):
    """Fill *formdata* with the recorded values of the fields of one page.

    Computed fields are evaluated before and after (the second time
    skipping frozen ones). Every other visible field gets its recorded
    value validated through its widget and stored in ``formdata.data``.

    Raises:
        TestError: if a value is recorded for a hidden field, or if widget
            validation fails.
    """
    # field kinds that are never filled from the recorded data
    skipped_keys = (
        'subtitle',
        'title',
        'comment',
        'computed',
        'table',
        'table-select',
        'tablerows',
        'ranked-items',
    )

    self.handle_computed_fields(fields, formdata)

    for field in fields:
        if field.key in skipped_keys:
            continue

        shown = field.is_visible(formdata.data, objectdef)
        recorded_value = self.data['fields'].get(field.id)

        if not shown:
            if recorded_value is not None:
                raise TestError(
                    _('Tried to fill field "%(label)s" on page %(no)d but it is hidden.')
                    % {'label': field.label, 'no': page.index}
                )
            continue

        value = field.from_json_value(recorded_value)
        self.run_widget_validation(field, value)
        field.set_value(formdata.data, value)

        # following conditions must see the value that was just set
        get_publisher().substitutions.invalidate_cache()

    self.handle_computed_fields(fields, formdata, exclude_frozen=True)
def evaluate_page_conditions(self, page, formdata, objectdef):
    """Check every post condition of *page* against the current data.

    Raises:
        TestError: if a post condition is not met, or if evaluating it
            raised a RuntimeError.
    """
    for post_condition in page.post_conditions or []:
        condition = post_condition.get('condition', {})
        try:
            is_met = Field.evaluate_condition(formdata.data, objectdef, condition)
        except RuntimeError:
            raise TestError(_('Failed to evaluate page %d post condition.') % page.index)
        if is_met:
            continue
        raise TestError(
            _('Page %(no)d post condition was not met (%(condition)s).')
            % {'no': page.index, 'condition': condition.get('value')}
        )
def run_widget_validation(self, field, value):
    """Validate *value* for *field* by running it through the field widget.

    The widget is added to ``self.form``, given the value, and parsed
    against the current request.

    Raises:
        TestError: if the widget (or one of its subwidgets) reports an
            error; the message names the field and the offending value.
    """
    widget = field.add_to_form(self.form)
    widget.set_value(value)
    widget.transfer_form_value(get_publisher().get_request())

    if isinstance(widget, FileWithPreviewWidget):
        widget.get_value_from_token = False
        # NOTE(review): this assignment is assumed to belong to the file
        # widget branch (original indentation was lost) — confirm.
        widget.value = value

    # force a new parsing so that validation runs on the transferred value
    widget._parsed = False
    widget.parse()

    # set better error for item fields where an invalid choice is selected
    if field.type == 'item' and value and widget.value != value:
        widget.set_error(get_selection_error_text())
    elif field.type == 'items' and value and widget.value != value:
        # widget.value may be None when no choice was accepted; keep the
        # rejected choices in their original order, without duplicates.
        accepted = set(widget.value or [])
        rejected = dict.fromkeys(str(choice) for choice in value if choice not in accepted)
        value = ','.join(rejected)
        widget.set_error(get_selection_error_text())

    if widget.has_error():
        field_label = _('"%s"') % field.label

        if not widget.error:
            # error is on a subwidget, report it with the subfield details
            subwidget = self.get_error_subwidget(widget)
            if subwidget:
                widget = subwidget
                value = widget.value
                field = widget.field
                field_label = _('"%(subfield)s" (of field %(field)s)') % {
                    'subfield': field.label,
                    'field': field_label,
                }

        if field.convert_value_to_str:
            value = field.convert_value_to_str(value)

        # wrap in a tuple so that tuple-like values are formatted as a whole
        error_msg = _('Invalid value "%s"') % (value,) if value else _('Empty value')
        raise TestError(
            _('%(error)s for field %(label)s: %(details)s.')
            % {
                'error': error_msg,
                'label': field_label,
                'details': widget.error,
            }
        )
def handle_computed_fields(self, fields, formdata, exclude_frozen=False):
    """Evaluate the computed fields of *fields* and store results in *formdata*.

    Fields whose template raises TemplateError are left untouched. String
    results longer than 10000 characters are stored as None. With
    *exclude_frozen*, fields having ``freeze_on_initial_value`` set are
    skipped.
    """
    for field in fields:
        is_computed = field.key == 'computed'
        if not is_computed or (exclude_frozen and field.freeze_on_initial_value):
            continue

        with get_publisher().complex_data():
            try:
                computed = WorkflowStatusItem.compute(field.value_template, raises=True, allow_complex=True)
            except TemplateError:
                continue
            computed = get_publisher().get_cached_complex_data(computed)

        too_long = isinstance(computed, str) and len(computed) > 10000
        formdata.data[field.id] = None if too_long else computed

        # later templates and conditions must see the new value
        get_publisher().substitutions.invalidate_cache()
def get_error_subwidget(self, widget):
    """Return the first subwidget of *widget* carrying an error, or None.

    Subwidgets are visited in order; those exposing ``get_widgets`` are
    searched recursively.
    """
    for child in widget.get_widgets():
        if child.error:
            return child
        if not hasattr(child, 'get_widgets'):
            continue
        nested = self.get_error_subwidget(child)
        if nested:
            return nested
    return None
def export_to_json(self):
    """Return the test as a dict holding its name, slug, target and data."""
    exported_attributes = ('name', 'slug', 'object_type', 'object_id', 'data')
    return {attribute: getattr(self, attribute) for attribute in exported_attributes}
@classmethod
def import_from_json(cls, data):
    """Create or update a test from a dict produced by ``export_to_json()``.

    An existing test with the same ``object_type``, ``object_id`` and
    ``slug`` is updated; otherwise a new one is created. The test is
    stored before being returned.

    Only the attributes written by ``export_to_json()`` are read from
    *data*; any other key is ignored so that an imported file cannot set
    arbitrary attributes (``id``, methods, ...) on the stored object.

    Raises:
        KeyError: if *data* lacks ``object_type``, ``object_id`` or ``slug``.
    """
    testdefs = cls.select(
        [
            Equal('object_type', data['object_type']),
            Equal('object_id', data['object_id']),
            Equal('slug', data['slug']),
        ]
    )
    testdef = testdefs[0] if testdefs else cls()

    # keep in sync with the keys emitted by export_to_json()
    for key in ('name', 'slug', 'object_type', 'object_id', 'data'):
        if key in data:
            setattr(testdef, key, data[key])

    testdef.store()
    return testdef