wcs/wcs/blocks.py

321 lines
11 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2020 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import uuid
import time
import xml.etree.ElementTree as ET
from quixote import get_request
from .qommon import _, N_, misc
from .qommon.form import CompositeWidget, WidgetList
from .qommon.storage import StorableObject
from .qommon.template import Template
from . import data_sources
from . import fields
class BlockdefImportError(Exception):
def __init__(self, msg, details=None):
self.msg = msg
self.details = details
class BlockDef(StorableObject):
_names = 'blockdefs'
_indexes = ['slug']
xml_root_node = 'block'
name = None
slug = None
fields = None
digest_template = None
last_modification_time = None
last_modification_user_id = None
# declarations for serialization
TEXT_ATTRIBUTES = ['name', 'slug', 'digest_template']
def __init__(self, name=None, **kwargs):
super().__init__(**kwargs)
self.name = name
self.fields = []
def store(self):
if self.slug is None:
# set slug if it's not yet there
self.slug = self.get_new_slug()
self.last_modification_time = time.localtime()
if get_request() and get_request().user:
self.last_modification_user_id = str(get_request().user.id)
else:
self.last_modification_user_id = None
super().store()
def get_new_slug(self):
new_slug = misc.simplify(self.name, space='_')
base_new_slug = new_slug
suffix_no = 0
while True:
try:
obj = self.get_on_index(new_slug, 'slug', ignore_migration=True)
except KeyError:
break
if obj.id == self.id:
break
suffix_no += 1
new_slug = '%s_%s' % (base_new_slug, suffix_no)
return new_slug
def get_new_field_id(self):
return 'bf%s' % str(uuid.uuid4())
def get_display_value(self, value):
if not self.digest_template:
return self.name
from .qommon.substitution import CompatibilityNamesDict
from .variables import LazyBlockDataVar
context = CompatibilityNamesDict({self.slug + '_var': LazyBlockDataVar(self.fields, value)})
return Template(self.digest_template, autoescape=False).render(context)
def export_to_xml(self, include_id=False):
root = ET.Element(self.xml_root_node)
if include_id and self.id:
root.attrib['id'] = str(self.id)
for text_attribute in list(self.TEXT_ATTRIBUTES):
if not hasattr(self, text_attribute) or not getattr(self, text_attribute):
continue
ET.SubElement(root, text_attribute).text = getattr(self, text_attribute)
if self.last_modification_time:
elem = ET.SubElement(root, 'last_modification')
elem.text = time.strftime('%Y-%m-%d %H:%M:%S', self.last_modification_time)
if include_id:
elem.attrib['user_id'] = str(self.last_modification_user_id)
fields = ET.SubElement(root, 'fields')
for field in self.fields or []:
fields.append(field.export_to_xml(charset='utf-8', include_id=True))
return root
@classmethod
def import_from_xml(cls, fd, include_id=False, check_datasources=True):
try:
tree = ET.parse(fd)
except:
raise ValueError()
blockdef = cls.import_from_xml_tree(tree, include_id=include_id)
if blockdef.slug:
try:
cls.get_on_index(blockdef.slug, 'slug', ignore_migration=True)
except KeyError:
pass
else:
blockdef.slug = blockdef.get_new_slug()
if check_datasources:
# check if datasources are defined
unknown_datasources = set()
for field in blockdef.fields:
data_source = getattr(field, 'data_source', None)
if data_source:
if isinstance(data_sources.get_object(data_source), data_sources.StubNamedDataSource):
unknown_datasources.add(data_source.get('type'))
if unknown_datasources:
raise BlockdefImportError(
N_('Unknown datasources'), details=', '.join(sorted(unknown_datasources))
)
return blockdef
@classmethod
def import_from_xml_tree(cls, tree, include_id=False):
charset = 'utf-8'
blockdef = cls()
if tree.find('name') is None or not tree.find('name').text:
raise BlockdefImportError(N_('Missing name'))
# if the tree we get is actually a ElementTree for real, we get its
# root element and go on happily.
if not ET.iselement(tree):
tree = tree.getroot()
if tree.tag != cls.xml_root_node:
raise BlockdefImportError(N_('Unexpected root node'))
if include_id and tree.attrib.get('id'):
blockdef.id = tree.attrib.get('id')
for text_attribute in list(cls.TEXT_ATTRIBUTES):
value = tree.find(text_attribute)
if value is None or value.text is None:
continue
setattr(blockdef, text_attribute, misc.xml_node_text(value))
blockdef.fields = []
for i, field in enumerate(tree.find('fields')):
try:
field_o = fields.get_field_class_by_type(field.findtext('type'))()
except KeyError:
raise BlockdefImportError(N_('Unknown field type'), details=field.findtext('type'))
field_o.init_with_xml(field, charset, include_id=True)
blockdef.fields.append(field_o)
if tree.find('last_modification') is not None:
node = tree.find('last_modification')
blockdef.last_modification_time = time.strptime(node.text, '%Y-%m-%d %H:%M:%S')
if include_id and node.attrib.get('user_id'):
blockdef.last_modification_user_id = node.attrib.get('user_id')
return blockdef
def get_usage_formdefs(self):
from wcs.formdef import get_formdefs_of_all_kinds
block_identifier = 'block:%s' % self.slug
for formdef in get_formdefs_of_all_kinds():
for field in formdef.fields:
if field.type == block_identifier:
yield formdef
break
def is_used(self):
return any(self.get_usage_formdefs())
class BlockSubWidget(CompositeWidget):
def __init__(self, name, value=None, *args, **kwargs):
self.block = kwargs.pop('block')
self.readonly = kwargs.get('readonly')
super().__init__(name, value, *args, **kwargs)
for field in self.block.fields:
if 'readonly' in kwargs:
field.add_to_view_form(form=self)
else:
field.add_to_form(form=self)
if value:
self.set_value(value)
def set_value(self, value):
for widget in self.get_widgets():
widget.set_value(value.get(widget.field.id))
def get_field_data(self, field, widget):
from wcs.formdef import FormDef
return FormDef.get_field_data(field, widget)
def _parse(self, request):
value = {}
empty = True
for widget in self.get_widgets():
widget_value = self.get_field_data(widget.field, widget)
value.update(widget_value)
if widget_value.get(widget.field.id) is not None:
empty = False
if empty:
value = None
self.value = value
def add_media(self):
for widget in self.get_widgets():
if hasattr(widget, 'add_media'):
widget.add_media()
class BlockWidget(WidgetList):
def __init__(
self, name, value=None, title=None, block=None, max_items=None, add_element_label=None, **kwargs
):
self.block = block
self.readonly = kwargs.get('readonly')
element_values = None
if value:
element_values = value.get('data')
if not max_items:
max_items = 1
element_kwargs = {'block': self.block, 'render_br': False}
element_kwargs.update(kwargs)
super().__init__(
name,
value=element_values,
title=title,
max_items=max_items,
element_type=BlockSubWidget,
element_kwargs=element_kwargs,
add_element_label=add_element_label or _('Add another'),
**kwargs,
)
def set_value(self, value):
super().set_value(value['data'] if value else None)
self.value = value
def _parse(self, request):
# iterate over existing form keys to get actual list of elements.
# (maybe this could be moved to WidgetList)
prefix = '%s$element' % self.name
known_prefixes = {x.split('$', 2)[1] for x in request.form.keys() if x.startswith(prefix)}
for i in range(len(known_prefixes) - len(self.element_names)):
self.add_element()
super()._parse(request)
if self.value:
self.value = {'data': self.value}
# keep "schema" next to data, this allows custom behaviour for
# date fields (time.struct_time) when writing/reading from
# database in JSON.
self.value['schema'] = {x.id: x.key for x in self.block.fields}
def parse(self, request=None):
if not self._parsed:
self._parsed = True
if request is None:
request = get_request()
self._parse(request)
if self.required and self.value is None:
self.set_error(_(self.REQUIRED_ERROR))
return self.value
def add_media(self):
for widget in self.get_widgets():
if hasattr(widget, 'add_media'):
widget.add_media()
def get_error(self, request=None):
request = request or get_request()
if request.get_method() == 'POST':
self.parse(request=request)
return self.error
def has_error(self, request=None):
if self.get_error():
return True
# we know subwidgets have been parsed
has_error = False
for widget in self.widgets:
if widget.value is None:
continue
if widget.has_error():
has_error = True
return has_error