# w.c.s. - web application for online forms # Copyright (C) 2005-2010 Entr'ouvert # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, see . import base64 import collections import contextlib import copy import datetime import decimal import glob import itertools import json import os import pickle import sys import time import uuid import xml.etree.ElementTree as ET from operator import itemgetter from django.utils.encoding import force_bytes, force_str from quixote import get_publisher, get_session from quixote.html import TemplateIO, htmltext from . import data_sources, fields from .categories import Category from .qommon import PICKLE_KWARGS, _, get_cfg, ngettext, pgettext_lazy from .qommon.admin.emails import EmailsDirectory from .qommon.afterjobs import AfterJob from .qommon.cron import CronJob from .qommon.errors import UnknownReferencedErrorMixin from .qommon.form import Form, HtmlWidget from .qommon.misc import ( JSONEncoder, get_as_datetime, get_dependencies_from_template, indent_xml, is_attachment, is_upload, simplify, xml_node_text, ) from .qommon.publisher import get_publisher_class from .qommon.storage import Equal, NotEqual, StorableObject, classonlymethod, fix_key from .qommon.substitution import Substitutions from .qommon.template import Template from .qommon.upload_storage import PicklableUpload from .roles import logged_users_role DRAFTS_DEFAULT_LIFESPAN = 100 # days DRAFTS_DEFAULT_MAX_PER_USER = 5 class FormdefImportError(Exception): def __init__(self, msg, msg_args=None, details=None): self.msg = msg self.msg_args = msg_args or () self.details = details class FormdefImportUnknownReferencedError(UnknownReferencedErrorMixin, FormdefImportError): pass class FormdefImportRecoverableError(FormdefImportError): pass class FormDefDoesNotExist(AttributeError): error_message = _('No such form: %s') def get_error_message(self): return self.error_message % self class FormField: # only used to unpickle form fields from older (<200603) versions def __setstate__(self, dict): type = dict['type'] self.real_field = fields.get_field_class_by_type(type)(**dict) def lax_int(s): try: return int(s) except (ValueError, TypeError): return -1 class FormDefForm(Form): ERROR_NOTICE = _('There were errors processing the form and you cannot go to the next page.') def __init__(self): super().__init__(enctype='multipart/form-data', use_tokens=False) self.attrs['data-warn-on-unsaved-content'] = 'true' def _render_error_notice_content(self, errors): t = TemplateIO(html=True) t += super()._render_error_notice_content(errors) widget_with_errors = [] for widget in self.get_all_widgets(): if hasattr(widget, 'field') and widget.has_error() and not getattr(widget, 'is_hidden', False): widget_with_errors.append(widget) if widget_with_errors: t += htmltext('') return t.getvalue() class FormDef(StorableObject): # noqa pylint: disable=too-many-public-methods _names = 'formdefs' _indexes = ['url_name'] _hashed_indexes = ['backoffice_submission_roles'] backoffice_class = 'wcs.admin.forms.FormDefPage' data_sql_prefix = 'formdata' pickle_module_name = 'formdef' xml_root_node = 'formdef' backoffice_section = 'forms' verbose_name = _('Form') verbose_name_plural = _('Forms') item_name = pgettext_lazy('item', 'form') item_name_plural = pgettext_lazy('item', 'forms') fields_count_total_soft_limit = 200 fields_count_total_hard_limit = 400 name = None description = None keywords = None url_name = None table_name = None # for SQL only fields = None category_id = None workflow_id = None workflow_options = None workflow_roles = None roles = None required_authentication_contexts = None backoffice_submission_roles = None discussion = False confirmation = True detailed_emails = True disabled = False only_allow_one = False enable_tracking_codes = False tracking_code_verify_fields = None disabled_redirection = None always_advertise = False publication_date = None expiration_date = None has_captcha = False skip_from_360_view = False management_sidebar_items = {'__default__'} submission_sidebar_items = {'__default__'} include_download_all_button = False appearance_keywords = None digest_templates = None lateral_template = None submission_lateral_template = None id_template = None drafts_lifespan = None drafts_max_per_user = None user_support = None submission_user_association = 'any' documentation = None geolocations = None history_pane_default_mode = 'expanded' sql_integrity_errors = None # store reverse relations reverse_relations = None # store fields in a separate pickle chunk lightweight = True # prefixes for formdata variables var_prefixes = ['form'] # users are allowed to access formdata where they're submitter. user_allowed_to_access_own_data = True submission_user_association_available_options = ['any', 'any-required', 'roles'] # declarations for serialization TEXT_ATTRIBUTES = [ 'name', 'url_name', 'description', 'keywords', 'publication_date', 'expiration_date', 'disabled_redirection', 'appearance_keywords', 'lateral_template', 'submission_lateral_template', 'id_template', 'drafts_lifespan', 'drafts_max_per_user', 'user_support', 'documentation', 'submission_user_association', ] BOOLEAN_ATTRIBUTES = [ 'discussion', 'detailed_emails', 'disabled', 'only_allow_one', 'enable_tracking_codes', 'confirmation', 'always_advertise', 'has_captcha', 'skip_from_360_view', ] category_class = Category def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.fields = [] def __eq__(self, other): return bool( isinstance(other, FormDef) and self.xml_root_node == other.xml_root_node and self.id == other.id ) def migrate(self): changed = False if self.__dict__.get('fields') is Ellipsis: # don't run migration on lightweight objects return if isinstance(self.category_id, int): self.category_id = str(self.category_id) changed = True if isinstance(self.workflow_id, int): self.workflow_id = str(self.workflow_id) changed = True if self.roles: for role in self.roles: if isinstance(role, int): self.roles = [str(x) for x in self.roles] changed = True break if self.workflow_roles: workflow_roles_list = self.workflow_roles.items() for role_id in self.workflow_roles.values(): if isinstance(role_id, int): self.workflow_roles = {x: str(y) for x, y in workflow_roles_list} changed = True break if self.include_download_all_button: # 2023-12-30 self.management_sidebar_items = self.get_default_management_sidebar_items() self.management_sidebar_items.add('download-files') self.include_download_all_button = False changed = True for f in self.fields or []: changed |= f.migrate() if changed: self.store(comment=_('Automatic update'), snapshot_store_user=False) @classmethod def remove_object(cls, id): super().remove_object(id) from . import sql sql.SearchableFormDef.update(removed_obj_type=cls.xml_root_node, removed_obj_id=id) if cls == FormDef: # recreate global views so they don't reference formdata from # deleted formefs conn, cur = sql.get_connection_and_cursor() with sql.atomic(): sql.clean_global_views(conn, cur) cur.close() def get_default_management_sidebar_items(self): return { 'general', 'submission-context', 'user', 'geolocation', 'custom-template', 'pending-forms', } def get_management_sidebar_available_items(self): return [ ('general', _('General Information')), ('download-files', _('Button to download all files')), ('submission-context', _('Submission context')), ('user', _('Associated User')), ('geolocation', _('Geolocation')), ('custom-template', _('Custom template')), ('pending-forms', _('User Pending Forms')), ] def management_sidebar_items_labels(self): # return ordered labels management_sidebar_items = self.get_management_sidebar_items() for key, label in self.get_management_sidebar_available_items(): if key in management_sidebar_items: yield label def get_management_sidebar_items(self): if self.management_sidebar_items == {'__default__'}: return self.get_default_management_sidebar_items() return self.management_sidebar_items or [] def get_default_submission_sidebar_items(self): return { 'general', 'submission-context', 'user', 'custom-template', } def get_submission_sidebar_available_items(self): return [ ('general', _('General Information')), ('submission-context', _('Submission context')), ('user', _('Associated User')), ('custom-template', _('Custom template')), ] def submission_sidebar_items_labels(self): # return ordered labels submission_sidebar_items = self.get_submission_sidebar_items() for key, label in self.get_submission_sidebar_available_items(): if key in submission_sidebar_items: yield label def get_submission_sidebar_items(self): if self.submission_sidebar_items == {'__default__'}: return self.get_default_submission_sidebar_items() return self.submission_sidebar_items or [] @property def data_class_name(self): return '_wcs_%s' % self.url_name.title() def data_class(self, mode=None): if 'formdef' not in sys.modules: sys.modules['formdef'] = sys.modules[__name__] if hasattr(sys.modules['formdef'], self.data_class_name): data_class = getattr(sys.modules['formdef'], self.data_class_name) # only use existing data class if it has a reference to this actual # formdef if data_class._formdef is self: return data_class from . import sql table_name = sql.get_formdef_table_name(self) cls = type(self.data_class_name, (sql.SqlFormData,), {'_formdef': self, '_table_name': table_name}) setattr(sys.modules['formdef'], self.data_class_name, cls) setattr(sys.modules['wcs.formdef'], self.data_class_name, cls) return cls def get_new_field_id(self): return str(uuid.uuid4()) def get_new_url_name(self): new_url_name = simplify(self.name)[:250] base_new_url_name = new_url_name suffix_no = 0 while True: try: obj = self.get_on_index(new_url_name, 'url_name', ignore_migration=True) except KeyError: break if obj.id == self.id: break suffix_no += 1 new_url_name = '%s-%s' % (base_new_url_name, suffix_no) return new_url_name @classmethod def get_new_id(cls, create=False): id = super().get_new_id(create=False) id = cls.get_sql_new_id(id_start=int(id)) if create: objects_dir = cls.get_objects_dir() object_filename = os.path.join(objects_dir, fix_key(id)) try: fd = os.open(object_filename, os.O_CREAT | os.O_EXCL) except OSError: return cls.get_new_id(create=True) os.close(fd) return str(id) @classmethod def get_sql_new_id(cls, id_start): from . import sql return sql.get_formdef_new_id(id_start=id_start) def get_order_by(self, order_by): if not order_by: return order_by direction = '' if order_by.startswith('-'): order_by = order_by[1:] direction = '-' for field in self.iter_fields(include_block_fields=True): if getattr(field, 'block_field', None): if field.key == 'items': # not yet continue if order_by not in [field.contextual_varname, 'f%s' % field.contextual_id]: continue if field.contextual_varname == order_by: order_by = 'f%s' % field.contextual_id if getattr(field, 'block_field', None) and 'f%s' % field.contextual_id == order_by: # field of block field, sort on the first element order_by = "f%s->'data'->0->>'%s%s'" % ( field.block_field.id, field.id, '_display' if field.store_display_value else '', ) elif field.store_display_value: order_by += '_display' break if order_by == 'digest': order_by = "digests->>'default'" order_by = '%s%s' % (direction, order_by) if order_by == 'criticality_level': order_by = [order_by, 'receipt_time'] elif order_by == '-criticality_level': order_by = [order_by, '-receipt_time'] return order_by def has_admin_access(self, user): # return True if user 1/ is global administrator for this type of object, or # 2/ has one of the management roles defined in its category. if get_publisher().get_backoffice_root().is_global_accessible(self.backoffice_section): return True if not user: return False if not self.category_id: return False management_roles = {x.id for x in getattr(self.category, 'management_roles') or []} user_roles = set(user.get_roles()) return management_roles.intersection(user_roles) @classonlymethod def wipe(cls): super().wipe() cls.sql_wipe() @classmethod def sql_wipe(cls): from . import sql sql.formdef_wipe() def store(self, comment=None, snapshot_store_user=True, application=None, *args, **kwargs): assert not self.is_readonly() if self.url_name is None: # set url name if it's not yet there self.url_name = self.get_new_url_name() object_only = kwargs.pop('object_only', False) if not object_only: self.update_relations() StorableObject.store(self, *args, **kwargs) if object_only: return if get_publisher().snapshot_class: get_publisher().snapshot_class.snap( instance=self, comment=comment, store_user=snapshot_store_user, application=application ) if get_publisher().has_postgresql_config(): self.update_storage() self.store_related_custom_views() self.update_searchable_formdefs_table() self.update_category_reference() if hasattr(self, 'xml_testdefs'): self.finish_tests_xml_import() def update_category_reference(self): if getattr(self, '_onload_category_id', None) != self.category_id: from . import sql sql.update_global_view_formdef_category(self) self._onload_category_id = self.category_id def has_captcha_enabled(self): return self.has_captcha and get_publisher().has_site_option('formdef-captcha-option') def update_storage(self): from . import sql actions = sql.do_formdef_tables(self, rebuild_views=True, rebuild_global_views=True) if actions: cls = self.data_class() for action in actions: getattr(cls, action)() def update_searchable_formdefs_table(self): from . import sql sql.SearchableFormDef.update(obj=self) def update_relations(self): from wcs.carddef import CardDef self_ref = '%s:%s' % (self.xml_root_node, self.url_name) self_relations_by_ref = self.build_relations_by_ref() reverse_relations = [] # cross each formdef and cardef and check relations for objdef in itertools.chain( FormDef.select(ignore_errors=True, ignore_migration=True), CardDef.select(ignore_errors=True, ignore_migration=True), ): objdef_ref = '%s:%s' % (objdef.xml_root_node, objdef.url_name) if objdef.xml_root_node == self.xml_root_node and objdef.id == self.id: # don't build relations twice objdef_relations_by_ref = self_relations_by_ref else: objdef_relations_by_ref = objdef.build_relations_by_ref() reverse_relations += objdef_relations_by_ref.get(self_ref, []) old_objdef_reverse_relations = copy.deepcopy(objdef.reverse_relations) # remove relations with self in objdef's reverse_relations new_objdef_reverse_relations = [ r for r in (objdef.reverse_relations or []) if r['obj'] != self_ref ] # and update objdef's reverse_relations from self_relations_by_ref new_objdef_reverse_relations += self_relations_by_ref.get(objdef_ref, []) # sort objectdef's reverse_relations new_objdef_reverse_relations = sorted( new_objdef_reverse_relations, key=itemgetter('obj', 'varname', 'type') ) if old_objdef_reverse_relations != new_objdef_reverse_relations: objdef.reverse_relations = new_objdef_reverse_relations objdef.store(object_only=True) # sort self's reverse_relations and set self.reverse_relations = sorted(reverse_relations, key=itemgetter('obj', 'varname', 'type')) def build_relations_by_ref(self): # build relations to other carddefs, to be stored in some object reverse field self_ref = '%s:%s' % (self.xml_root_node, self.url_name) relations_by_ref = collections.defaultdict(list) def _check_field(field): data_source = getattr(field, 'data_source', None) if not data_source or not data_source.get('type', '').startswith('carddef:'): return # reverse relation of data_source['type'] to this object obj_ref = ':'.join(data_source['type'].split(':')[:2]) # remove possible custom-view relations_by_ref[obj_ref].append( { 'varname': field.contextual_varname or '', 'type': field.key, 'obj': self_ref, } ) for field in self.iter_fields(include_block_fields=True): if field.key in ['item', 'items', 'computed']: _check_field(field) # remove duplicated items return { k: list(map(dict, {tuple(sorted(d.items())) for d in v})) for k, v in relations_by_ref.items() } def store_related_custom_views(self): for view in getattr(self, '_custom_views', []): if not view.id: existing_views = get_publisher().custom_view_class.select( [ Equal('formdef_type', self.xml_root_node), Equal('formdef_id', str(self.id)), Equal('visibility', view.visibility), Equal('slug', view.slug), ] ) if existing_views: view.id = existing_views[0].id view.formdef = self view.store() def get_all_fields(self): return (self.fields or []) + self.workflow.get_backoffice_fields() def get_all_fields_dict(self): return {x.id: x for x in self.get_all_fields()} def get_total_count_data_fields(self): count = len([x for x in self.fields or [] if not x.is_no_data_field and not x.key == 'block']) for field in self.fields or []: if not field.key == 'block': continue try: count += len([x for x in field.block.fields or [] if not x.is_no_data_field]) * ( field.default_items_count or 1 ) except KeyError: continue return count def iter_fields(self, include_block_fields=False, with_backoffice_fields=True, with_no_data_fields=True): def _iter_fields(fields, block_field=None): for field in fields: if with_no_data_fields is False and field.is_no_data_field: continue # add contextual_id/contextual_varname attributes # they are id/varname for normal fields # but in case of blocks they are concatenation of block id/varname + field id/varname field.contextual_id = field.id field.contextual_varname = None if block_field: field.block_field = block_field field.contextual_id = '%s-%s' % (field.block_field.id, field.id) if field.varname and field.block_field.varname: field.contextual_varname = '%s_%s' % ( field.block_field.varname, field.varname, ) else: field.contextual_varname = field.varname yield field if field.key == 'block' and include_block_fields: try: field.block # load block except KeyError: # blockdef not found continue yield from _iter_fields(field.block.fields, block_field=field) field._block = None # reset cache if with_backoffice_fields: fields = self.get_all_fields() else: fields = self.fields or [] yield from _iter_fields(fields) def get_widget_fields(self): return [field for field in self.fields or [] if isinstance(field, fields.WidgetField)] @property def default_digest_template(self): return (self.digest_templates or {}).get('default') def get_category(self): if self.category_id: try: return self.category_class.get(self.category_id) except KeyError: return None else: return None def set_category(self, category): if category: self.category_id = category.id elif self.category_id: self.category_id = None category = property(get_category, set_category) def get_drafts_lifespan(self): return int(self.drafts_lifespan or DRAFTS_DEFAULT_LIFESPAN) def get_drafts_max_per_user(self): return int(self.drafts_max_per_user or DRAFTS_DEFAULT_MAX_PER_USER) _workflow = None def get_workflow(self): if self._workflow and self._workflow.id == self.workflow_id: return self._workflow from wcs.workflows import Workflow if self.workflow_id: try: self._workflow = Workflow.get(self.workflow_id) except KeyError: return Workflow.get_unknown_workflow() return self._workflow else: self._workflow = self.get_default_workflow() return self._workflow @classmethod def get_default_workflow(cls): from wcs.workflows import Workflow return Workflow.get_default_workflow() def set_workflow(self, workflow): if workflow and workflow.id not in ['_carddef_default', '_default']: self.workflow_id = workflow.id self._workflow = workflow elif self.workflow_id: self.workflow_id = None self._workflow = None workflow = property(get_workflow, set_workflow) def get_dependencies(self): yield self.category if self.workflow_id and self.workflow.id not in ['_carddef_default', '_default']: yield self.workflow for field in self.fields or []: yield from field.get_dependencies() role_class = get_publisher().role_class for role_id in itertools.chain(self.roles or [], self.backoffice_submission_roles or []): yield role_class.get(role_id, ignore_errors=True) for role_id in (self.workflow_roles or {}).values(): yield role_class.get(role_id, ignore_errors=True) for view in get_publisher().custom_view_class.select_shared_for_formdef(formdef=self): yield from view.get_dependencies() for template in list((self.digest_templates or {}).values()) + [ self.lateral_template, self.submission_lateral_template, ]: yield from get_dependencies_from_template(template) from .testdef import TestDef for testdef in TestDef.select_for_objectdef(self): yield from testdef.get_dependencies() @property def keywords_list(self): if not self.keywords: return [] return [x.strip() for x in self.keywords.split(',')] @property def appearance_keywords_list(self): if not get_publisher().has_site_option('formdef-appearance-keywords'): return [] if not self.appearance_keywords: return [] return [x.strip() for x in self.appearance_keywords.split()] def get_variable_options(self): variables = {} if not self.workflow.variables_formdef: return variables workflow_options = self.workflow_options or {} for field in self.workflow.variables_formdef.fields: if not field.varname: continue option_name = 'form_option_' + field.varname variables[option_name] = getattr(field, 'default_value', None) if workflow_options.get(field.varname) is not None: variables[option_name] = workflow_options.get(field.varname) if field.store_display_value: if '%s_display' % field.varname in workflow_options: variables[option_name + '_raw'] = variables[option_name] variables[option_name] = workflow_options.get('%s_display' % field.varname) if field.store_structured_value: if '%s_structured' % field.varname in workflow_options: variables[option_name + '_structured'] = workflow_options.get( '%s_structured' % field.varname ) return variables def get_variable_options_for_form(self): variables = {} if not self.workflow.variables_formdef: return variables if not self.workflow_options: return {} for field in self.workflow.variables_formdef.fields: if not field.varname: continue variables[str(field.id)] = self.workflow_options.get(field.varname) return variables def set_variable_options(self, form): data = self.workflow.variables_formdef.get_data(form) variables = {} for field in self.workflow.variables_formdef.fields: if not field.varname: continue variables[field.varname] = data.get(field.id) if field.store_display_value: variables[field.varname + '_display'] = data.get(field.id + '_display') if field.store_structured_value: variables[field.varname + '_structured'] = data.get(field.id + '_structured') if not self.workflow_options: self.workflow_options = {} self.workflow_options.update(variables) @classmethod def get_by_urlname(cls, url_name, ignore_migration=False, ignore_errors=False, use_cache=False): return cls.get_on_index( url_name, 'url_name', ignore_migration=ignore_migration, ignore_errors=ignore_errors, use_cache=use_cache, ) get_by_slug = get_by_urlname @property def slug(self): return self.url_name @slug.setter def slug(self, value): self.url_name = value def get_url(self, backoffice=False, preview=False, include_category=False, language=None): if backoffice: base_url = get_publisher().get_backoffice_url() + '/management' elif preview: base_url = get_publisher().get_frontoffice_url() + '/preview' else: base_url = get_publisher().get_frontoffice_url() if language and get_publisher().has_i18n_enabled(): base_url += '/' + language if include_category and self.category_id: return '%s/%s/%s/' % (base_url, self.category.slug, self.url_name) return '%s/%s/' % (base_url, self.url_name) def get_api_url(self): base_url = get_publisher().get_frontoffice_url() return '%s/api/forms/%s/' % (base_url, self.url_name) def get_admin_url(self): base_url = get_publisher().get_backoffice_url() return '%s/forms/%s/' % (base_url, self.id) def get_backoffice_url(self): return self.get_url(backoffice=True) def get_preview_url(self): return self.get_url(preview=True) def get_field_admin_url(self, field): return self.get_admin_url() + 'fields/%s/' % field.id def get_submission_url(self, backoffice=False): if backoffice: return self.get_backoffice_submission_url() return self.get_url() def get_backoffice_submission_url(self): base_url = get_publisher().get_backoffice_url() + '/submission' return '%s/%s/' % (base_url, self.url_name) def get_display_id_format(self): return self.id_template or '{{formdef_id}}-{{form_number_raw}}' def get_by_id_criteria(self, value): if self.id_template: return Equal('id_display', str(value)) try: if int(value) >= 2**31: # out of range for postgresql integer type; would raise DataError. raise OverflowError except ValueError: # value not an integer, it could be id_display return Equal('id_display', str(value)) return Equal('id', value) def get_submission_lateral_block(self): context = get_publisher().substitutions.get_context_variables(mode='lazy') if self.submission_lateral_template is None: new_value = None else: try: new_value = Template(self.submission_lateral_template, autoescape=False, raises=True).render( context ) except Exception as e: get_publisher().record_error( _('Could not render submission lateral template (%s)' % e), formdef=self, exception=e, ) return None return new_value def create_form(self, page=None, displayed_fields=None, transient_formdata=None): form = FormDefForm() if self.appearance_keywords: form.attrs['class'] = 'quixote %s' % self.appearance_keywords if self.keywords: form.attrs['data-keywords'] = ' '.join(self.keywords_list) self.add_fields_to_form( form, page=page, displayed_fields=displayed_fields, transient_formdata=transient_formdata ) return form def get_computed_fields_from_page(self, page): on_page = page is None for field in self.fields: if field.key == 'page': if on_page: break if page.id == field.id: on_page = True continue if not on_page: continue if field.key == 'computed': yield field def add_fields_to_form( self, form, page=None, displayed_fields=None, form_data=None, # a dictionary, to fill fields transient_formdata=None, ): # a FormData on_page = page is None hidden_varnames = set() for field in self.fields: field.formdef = self if field.key == 'page': if on_page: break if page.id == field.id: on_page = True continue if not on_page: continue visible = field.is_visible(form_data, self) if not visible: if not field.has_live_conditions(self, hidden_varnames=hidden_varnames): # ignore field.varname when checking later conditions for liveness if field.varname: hidden_varnames.add(field.varname) # no live conditions so field can be skipped continue if isinstance(displayed_fields, list): displayed_fields.append(field) value = None if form_data: value = form_data.get(field.id) if not field.add_to_form: continue widget = field.add_to_form(form, value) widget.is_hidden = not (visible) widget.field = field if transient_formdata and not widget.is_hidden: transient_formdata.data.update(self.get_field_data(field, widget)) # invalidate cache as comment fields (and other things?) may # have accessed variables in non-lazy mode and caused a cache # with now-obsolete values. get_publisher().substitutions.invalidate_cache() widget._parsed = False widget.error = None def get_page(self, page_no): return [x for x in self.fields if x.key == 'page'][page_no] def page_count(self): return len([x for x in self.fields if x.key == 'page']) or 1 def create_view_form(self, dict=None, use_tokens=True, visible=True): dict = dict or {} form = Form(enctype='multipart/form-data', use_tokens=use_tokens) if not visible: form.attrs['style'] = 'display: none;' if self.keywords: form.attrs['data-keywords'] = ' '.join(self.keywords_list) form_fields = self.fields if form_fields and form_fields[0].key != 'page': # add fake initial page in case it's missing form_fields = [fields.PageField(label='', type='page')] + form_fields # 1st pass to group fields on different pages pages = [] current_page = {} for field in form_fields: if field.key == 'page': current_page = {'page': field, 'fields': []} current_page['disabled'] = not field.is_visible(dict, self) pages.append(current_page) continue if current_page['disabled']: continue if field.key == 'title' and ( not current_page['fields'] and current_page['page'].label == field.label ): # don't include first title of a page if that title has the # same text as the page. continue if field.key in ('title', 'subtitle', 'comment') and not field.include_in_validation_page: # don't render field that wouldn't be displayed. continue if not field.is_visible(dict, self): continue current_page['fields'].append(field) # 2nd pass to create view form for page in pages: visible_contents = False if page['fields'] and any(x.include_in_validation_page for x in page['fields']): visible_contents = True form.widgets.append(HtmlWidget(htmltext('
'))) if page['page'].label: form.widgets.append(HtmlWidget(htmltext('

%s

') % page['page'].label)) form.widgets.append(HtmlWidget(htmltext('
'))) for field in page['fields']: value = dict.get(field.id) if not field.add_to_view_form: continue if not field.include_in_validation_page: form.widgets.append(HtmlWidget(htmltext('
'))) field.add_to_view_form(form, value) form.widgets.append(HtmlWidget(htmltext('
'))) else: field.add_to_view_form(form, value) if visible_contents: form.widgets.append(HtmlWidget(htmltext('
'))) return form def set_live_condition_sources(self, form, fields): live_condition_fields = {} fields_ids = {str(x.id) for x in fields} for field in self.iter_fields(include_block_fields=True): if (hasattr(field, 'block_field') and str(field.block_field.id) not in fields_ids) or ( not hasattr(field, 'block_field') and str(field.id) not in fields_ids ): continue if field.condition: field.varnames = field.get_condition_varnames(formdef=self) for varname in field.varnames: if varname not in live_condition_fields: live_condition_fields[varname] = [] live_condition_fields[varname].append(field) if field.key in ('item', 'items') and field.data_source: data_source = data_sources.get_object(field.data_source) if data_source.type in ('json', 'jsonvalue', 'geojson') or data_source.type.startswith( 'carddef:' ): varnames = data_source.get_referenced_varnames(formdef=self) for varname in varnames: if varname not in live_condition_fields: live_condition_fields[varname] = [] live_condition_fields[varname].append(field) if field.prefill and field.prefill.get('type') == 'string': for varname in field.get_referenced_varnames( formdef=self, value=field.prefill.get('value', '') ): if varname not in live_condition_fields: live_condition_fields[varname] = [] live_condition_fields[varname].append(field) if field.key == 'comment': for varname in field.get_referenced_varnames(formdef=self, value=field.label): if varname not in live_condition_fields: live_condition_fields[varname] = [] live_condition_fields[varname].append(field) for field in fields: if field.varname in live_condition_fields: widget = form.get_widget('f%s' % field.id) if widget: widget.live_condition_source = True widget.live_condition_fields = live_condition_fields[field.varname] elif field.key == 'computed': field.live_condition_source = True field.live_condition_fields = live_condition_fields[field.varname] @classmethod def get_field_data(cls, field, widget, raise_on_error=False): d = {} d[field.id] = widget.parse() if d.get(field.id) is not None and field.convert_value_from_str: d[field.id] = field.convert_value_from_str(d[field.id]) field.set_value(d, d[field.id], raise_on_error=raise_on_error) if getattr(widget, 'cleanup', None): widget.cleanup() return d def get_data(self, form, raise_on_error=False, pages=None): d = {} current_page_id = None for field in self.fields: if field.key == 'page': current_page_id = field.id continue if pages not in (None, [None]) and current_page_id not in [x.id for x in pages]: continue widget = form.get_widget('f%s' % field.id) if widget: d.update(self.get_field_data(field, widget, raise_on_error=raise_on_error)) elif pages is not None: # reset d[f'{field.id}'] = d[f'{field.id}_display'] = d[f'{field.id}_structured'] = None return d def export_to_json(self, include_id=False, indent=None, with_user_fields=False): from wcs.carddef import CardDef root = {} root['name'] = self.name if include_id and self.id: root['id'] = str(self.id) if self.category: root['category'] = self.category.name root['category_id'] = str(self.category.id) if self.workflow: root['workflow'] = self.workflow.get_json_export_dict(include_id=include_id) more_attributes = ['tracking_code_verify_fields'] for attribute in self.TEXT_ATTRIBUTES + self.BOOLEAN_ATTRIBUTES + more_attributes: if not hasattr(self, attribute): continue root[attribute] = getattr(self, attribute) if isinstance(root[attribute], time.struct_time): root[attribute] = time.strftime('%Y-%m-%dT%H:%M:%S', root[attribute]) root['fields'] = [] if self.fields: for field in self.fields: root['fields'].append(field.export_to_json(include_id=include_id)) if self.geolocations: root['geolocations'] = self.geolocations.copy() if self.workflow_options: root['options'] = self.workflow_options.copy() for k, v in list(root['options'].items()): # convert time.struct_time to strings as python3 would # serialize it as tuple. if isinstance(v, time.struct_time): root['options'][k] = time.strftime('%Y-%m-%dT%H:%M:%S', v) if self.required_authentication_contexts: root['required_authentication_contexts'] = self.required_authentication_contexts[:] if self.management_sidebar_items: root['management_sidebar_items'] = sorted(self.management_sidebar_items) if isinstance(self, CardDef): all_carddefs = CardDef.select(ignore_errors=True) all_carddefs = [c for c in all_carddefs if c] all_carddefs_by_slug = {c.url_name: c for c in all_carddefs} def get_field_label(obj, field_varname): card_slug = obj.split(':')[1] carddef = all_carddefs_by_slug.get(card_slug) if not carddef: return for field in carddef.iter_fields(include_block_fields=True): if field.contextual_varname == field_varname: if getattr(field, 'block_field', None): return '%s - %s' % (field.block_field.label, field.label) return field.label card_relations = [] current_objdef_ref = '%s:%s' % (self.xml_root_node, self.url_name) for objdef, relations in self.build_relations_by_ref().items(): if not objdef.startswith('carddef:'): continue try: CardDef.get_by_urlname(objdef.split(':')[1]) except KeyError: continue for relation in relations: if not relation['varname']: continue card_relations.append( { 'varname': relation['varname'], 'label': get_field_label(current_objdef_ref, relation['varname']), 'type': relation['type'], 'obj': objdef, 'reverse': False, } ) for relation in self.reverse_relations or []: if not relation['obj'].startswith('carddef:'): continue if not relation['varname']: continue rel = relation.copy() rel.update( { 'reverse': True, 'label': get_field_label(relation['obj'], relation['varname']), } ) card_relations.append(rel) root['relations'] = sorted(card_relations, key=itemgetter('varname')) if with_user_fields: root['user'] = { 'fields': [ { 'varname': 'name', 'label': _('Full name'), 'type': 'string', }, { 'varname': 'email', 'label': _('Email'), 'type': 'email', }, ] } user_formdef = get_publisher().user_class.get_formdef() if user_formdef: root['user']['fields'] += [ { 'varname': f.varname or '', 'label': f.label, 'type': f.key, } for f in user_formdef.fields ] return json.dumps(root, indent=indent, sort_keys=True, cls=JSONEncoder) @classmethod def import_from_json(cls, fd, include_id=False): formdef = cls() def unicode2str(v): if isinstance(v, dict): return {unicode2str(k): unicode2str(v) for k, v in v.items()} elif isinstance(v, list): return [unicode2str(x) for x in v] elif isinstance(v, str): return force_str(v) else: return v # we have to make sure all strings are str object, not unicode. value = unicode2str(json.load(fd)) if include_id and 'id' in value: formdef.id = value.get('id') if include_id and 'category_id' in value: formdef.category_id = value.get('category_id') elif 'category' in value: category = value.get('category') for c in Category.select(): if c.name == category: formdef.category_id = c.id break if include_id and 'workflow_id' in value: formdef.workflow_id = value.get('workflow_id') elif ( include_id and 'workflow' in value and isinstance(value['workflow'], dict) and 'id' in value['workflow'] ): formdef.workflow_id = value['workflow'].get('id') elif 'workflow' in value: if isinstance(value['workflow'], str): workflow = value.get('workflow') else: workflow = value['workflow'].get('name') from wcs.workflows import Workflow for w in Workflow.select(): if w.name == workflow: formdef.workflow_id = w.id break more_attributes = ['tracking_code_verify_fields'] for attribute in cls.TEXT_ATTRIBUTES + cls.BOOLEAN_ATTRIBUTES + more_attributes: if attribute in value: setattr(formdef, attribute, value.get(attribute)) formdef.fields = [] for i, field in enumerate(value.get('fields', [])): try: field_o = fields.get_field_class_by_type(field.get('type'))() except KeyError: raise FormdefImportError(_('Unknown field type'), details=field.findtext('type')) field_o.init_with_json(field, include_id=True) if not field_o.id: # this assumes all fields will have id, or none of them field_o.id = str(i) formdef.fields.append(field_o) if value.get('options'): formdef.workflow_options = value.get('options') for option_key, option_value in formdef.workflow_options.items(): if isinstance(option_value, dict) and 'filename' in option_value: new_value = PicklableUpload( orig_filename=option_value['filename'], content_type=option_value['content_type'], ) new_value.receive([base64.decodebytes(force_bytes(option_value['content']))]) formdef.workflow_options[option_key] = new_value if value.get('geolocations'): formdef.geolocations = value.get('geolocations') if value.get('required_authentication_contexts'): formdef.required_authentication_contexts = [ str(x) for x in value.get('required_authentication_contexts') ] if value.get('management_sidebar_items'): formdef.management_sidebar_items = {str(x) for x in value.get('management_sidebar_items')} return formdef def export_to_xml(self, include_id=False, include_tests=False): root = ET.Element(self.xml_root_node) if include_id and self.id: root.attrib['id'] = str(self.id) for text_attribute in list(self.TEXT_ATTRIBUTES): if not hasattr(self, text_attribute) or not getattr(self, text_attribute): continue ET.SubElement(root, text_attribute).text = str(getattr(self, text_attribute)) for boolean_attribute in self.BOOLEAN_ATTRIBUTES: if not hasattr(self, boolean_attribute): continue value = getattr(self, boolean_attribute) if value: value = 'true' else: value = 'false' ET.SubElement(root, boolean_attribute).text = value self.category_class.object_category_xml_export(self, root, include_id=include_id) workflow = None if self.workflow_id: from wcs.workflows import Workflow workflow = Workflow.get(self.workflow_id, ignore_errors=True, ignore_migration=True) if not workflow: workflow = self.get_default_workflow() elem = ET.SubElement(root, 'workflow') elem.text = workflow.name if workflow.slug: elem.attrib['slug'] = str(workflow.slug) if include_id: elem.attrib['workflow_id'] = str(workflow.id) if self.tracking_code_verify_fields is not None: verify_fields = ET.SubElement(root, 'tracking_code_verify_fields') for field_id in self.tracking_code_verify_fields: ET.SubElement(verify_fields, 'field_id').text = str(field_id) fields = ET.SubElement(root, 'fields') for field in self.fields or []: fields.append(field.export_to_xml(include_id=include_id)) from wcs.workflows import get_role_name_and_slug def add_role_element(roles_root, role_id): if not role_id: return try: role_name, role_slug = get_role_name_and_slug(role_id) except KeyError: # skip broken/missing roles return sub = ET.SubElement(roles_root, 'role') if role_slug: sub.attrib['slug'] = role_slug if include_id: sub.attrib['role_id'] = str(role_id) sub.text = role_name return sub roles_elements = [ ('roles', 'user-roles'), ('backoffice_submission_roles', 'backoffice-submission-roles'), ] for attr_name, node_name in roles_elements: if not getattr(self, attr_name, None): continue roles = ET.SubElement(root, node_name) for role_id in getattr(self, attr_name): add_role_element(roles, role_id) if self.workflow_roles: roles = ET.SubElement(root, 'roles') for role_key, role_id in self.workflow_roles.items(): sub = add_role_element(roles, role_id) if sub is not None: sub.attrib['role_key'] = role_key def make_xml_value(element, value): if isinstance(value, str): element.text = value elif hasattr(value, 'base_filename'): element.attrib['type'] = 'file' ET.SubElement(element, 'filename').text = value.base_filename ET.SubElement(element, 'content_type').text = value.content_type or 'application/octet-stream' ET.SubElement(element, 'content').text = force_str(base64.b64encode(value.get_content())) elif isinstance(value, time.struct_time): element.text = time.strftime('%Y-%m-%d', value) element.attrib['type'] = 'date' elif isinstance(value, bool): element.text = 'true' if value else 'false' element.attrib['type'] = 'bool' elif isinstance(value, int): element.attrib['type'] = 'int' element.text = str(value) elif isinstance(value, float): element.attrib['type'] = 'float' element.text = str(value) elif isinstance(value, decimal.Decimal): element.attrib['type'] = 'decimal' element.text = str(value) elif isinstance(value, (set, tuple, list)): element.attrib['type'] = 'list' for child_value in value: sub_element = ET.SubElement(element, 'item') make_xml_value(sub_element, child_value) elif isinstance(value, dict): element.attrib['type'] = 'dict' for child_key, child_value in value.items(): sub_element = ET.SubElement(element, child_key) make_xml_value(sub_element, child_value) else: assert value is None, 'option variable of unknown type (%s)' % type(value) options = ET.SubElement(root, 'options') for option in sorted(self.workflow_options or []): element = ET.SubElement(options, 'option') element.attrib['varname'] = option option_value = self.workflow_options.get(option) make_xml_value(element, option_value) custom_views_element = ET.SubElement(root, 'custom_views') if hasattr(self, '_custom_views'): # it has just been loaded, it's reexported as part as the overwrite # confirmation dialog, do not get custom views from database. custom_views = self._custom_views else: custom_views = [] for view in get_publisher().custom_view_class.select_shared_for_formdef(formdef=self): custom_views.append(view) for view in custom_views: custom_view_node = view.export_to_xml(include_id=include_id) if custom_view_node is not None: custom_views_element.append(custom_view_node) geolocations = ET.SubElement(root, 'geolocations') for geoloc_key, geoloc_label in (self.geolocations or {}).items(): element = ET.SubElement(geolocations, 'geolocation') element.attrib['key'] = geoloc_key element.text = geoloc_label if self.required_authentication_contexts: element = ET.SubElement(root, 'required_authentication_contexts') for auth_context in self.required_authentication_contexts: ET.SubElement(element, 'method').text = force_str(auth_context) if self.management_sidebar_items: element = ET.SubElement(root, 'management_sidebar_items') for item in sorted(self.management_sidebar_items): ET.SubElement(element, 'item').text = force_str(item) if self.digest_templates: digest_templates = ET.SubElement(root, 'digest_templates') for key, value in self.digest_templates.items(): if not value: continue sub = ET.SubElement(digest_templates, 'template') sub.attrib['key'] = key sub.text = value if include_tests: from .testdef import TestDef testdefs = TestDef.select_for_objectdef(self) if testdefs: elem = ET.SubElement(root, 'testdefs') for testdef in testdefs: elem.append(testdef.export_to_xml()) return root def export_for_application(self): etree = self.export_to_xml(include_id=True, include_tests=True) indent_xml(etree) content = ET.tostring(etree) content_type = 'text/xml' return content, content_type @classmethod def import_from_xml( cls, fd, include_id=False, fix_on_error=False, check_datasources=True, check_deprecated=False ): try: tree = ET.parse(fd) except Exception: raise ValueError() formdef = cls.import_from_xml_tree( tree, include_id=include_id, fix_on_error=fix_on_error, check_datasources=check_datasources, check_deprecated=check_deprecated, ) if formdef.url_name: try: cls.get_on_index(formdef.url_name, 'url_name', ignore_migration=True) except KeyError: pass else: formdef._import_orig_slug = formdef.url_name formdef.url_name = formdef.get_new_url_name() # check if all field id are unique known_field_ids = set() for field in formdef.fields: if field.id in known_field_ids: raise FormdefImportRecoverableError(_('Duplicated field identifiers')) known_field_ids.add(field.id) return formdef @classmethod def import_from_xml_tree( cls, tree, include_id=False, fix_on_error=False, snapshot=False, check_datasources=True, check_deprecated=False, ): from wcs.backoffice.deprecations import DeprecatedElementsDetected, DeprecationsScan from wcs.carddef import CardDef formdef = cls() if tree.find('name') is None or not tree.find('name').text: raise FormdefImportError(_('Missing name')) # if the tree we get is actually a ElementTree for real, we get its # root element and go on happily. if not ET.iselement(tree): tree = tree.getroot() if tree.tag != cls.xml_root_node: raise FormdefImportError( _('Provided XML file is invalid, it starts with a <%(seen)s> tag instead of <%(expected)s>') % {'seen': tree.tag, 'expected': cls.xml_root_node} ) if include_id and tree.attrib.get('id'): formdef.id = tree.attrib.get('id') for text_attribute in list(cls.TEXT_ATTRIBUTES): value = tree.find(text_attribute) if value is None or value.text is None: continue setattr(formdef, text_attribute, xml_node_text(value)) for boolean_attribute in cls.BOOLEAN_ATTRIBUTES: value = tree.find(boolean_attribute) if value is None: continue setattr(formdef, boolean_attribute, value.text == 'true') formdef.fields = [] unknown_field_types = set() unknown_fields_blocks = set() for i, field in enumerate(tree.find('fields')): field_type = field.findtext('type') if field_type == 'block': field_type = 'block:%s' % field.findtext('block_slug') try: field_o = fields.get_field_class_by_type(field_type)() except KeyError: if field_type.startswith('block:'): unknown_fields_blocks.add(field_type.removeprefix('block:')) else: unknown_field_types.add(field_type) continue field_o.init_with_xml(field, include_id=True) if field_type.startswith('block:'): field_o.block_slug = field_type.removeprefix('block:') if fix_on_error or not field_o.id: # this assumes all fields will have id, or none of them field_o.id = str(i + 1) formdef.fields.append(field_o) if tree.find('tracking_code_verify_fields') is not None: formdef.tracking_code_verify_fields = [ xml_node_text(verify_field_id) for verify_field_id in tree.findall('tracking_code_verify_fields/field_id') ] formdef.workflow_options = {} def get_value_from_xml(element): if element.attrib.get('type') == 'int': return int(xml_node_text(element)) elif element.attrib.get('type') == 'float': return float(xml_node_text(element)) elif element.attrib.get('type') == 'decimal': return decimal.Decimal(xml_node_text(element)) if element.attrib.get('type') == 'date': return time.strptime(element.text, '%Y-%m-%d') elif element.attrib.get('type') == 'bool': return bool(element.text == 'true') elif element.attrib.get('type') == 'file' or element.findall('filename'): value = PicklableUpload( orig_filename=xml_node_text(element.find('filename')), content_type=xml_node_text(element.find('content_type')), ) value.receive([base64.decodebytes(force_bytes(xml_node_text(element.find('content'))))]) return value elif element.attrib.get('type') == 'list': return [get_value_from_xml(x) for x in element.findall('item')] elif element.attrib.get('type') == 'dict': return {x.tag: get_value_from_xml(x) for x in element.findall('*')} elif element.text: return xml_node_text(element) for option in tree.findall('options/option'): formdef.workflow_options[option.attrib.get('varname')] = get_value_from_xml(option) formdef._custom_views = [] for view in tree.findall('custom_views/%s' % get_publisher().custom_view_class.xml_root_node): view_o = get_publisher().custom_view_class() view_o.init_with_xml(view, include_id=include_id) formdef._custom_views.append(view_o) cls.category_class.object_category_xml_import(formdef, tree, include_id=include_id) if tree.find('workflow') is not None: from wcs.workflows import Workflow workflow_node = tree.find('workflow') if include_id and workflow_node.attrib.get('workflow_id'): workflow_id = workflow_node.attrib.get('workflow_id') if Workflow.has_key(workflow_id): formdef.workflow_id = workflow_id else: workflow_slug = workflow_node.attrib.get('slug') if workflow_slug: formdef.workflow = Workflow.get_by_slug(workflow_slug) else: workflow = xml_node_text(workflow_node) for w in Workflow.select(ignore_errors=True, ignore_migration=True): if w and w.name == workflow: formdef.workflow_id = w.id break roles_elements = [ ('roles', 'user-roles'), ('backoffice_submission_roles', 'backoffice-submission-roles'), ] for attr_name, node_name in roles_elements: if tree.find(node_name) is None: continue roles_node = tree.find(node_name) roles = [] setattr(formdef, attr_name, roles) for child in roles_node: role_id = get_publisher().role_class.get_role_by_node(child, include_id=include_id) if role_id: roles.append(role_id) if tree.find('roles') is not None: roles_node = tree.find('roles') formdef.workflow_roles = {} for child in roles_node: role_key = child.attrib['role_key'] role_id = get_publisher().role_class.get_role_by_node(child, include_id=include_id) formdef.workflow_roles[role_key] = role_id if tree.find('geolocations') is not None: geolocations_node = tree.find('geolocations') formdef.geolocations = {} for child in geolocations_node: geoloc_key = child.attrib['key'] geoloc_value = xml_node_text(child) formdef.geolocations[geoloc_key] = geoloc_value if tree.find('required_authentication_contexts') is not None: node = tree.find('required_authentication_contexts') formdef.required_authentication_contexts = [] for child in node: formdef.required_authentication_contexts.append(str(child.text)) if tree.find('management_sidebar_items') is not None: node = tree.find('management_sidebar_items') formdef.management_sidebar_items = set() for child in node: formdef.management_sidebar_items.add(str(child.text)) if tree.find('digest_templates') is not None: digest_templates_node = tree.find('digest_templates') formdef.digest_templates = {} for child in digest_templates_node: key = child.attrib['key'] value = xml_node_text(child) formdef.digest_templates[key] = value formdef.xml_testdefs = tree.find('testdefs') unknown_datasources = set() if check_datasources: # check if datasources are defined for field in formdef.fields: data_source = getattr(field, 'data_source', None) if data_source: data_source_id = data_source.get('type') if isinstance(data_sources.get_object(data_source), data_sources.StubNamedDataSource): unknown_datasources.add(data_source_id) elif data_source_id and data_source_id.startswith('carddef:'): parts = data_source_id.split(':') # check if carddef exists url_name = parts[1] if formdef.xml_root_node == 'carddef' and formdef.url_name == url_name: # reference to itself, it's ok continue try: CardDef.get_by_urlname(url_name) except KeyError: unknown_datasources.add(data_source_id) continue if len(parts) == 2 or parts[2] == '_with_user_filter': continue lookup_criterias = [ Equal('formdef_type', 'carddef'), Equal('visibility', 'datasource'), Equal('slug', parts[2]), ] try: get_publisher().custom_view_class.select(lookup_criterias)[0] except IndexError: unknown_datasources.add(data_source_id) if unknown_field_types or unknown_fields_blocks or unknown_datasources: details = collections.defaultdict(set) if unknown_field_types: details[_('Unknown field types')].update(unknown_field_types) if unknown_fields_blocks: details[_('Unknown fields blocks')].update(unknown_fields_blocks) if unknown_datasources: details[_('Unknown datasources')].update(unknown_datasources) raise FormdefImportUnknownReferencedError(_('Unknown referenced objects'), details=details) if check_deprecated: # check for deprecated elements job = DeprecationsScan() try: job.check_deprecated_elements_in_object(formdef) except DeprecatedElementsDetected as e: raise FormdefImportError(str(e)) return formdef def finish_tests_xml_import(self): from .testdef import TestDef for testdef in TestDef.select_for_objectdef(self): TestDef.remove_object(testdef.id) for testdef in self.xml_testdefs or []: obj = TestDef.import_from_xml_tree(testdef, self) obj.store(comment=_('Creation on form import')) del self.xml_testdefs def get_detailed_email_form(self, formdata, url): r = '' if formdata.user_id and formdata.user: r = '%s\n %s\n\n' % (_('User name:'), formdata.user.name) return r + formdata.get_rst_summary(url) def get_user_prefilled_data(self, formdata, field_id): # look up in submitted form for one that would hold the user # email (the one set to be prefilled by user email) if not formdata.data: return None def is_user_field(field): if not getattr(field, 'prefill', None): return False if field.prefill.get('type') != 'user': return False if field.prefill.get('value') != field_id: return False return True # check first in "normal" fields for field in self.fields: if not is_user_field(field): continue v = formdata.data.get(field.id) if v: return v # then check in block fields for field in self.fields: if field.key != 'block': continue for subfield in field.block.fields: if not is_user_field(subfield): continue v = formdata.data.get(field.id) if not (v and v.get('data')): continue for data in v.get('data'): w = data.get(subfield.id) if w: return w def get_submitter_email(self, formdata): users_cfg = get_cfg('users', {}) field_email_id = users_cfg.get('field_email') or 'email' value = self.get_user_prefilled_data(formdata, field_email_id) if value: return value # if nothing was found, get email from user profile if formdata.user and formdata.user.email and formdata.user.is_active: return formdata.user.email return None def get_submitter_phone(self, formdata): users_cfg = get_cfg('users', {}) field_phone_id = users_cfg.get('field_phone') if not field_phone_id: return None value = self.get_user_prefilled_data(formdata, field_phone_id) if value: return value # if nothing was found, get email from user profile if formdata.user and formdata.user.is_active: return formdata.user.get_formatted_phone() return None def get_static_substitution_variables(self, minimal=False): d = { 'form_name': self.name, 'form_slug': self.url_name, 'form_class_name': self.__class__.__name__, # reserved for logged errors } if not minimal: from wcs.variables import LazyFormDef d['form_objects'] = LazyFormDef(self).objects if self.category: d.update(self.category.get_substitution_variables(minimal=minimal)) d.update(self.get_variable_options()) return d def get_substitution_variables(self, minimal=False): from wcs.variables import LazyFormDef from .qommon.substitution import CompatibilityNamesDict return CompatibilityNamesDict({'form': LazyFormDef(self)}) def get_detailed_evolution(self, formdata): if not formdata.evolution: return None details = [] evo = formdata.evolution[-1] if evo.who: evo_who = None if evo.who == '_submitter': if formdata.user_id: evo_who = formdata.user_id else: evo_who = evo.who if evo_who: user_who = get_publisher().user_class.get(evo_who, ignore_errors=True) if user_who: details.append(_('User name')) details.append(' %s' % user_who.name) if evo.status: details.append(_('Status')) details.append(' %s' % formdata.get_status_label()) comment = evo.get_plain_text_comment() if comment: details.append('\n%s\n' % comment) return '\n\n----\n\n' + '\n'.join([str(x) for x in details]) def is_of_concern_for_role_id(self, role_id): if not self.workflow_roles: return False return role_id in self.workflow_roles.values() def is_of_concern_for_user(self, user, formdata=None): if not self.workflow_roles: self.workflow_roles = {} user_roles = set(user.get_roles()) # if the formdef itself has some function attributed to the user, grant # access. for role_id in self.workflow_roles.values(): if role_id in user_roles: return True # if there was some redispatching of function, values will be different # in formdata, check them. if formdata and formdata.workflow_roles: for role_id in formdata.workflow_roles.values(): if role_id is None: continue if isinstance(role_id, list): role_ids = set(role_id) else: role_ids = {role_id} if user_roles.intersection(role_ids): return True # if no formdata was given, lookup if there are some existing formdata # where the user has access. if not formdata: data_class = self.data_class() for role_id in user.get_roles(): if data_class.get_ids_with_indexed_value('workflow_roles', role_id): return True return False def is_user_allowed_read(self, user, formdata=None): if not user: if formdata and get_session() and get_session().is_anonymous_submitter(formdata): return True return False if user.is_admin: return True user_roles = set(user.get_roles()) user_roles.add(logged_users_role().id) def ensure_role_are_strings(roles): # makes sure all roles are defined as strings, as different origins # (formdef, user, workflow status...) may define them differently. return {str(x) for x in roles if x} user_roles = ensure_role_are_strings(user_roles) if self.user_allowed_to_access_own_data and formdata and formdata.is_submitter(user): return True if self.is_of_concern_for_user(user): if not formdata: return True if formdata: # current status concerned_roles = ensure_role_are_strings(formdata.get_concerned_roles()) if '_submitter' in concerned_roles and formdata.is_submitter(user): return True if user_roles.intersection(concerned_roles): return True return False def is_user_allowed_read_status_and_history(self, user, formdata=None): if user and user.is_admin: return True if not self.workflow_roles: self.workflow_roles = {} form_roles = [x for x in self.workflow_roles.values() if x] if formdata and formdata.workflow_roles: for x in formdata.workflow_roles.values(): if isinstance(x, list): form_roles.extend(x) elif x: form_roles.append(x) return self.is_user_allowed_read(user, formdata=formdata) @property def publication_datetime(self): try: return get_as_datetime(self.publication_date) except (TypeError, ValueError): return None @property def expiration_datetime(self): try: return get_as_datetime(self.expiration_date) except (TypeError, ValueError): return None def is_disabled(self): if self.disabled: return True if self.publication_datetime and self.publication_datetime > datetime.datetime.now(): return True if self.expiration_datetime and self.expiration_datetime < datetime.datetime.now(): return True return False class _EmptyClass: # helper for instance creation without calling __init__ pass def __copy__(self, memo=None, deepcopy=False): formdef_copy = self._EmptyClass() formdef_copy.__class__ = self.__class__ if deepcopy: formdef_copy.__dict__ = copy.deepcopy(self.__dict__, memo=memo) else: formdef_copy.__dict__ = copy.copy(self.__dict__) return formdef_copy def __deepcopy__(self, memo=None): return self.__copy__(memo=memo, deepcopy=True) # don't pickle computed attributes def __getstate__(self): odict = copy.copy(self.__dict__) if '_workflow' in odict: del odict['_workflow'] if '_start_page' in odict: del odict['_start_page'] if self.lightweight and 'fields' in odict: # will be stored independently del odict['fields'] if '_custom_views' in odict: del odict['_custom_views'] if '_import_orig_slug' in odict: del odict['_import_orig_slug'] if '_onload_category_id' in odict: del odict['_onload_category_id'] return odict def __setstate__(self, dict): super().__setstate__(dict) self.__dict__ = dict self._workflow = None self._start_page = None if hasattr(self, 'snapshot_object'): # don't restore snapshot object that would have been stored erroneously delattr(self, 'snapshot_object') @classmethod def storage_load(cls, fd, **kwargs): o = super().storage_load(fd) o._onload_category_id = o.category_id # keep track of category, to update wcs_all_forms if changed if kwargs.get('lightweight'): o.fields = Ellipsis return o if cls.lightweight: try: o.fields = pickle.load(fd, **PICKLE_KWARGS) except EOFError: pass # old format for field in o.fields or []: field._formdef = o # keep formdef reference return o @classmethod def storage_dumps(cls, object): if getattr(object, 'fields', None) is Ellipsis: raise RuntimeError('storing a lightweight object is not allowed') # use two separate pickle chunks to store the formdef, the first field # is everything but fields (excluded via __getstate__) while the second # chunk contains the fields. return pickle.dumps(object, protocol=2) + pickle.dumps(object.fields, protocol=2) def change_workflow(self, new_workflow, status_mapping=None, user_id=None): old_workflow = self.get_workflow() formdata_count = self.data_class().count() status_changes = False if formdata_count: assert status_mapping, 'status mapping is required if there are formdatas' mapping = {'draft': 'draft'} if '_all' in status_mapping: # remapping everything to same status for formdata in self.data_class().select_iterator( [NotEqual('status', 'draft')], ignore_errors=True, itersize=200 ): mapping[formdata.status] = 'wf-%s' % status_mapping['_all'] else: assert all( status.id in status_mapping for status in old_workflow.possible_status ), 'a status was not mapped' for old_status, new_status in status_mapping.items(): mapping['wf-%s' % old_status] = 'wf-%s' % new_status status_changes = any(x[0] != x[1] for x in mapping.items()) if status_changes: # if there are status changes, update all formdatas (except drafts) from . import sql sql.formdef_remap_statuses(self, mapping) self.workflow = new_workflow if new_workflow.has_action('geolocate') and not self.geolocations: self.geolocations = {'base': str(_('Geolocation'))} removed_functions = set() for function_key in list((self.workflow_roles or {}).keys()): if function_key not in new_workflow.roles: del self.workflow_roles[function_key] removed_functions.add(function_key) self.store(comment=_('Workflow change'), snapshot_store_user=user_id) if formdata_count: # instruct formdef to update its security rules self.data_class().rebuild_security() if removed_functions or status_changes: # status changes require to update jump markers and change in functions # requires to update all formdatas to remove old keys mapping_without_prefix = {x.removeprefix('wf-'): y for x, y in mapping.items()} for formdata in self.data_class().select_iterator(ignore_errors=True, itersize=200): changed = False for function_key in removed_functions: if function_key in (formdata.workflow_roles or {}): del formdata.workflow_roles[function_key] changed = True if ( status_changes and formdata.workflow_data and '_markers_stack' in formdata.workflow_data ): current_markers_stack = formdata.workflow_data['_markers_stack'] formdata.workflow_data['_markers_stack'] = [ {'status_id': mapping_without_prefix.get(x['status_id'])} for x in current_markers_stack ] if formdata.workflow_data['_markers_stack'] != current_markers_stack: changed = True if changed: formdata.store() def i18n_scan(self): location = '%s/%s/' % (self.backoffice_section, self.id) yield location, None, self.name yield location, None, self.description for field in self.fields or []: yield from field.i18n_scan(base_location=location + 'fields/') EmailsDirectory.register( 'new_user', _('Notification of creation to user'), enabled=False, category=_('Workflow'), default_subject=_('New form ({{ form_name }})'), default_body=_( '''\ Hello, This mail is a reminder about the form you just submitted. {% if form_user %} You can consult it with this link: {{ form_url }} {% endif %} {% if form_details %} For reference, here are the details: {{ form_details }} {% endif %} ''' ), ) EmailsDirectory.register( 'change_user', _('Notification of change to user'), category=_('Workflow'), default_subject=_('Form status change ({{ form_name }})'), default_body=_( '''\ Hello, {% if form_status_changed %} Status of the form you submitted just changed (from "{{ form_previous_status }}" to "{{ form_status }}"). {% endif %} {% if form_user %} You can consult it with this link: {{ form_url }} {% endif %} {% if form_comment %}New comment: {{ form_comment }}{% endif %} {% if form_evolution %} {{ form_evolution }} {% endif %} ''' ), ) EmailsDirectory.register( 'new_receiver', _('Notification of creation to receiver'), enabled=False, category=_('Workflow'), default_subject=_('New form ({{ form_name }})'), default_body=_( '''\ Hello, A new form has been submitted, you can see it with this link: {{ form_url_backoffice }} {% if form_details %} For reference, here are the details: {{ form_details }} {% endif %} ''' ), ) EmailsDirectory.register( 'change_receiver', _('Notification of change to receiver'), category=_('Workflow'), default_subject=_('Form status change ({{ form_name }})'), default_body=_( '''\ Hello, A form just changed, you can consult it with this link: {{ form_url_backoffice }} {% if form_status_changed %} Status of the form just changed (from "{{ form_previous_status }}" to "{{ form_status }}"). {% endif %} {% if form_comment %}New comment: {{ form_comment }}{% endif %} {% if form_evolution %} {{ form_evolution }} {% endif %} ''' ), ) Substitutions.register('form_name', category=_('Form'), comment=_('Form Name')) def clean_drafts(publisher, **kwargs): import wcs.qommon.storage as st from wcs.carddef import CardDef job = kwargs.pop('job', None) for formdef in FormDef.select() + CardDef.select(): with job.log_long_job( '%s %s' % (formdef.xml_root_node, formdef.url_name) ) if job else contextlib.ExitStack(): removal_date = datetime.date.today() - datetime.timedelta(days=formdef.get_drafts_lifespan()) for formdata in formdef.data_class().select( [st.Equal('status', 'draft'), st.Less('receipt_time', removal_date.timetuple())] ): formdata.remove_self() def clean_unused_files(publisher, **kwargs): unused_files_behaviour = publisher.get_site_option('unused-files-behaviour') if unused_files_behaviour not in ('move', 'remove'): return known_filenames = set() known_filenames.update([x for x in glob.glob(os.path.join(publisher.app_dir, 'uploads/*'))]) known_filenames.update([x for x in glob.glob(os.path.join(publisher.app_dir, 'attachments/*/*'))]) def accumulate_filenames(): from wcs.applications import Application from wcs.carddef import CardDef for formdef in FormDef.select(ignore_migration=True) + CardDef.select(ignore_migration=True): for option_data in (formdef.workflow_options or {}).values(): if is_upload(option_data): yield option_data.get_fs_filename() for formdata in formdef.data_class().select_iterator(ignore_errors=True, itersize=200): for field_data in formdata.get_all_file_data(with_history=True): if is_upload(field_data): yield field_data.get_fs_filename() elif is_attachment(field_data): yield field_data.filename for user in publisher.user_class.select(): for field_data in (user.form_data or {}).values(): if is_upload(field_data): yield field_data.get_fs_filename() for application in Application.select(): if is_upload(application.icon): yield application.icon.get_fs_filename() used_filenames = set() for filename in accumulate_filenames(): if not filename: # alternative storage continue if not os.path.isabs(filename): filename = os.path.join(publisher.app_dir, filename) used_filenames.add(filename) unused_filenames = known_filenames - used_filenames for filename in unused_filenames: try: if unused_files_behaviour == 'move': new_filename = os.path.join( publisher.app_dir, 'unused-files', filename[len(publisher.app_dir) + 1 :] ) if os.path.exists(new_filename): os.unlink(filename) else: new_dirname = os.path.dirname(new_filename) if not os.path.exists(new_dirname): os.makedirs(new_dirname) os.rename(filename, new_filename) else: os.unlink(filename) except OSError: pass def update_storage_all_formdefs(publisher, **kwargs): from wcs.carddef import CardDef for formdef in itertools.chain(FormDef.select(), CardDef.select()): formdef.update_storage() if formdef.sql_integrity_errors: # print errors, this will get them in the cron output, that hopefully # a sysadmin will read. print(f'! Integrity errors in {formdef.get_admin_url()}') def get_formdefs_of_all_kinds(**kwargs): from wcs.admin.settings import UserFieldsFormDef from wcs.blocks import BlockDef from wcs.carddef import CardDef from wcs.wf.form import FormWorkflowStatusItem from wcs.workflows import Workflow select_kwargs = { 'ignore_errors': True, 'ignore_migration': True, } select_kwargs.update(kwargs) formdefs = [UserFieldsFormDef()] formdefs += FormDef.select(**select_kwargs) formdefs += BlockDef.select(**select_kwargs) formdefs += CardDef.select(**select_kwargs) for workflow in Workflow.select(**select_kwargs): for status in workflow.possible_status: for item in status.items: if isinstance(item, FormWorkflowStatusItem) and item.formdef: formdefs.append(item.formdef) if workflow.variables_formdef: formdefs.append(workflow.variables_formdef) if workflow.backoffice_fields_formdef: formdefs.append(workflow.backoffice_fields_formdef) return formdefs def register_cronjobs(): # every night: # * update storage of all formdefs get_publisher_class().register_cronjob( CronJob(update_storage_all_formdefs, name='update_storage', hours=[2], minutes=[0]) ) # * and look for: # * expired drafts get_publisher_class().register_cronjob(CronJob(clean_drafts, name='clean_drafts', hours=[2], minutes=[0])) # * and unused files get_publisher_class().register_cronjob( CronJob(clean_unused_files, name='clean_unused_files', hours=[2], minutes=[0]) ) class UpdateDigestAfterJob(AfterJob): label = _('Updating digests') def __init__(self, formdefs): super().__init__(formdefs=[(x.__class__, x.id) for x in formdefs if x.id]) def execute(self): for formdef_class, formdef_id in self.kwargs['formdefs']: formdef = formdef_class.get(formdef_id) for formdata in formdef.data_class().select_iterator(order_by='id', itersize=200): formdata.store() class UpdateStatisticsDataAfterJob(UpdateDigestAfterJob): label = _('Updating statistics data')