wcs/wcs/blocks.py

630 lines
23 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2020 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import collections
import itertools
import types
import uuid
import xml.etree.ElementTree as ET
from contextlib import contextmanager
from quixote import get_publisher, get_request, get_response
from quixote.html import htmltag, htmltext
from . import conditions, data_sources, fields
from .categories import BlockCategory
from .formdata import FormData
from .qommon import _, misc
from .qommon.errors import UnknownReferencedErrorMixin
from .qommon.form import CompositeWidget, SingleSelectHintWidget, WidgetList
from .qommon.storage import Equal, StorableObject
from .qommon.substitution import CompatibilityNamesDict
from .qommon.template import Template
from .qommon.xml_storage import PostConditionsXmlMixin
class BlockdefImportError(Exception):
def __init__(self, msg, msg_args=None, details=None):
self.msg = msg
self.msg_args = msg_args or ()
self.details = details
class BlockdefImportUnknownReferencedError(UnknownReferencedErrorMixin, BlockdefImportError):
pass
class BlockDef(StorableObject, PostConditionsXmlMixin):
_names = 'blockdefs'
_indexes = ['slug']
backoffice_class = 'wcs.admin.blocks.BlockDirectory'
xml_root_node = 'block'
verbose_name = _('Block of fields')
verbose_name_plural = _('Blocks of fields')
var_prefixes = ['block']
name = None
slug = None
fields = None
digest_template = None
category_id = None
documentation = None
post_conditions = None
SLUG_DASH = '_'
# declarations for serialization
TEXT_ATTRIBUTES = ['name', 'slug', 'digest_template', 'documentation']
def __init__(self, name=None, **kwargs):
super().__init__(**kwargs)
self.name = name
self.fields = []
@property
def category(self):
return BlockCategory.get(self.category_id, ignore_errors=True)
@category.setter
def category(self, category):
if category:
self.category_id = category.id
elif self.category_id:
self.category_id = None
def migrate(self):
changed = False
for f in self.fields or []:
changed |= f.migrate()
if changed:
self.store(comment=_('Automatic update'), snapshot_store_user=False)
def store(self, comment=None, snapshot_store_user=True, application=None, *args, **kwargs):
from wcs.carddef import CardDef
from wcs.formdef import FormDef
assert not self.is_readonly()
if self.slug is None:
# set slug if it's not yet there
self.slug = self.get_new_slug()
super().store(*args, **kwargs)
if get_publisher().snapshot_class:
get_publisher().snapshot_class.snap(
instance=self, comment=comment, store_user=snapshot_store_user, application=application
)
# update relations
for objdef in itertools.chain(
FormDef.select(ignore_errors=True, ignore_migration=True),
CardDef.select(ignore_errors=True, ignore_migration=True),
):
for field in objdef.get_all_fields():
if field.key == 'block' and field.block_slug == self.slug:
objdef.store()
if get_response():
from wcs.admin.tests import TestsAfterJob
context = _('in field block "%s"') % field.label
get_response().add_after_job(
TestsAfterJob(objdef, reason='%s (%s)' % (comment, context))
)
break
def get_new_field_id(self):
return 'bf%s' % str(uuid.uuid4())
def get_admin_url(self):
base_url = get_publisher().get_backoffice_url()
return '%s/forms/blocks/%s/' % (base_url, self.id)
def get_field_admin_url(self, field):
return self.get_admin_url() + '%s/' % field.id
def get_widget_fields(self):
return [field for field in self.fields or [] if isinstance(field, fields.WidgetField)]
def get_display_value(self, value):
if not self.digest_template:
return self.name
from .variables import LazyBlockDataVar
context = CompatibilityNamesDict({'block_var': LazyBlockDataVar(self.fields, value)})
# for backward compatibility it is also possible to use <slug>_var_<whatever>
context[self.slug.replace('-', '_') + '_var'] = context['block_var']
return Template(self.digest_template, autoescape=False).render(context)
def get_substitution_counter_variables(self, index):
return CompatibilityNamesDict(
{
'block_counter': {
'index0': index,
'index': index + 1,
}
}
)
def get_dependencies(self):
yield self.category
for field in self.fields or []:
yield from field.get_dependencies()
def export_to_xml(self, include_id=False):
root = ET.Element(self.xml_root_node)
if include_id and self.id:
root.attrib['id'] = str(self.id)
for text_attribute in list(self.TEXT_ATTRIBUTES):
if not hasattr(self, text_attribute) or not getattr(self, text_attribute):
continue
ET.SubElement(root, text_attribute).text = getattr(self, text_attribute)
BlockCategory.object_category_xml_export(self, root, include_id=include_id)
fields = ET.SubElement(root, 'fields')
for field in self.fields or []:
fields.append(field.export_to_xml(include_id=True))
self.post_conditions_export_to_xml(root, include_id=include_id)
return root
@classmethod
def import_from_xml(cls, fd, include_id=False, check_datasources=True, check_deprecated=False):
try:
tree = ET.parse(fd)
except Exception:
raise ValueError()
blockdef = cls.import_from_xml_tree(
tree,
include_id=include_id,
check_datasources=check_datasources,
check_deprecated=check_deprecated,
)
if blockdef.slug:
try:
cls.get_on_index(blockdef.slug, 'slug', ignore_migration=True)
except KeyError:
pass
else:
blockdef.slug = blockdef.get_new_slug()
return blockdef
@classmethod
def import_from_xml_tree(
cls, tree, include_id=False, check_datasources=True, check_deprecated=False, **kwargs
):
from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan
blockdef = cls()
if tree.find('name') is None or not tree.find('name').text:
raise BlockdefImportError(_('Missing name'))
# if the tree we get is actually a ElementTree for real, we get its
# root element and go on happily.
if not ET.iselement(tree):
tree = tree.getroot()
if tree.tag != cls.xml_root_node:
raise BlockdefImportError(
_('Provided XML file is invalid, it starts with a <%(seen)s> tag instead of <%(expected)s>')
% {'seen': tree.tag, 'expected': cls.xml_root_node}
)
if include_id and tree.attrib.get('id'):
blockdef.id = tree.attrib.get('id')
for text_attribute in list(cls.TEXT_ATTRIBUTES):
value = tree.find(text_attribute)
if value is None or value.text is None:
continue
setattr(blockdef, text_attribute, misc.xml_node_text(value))
from wcs.fields.base import get_field_class_by_type
blockdef.fields = []
unknown_field_types = set()
for field in tree.find('fields'):
try:
field_o = get_field_class_by_type(field.findtext('type'))()
except KeyError:
field_type = field.findtext('type')
unknown_field_types.add(field_type)
continue
field_o.init_with_xml(field, include_id=True)
blockdef.fields.append(field_o)
BlockCategory.object_category_xml_import(blockdef, tree, include_id=include_id)
post_conditions_node = tree.find('post_conditions')
blockdef.post_conditions_init_with_xml(post_conditions_node, include_id=include_id)
unknown_datasources = set()
if check_datasources:
from wcs.carddef import CardDef
# check if datasources are defined
for field in blockdef.fields:
data_source = getattr(field, 'data_source', None)
if data_source:
data_source_id = data_source.get('type')
if isinstance(data_sources.get_object(data_source), data_sources.StubNamedDataSource):
unknown_datasources.add(data_source_id)
elif data_source_id and data_source_id.startswith('carddef:'):
parts = data_source_id.split(':')
# check if carddef exists
url_name = parts[1]
try:
CardDef.get_by_urlname(url_name)
except KeyError:
unknown_datasources.add(data_source_id)
continue
if len(parts) == 2 or parts[2] == '_with_user_filter':
continue
lookup_criterias = [
Equal('formdef_type', 'carddef'),
Equal('visibility', 'datasource'),
Equal('slug', parts[2]),
]
try:
get_publisher().custom_view_class.select(lookup_criterias)[0]
except IndexError:
unknown_datasources.add(data_source_id)
if unknown_field_types or unknown_datasources:
details = collections.defaultdict(set)
if unknown_field_types:
details[_('Unknown field types')].update(unknown_field_types)
if unknown_datasources:
details[_('Unknown datasources')].update(unknown_datasources)
raise BlockdefImportUnknownReferencedError(_('Unknown referenced objects'), details=details)
if check_deprecated:
# check for deprecated elements
job = DeprecationsScan()
try:
job.check_deprecated_elements_in_object(blockdef)
except DeprecatedElementsDetected as e:
raise BlockdefImportError(str(e))
return blockdef
def get_usage_fields(self):
from wcs.formdef import get_formdefs_of_all_kinds
for formdef in get_formdefs_of_all_kinds():
for field in formdef.fields:
if field.key == 'block' and field.block_slug == self.slug:
field.formdef = formdef
yield field
def get_usage_formdefs(self):
from wcs.formdef import get_formdefs_of_all_kinds
for formdef in get_formdefs_of_all_kinds():
for field in formdef.fields:
if field.key == 'block' and field.block_slug == self.slug:
yield formdef
break
def is_used(self):
return any(self.get_usage_formdefs())
@contextmanager
def visibility_context(self, value, row_index):
from .variables import LazyBlockDataVar
context = self.get_substitution_counter_variables(row_index)
context['block_var'] = LazyBlockDataVar(self.fields, value)
with get_publisher().substitutions.temporary_feed(context):
yield
def i18n_scan(self):
location = 'forms/blocks/%s/' % self.id
for field in self.fields or []:
yield from field.i18n_scan(base_location=location)
for post_condition in self.post_conditions or []:
yield location, None, post_condition.get('error_message')
def get_all_fields(self):
return self.fields
def data_class(self):
return types.ClassType('fake_formdata', (FormData,), {'_formdef': self})
class BlockSubWidget(CompositeWidget):
template_name = 'qommon/forms/widgets/block_sub.html'
def __init__(self, name, value=None, *args, **kwargs):
self.block = kwargs.pop('block')
self.readonly = kwargs.get('readonly')
self.remove_button = kwargs.pop('remove_button', False)
self.index = kwargs.pop('index', 0)
super().__init__(name, value, *args, **kwargs)
def add_to_form(field):
if 'readonly' in kwargs:
field_value = None
if value is not None:
field_value = value.get(field.id)
return field.add_to_view_form(form=self, value=field_value)
else:
widget = field.add_to_form(form=self)
if field.key in ['title', 'subtitle', 'comment']:
return widget
widget = self.get_widget('f%s' % field.id)
if widget:
widget.div_id = None
widget.prefill_attributes = field.get_prefill_attributes()
return widget
self.fields = {}
live_sources = []
live_condition_fields = {}
for field in self.block.fields:
context = self.block.get_substitution_counter_variables(self.index)
if field.key in ['title', 'subtitle', 'comment']:
with get_publisher().substitutions.temporary_feed(context):
widget = add_to_form(field)
else:
widget = add_to_form(field)
if field.condition:
varnames = field.get_condition_varnames(formdef=self.block)
live_sources.extend(varnames)
for varname in varnames:
if varname not in live_condition_fields:
live_condition_fields[varname] = []
live_condition_fields[varname].append(field)
field.widget = widget
self.fields[field.id] = widget
for field in self.block.fields:
if field.varname in live_sources:
field.widget.live_condition_source = True
field.widget.live_condition_fields = live_condition_fields[field.varname]
if value:
self.set_value(value)
self.set_visibility(value)
def set_visibility(self, value):
with self.block.visibility_context(value, self.index):
for field in self.block.fields:
widget = self.fields.get(field.id)
if not widget:
continue
visible = field.is_visible({}, formdef=None)
widget.is_hidden = not (visible)
def set_value(self, value):
self.value = value
for widget in self.get_widgets():
if hasattr(widget, 'set_value') and not getattr(widget, 'secondary', False):
widget.set_value(value.get(widget.field.id))
def get_field_data(self, field, widget):
from wcs.formdef import FormDef
return FormDef.get_field_data(field, widget)
def _parse(self, request):
value = {}
empty = True
all_lists = True
for widget in self.get_widgets():
if widget.field.key in ['title', 'subtitle', 'comment']:
continue
widget_value = self.get_field_data(widget.field, widget)
with self.block.visibility_context(value, self.index):
if not widget.field.is_visible({}, formdef=None):
widget.clear_error()
continue
value.update(widget_value)
empty_values = [None]
if (
widget.field.key == 'item'
and isinstance(widget, SingleSelectHintWidget)
and widget.separate_hint()
):
# <select> will have its first option automatically selected,
# do not consider it to mark the field as filled.
empty_values.append(widget.options[0][0])
else:
all_lists = False
if widget_value.get(widget.field.id) not in empty_values:
empty = False
if not empty and self.block.post_conditions:
error_messages = []
with self.block.visibility_context(value, self.index):
for i, post_condition in enumerate(self.block.post_conditions):
condition = post_condition.get('condition')
try:
if conditions.Condition(condition, record_errors=False).evaluate():
continue
except RuntimeError:
pass
error_message = post_condition.get('error_message')
error_messages.append(get_publisher().translate(error_message))
if error_messages:
self.set_error(' '.join(error_messages))
if empty and not all_lists and not get_publisher().keep_all_block_rows_mode:
value = None
for widget in self.get_widgets(): # reset "required" errors
if widget.error == self.REQUIRED_ERROR:
widget.clear_error()
self.value = value
def add_media(self):
for widget in self.get_widgets():
if hasattr(widget, 'add_media'):
widget.add_media()
class BlockWidget(WidgetList):
template_name = 'qommon/forms/widgets/block.html'
always_include_add_button = True
def __init__(
self,
name,
value=None,
title=None,
block=None,
default_items_count=None,
max_items=None,
add_element_label=None,
**kwargs,
):
self.block = block
self.readonly = kwargs.get('readonly')
self.label_display = kwargs.pop('label_display') or 'normal'
self.remove_button = kwargs.pop('remove_button', False)
element_values = None
if value:
element_values = value.get('data')
max_items = max_items or 1
default_items_count = min(default_items_count or 1, max_items)
hint = kwargs.pop('hint', None)
element_kwargs = {'block': self.block, 'render_br': False, 'remove_button': self.remove_button}
element_kwargs.update(kwargs)
super().__init__(
name,
value=element_values,
title=title,
default_items_count=default_items_count,
max_items=max_items,
element_type=BlockSubWidget,
element_kwargs=element_kwargs,
add_element_label=add_element_label or _('Add another'),
hint=hint,
**kwargs,
)
@property
def a11y_labelledby(self):
return bool(self.a11y_role)
@property
def a11y_role(self):
# don't mark block as a group if it has no label
if self.label_display != 'hidden':
return 'group'
return None
def set_value(self, value):
from .fields.block import BlockRowValue
if isinstance(value, BlockRowValue):
value = value.make_value(block=self.block, field=self.field, data={})
if isinstance(value, dict) and 'data' in value:
super().set_value(value['data'])
self.value = value
else:
self.value = None
def _parse(self, request):
# iterate over existing form keys to get actual list of elements.
# (maybe this could be moved to WidgetList)
prefix = '%s$element' % self.name
known_prefixes = {x.split('$', 2)[1] for x in request.form.keys() if x.startswith(prefix)}
for prefix in known_prefixes:
if prefix not in self.element_names:
self.add_element(element_name=prefix)
super()._parse(request)
if self.value:
self.value = {'data': self.value}
# keep "schema" next to data, this allows custom behaviour for
# date fields (time.struct_time) when writing/reading from
# database in JSON.
self.value['schema'] = {x.id: x.key for x in self.block.fields}
def unparse(self):
self._parsed = False
for widget in self.widgets:
widget._parsed = False
def parse(self, request=None):
if not self._parsed:
self._parsed = True
if request is None:
request = get_request()
self._parse(request)
if self.required and self.value is None:
self.set_error(_(self.REQUIRED_ERROR))
for widget in self.widgets:
# mark required rows with a special attribute, to avoid doubling the
# error messages in the template.
widget.is_required_error = bool(widget.error == self.REQUIRED_ERROR)
return self.value
def add_media(self):
for widget in self.get_widgets():
if hasattr(widget, 'add_media'):
widget.add_media()
def get_error(self, request=None):
request = request or get_request()
if request.get_method() == 'POST':
self.parse(request=request)
return self.error
def has_error(self, request=None):
if self.get_error():
return True
# we know subwidgets have been parsed
has_error = False
for widget in self.widgets:
if widget.value is None:
continue
if widget.has_error():
has_error = True
return has_error
def render_title(self, title):
attrs = {'id': 'form_label_%s' % self.get_name_for_id()}
if not title or self.label_display == 'hidden':
# add a tag even if there's no label to display as it's used as an anchor point
# for links to errors.
return htmltag('div', **attrs) + htmltext('</div>')
if self.label_display == 'normal':
return super().render_title(title)
if self.required:
title += htmltext('<span title="%s" class="required">*</span>') % _('This field is required.')
hint = self.get_hint()
if hint:
attrs['aria-describedby'] = 'form_hint_%s' % self.name
title_tag = htmltag('h4', **attrs)
return title_tag + htmltext('%s</h4>') % title
def had_add_clicked(self):
add_widget = self.get_widget('add_element')
request = get_request()
request_form = getattr(request, 'orig_form', request.form)
return request_form.get(add_widget.name) if add_widget else False