2257 lines
85 KiB
Python
2257 lines
85 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2010 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import base64
|
|
import collections
|
|
import contextlib
|
|
import copy
|
|
import datetime
|
|
import glob
|
|
import itertools
|
|
import json
|
|
import os
|
|
import pickle
|
|
import sys
|
|
import time
|
|
import types
|
|
import uuid
|
|
import xml.etree.ElementTree as ET
|
|
from operator import itemgetter
|
|
|
|
from django.utils.encoding import force_bytes, force_str
|
|
from quixote import get_publisher, get_session
|
|
from quixote.html import TemplateIO, htmltext
|
|
|
|
from . import data_sources, fields
|
|
from .categories import Category
|
|
from .qommon import PICKLE_KWARGS, _, get_cfg, ngettext, pgettext_lazy
|
|
from .qommon.admin.emails import EmailsDirectory
|
|
from .qommon.afterjobs import AfterJob
|
|
from .qommon.cron import CronJob
|
|
from .qommon.errors import UnknownReferencedErrorMixin
|
|
from .qommon.form import Form, HtmlWidget
|
|
from .qommon.misc import (
|
|
JSONEncoder,
|
|
get_as_datetime,
|
|
get_dependencies_from_template,
|
|
is_attachment,
|
|
is_upload,
|
|
simplify,
|
|
xml_node_text,
|
|
)
|
|
from .qommon.publisher import get_publisher_class
|
|
from .qommon.storage import Equal, NotEqual, StorableObject, classonlymethod, fix_key
|
|
from .qommon.substitution import Substitutions
|
|
from .qommon.template import Template
|
|
from .qommon.upload_storage import PicklableUpload
|
|
from .roles import logged_users_role
|
|
|
|
DRAFTS_DEFAULT_LIFESPAN = 100 # days
|
|
|
|
if not hasattr(types, 'ClassType'):
|
|
types.ClassType = type
|
|
|
|
|
|
class FormdefImportError(Exception):
|
|
def __init__(self, msg, msg_args=None, details=None):
|
|
self.msg = msg
|
|
self.msg_args = msg_args or ()
|
|
self.details = details
|
|
|
|
|
|
class FormdefImportUnknownReferencedError(UnknownReferencedErrorMixin, FormdefImportError):
|
|
pass
|
|
|
|
|
|
class FormdefImportRecoverableError(FormdefImportError):
|
|
pass
|
|
|
|
|
|
class FormDefDoesNotExist(AttributeError):
|
|
error_message = _('No such form: %s')
|
|
|
|
def get_error_message(self):
|
|
return self.error_message % self
|
|
|
|
|
|
class FormField:
|
|
# only used to unpickle form fields from older (<200603) versions
|
|
def __setstate__(self, dict):
|
|
type = dict['type']
|
|
self.real_field = fields.get_field_class_by_type(type)(**dict)
|
|
|
|
|
|
def lax_int(s):
|
|
try:
|
|
return int(s)
|
|
except (ValueError, TypeError):
|
|
return -1
|
|
|
|
|
|
class FormDefForm(Form):
|
|
ERROR_NOTICE = _('There were errors processing the form and you cannot go to the next page.')
|
|
|
|
def __init__(self):
|
|
super().__init__(enctype='multipart/form-data', use_tokens=False)
|
|
|
|
def _render_error_notice_content(self, errors):
|
|
t = TemplateIO(html=True)
|
|
t += super()._render_error_notice_content(errors)
|
|
widget_with_errors = []
|
|
for widget in self.get_all_widgets():
|
|
if hasattr(widget, 'field') and widget.has_error() and not getattr(widget, 'is_hidden', False):
|
|
widget_with_errors.append(widget)
|
|
if widget_with_errors:
|
|
t += htmltext('<p>')
|
|
t += str(
|
|
ngettext(
|
|
'The following field has an error:',
|
|
'The following fields have an error:',
|
|
len(widget_with_errors),
|
|
)
|
|
)
|
|
t += ' '
|
|
for i, widget in enumerate(widget_with_errors):
|
|
t += htmltext('<a href="#form_label_%s">%s</a>') % (widget.get_name_for_id(), widget.title)
|
|
if i < len(widget_with_errors) - 1:
|
|
t += str(_(', '))
|
|
t += htmltext('</p>')
|
|
return t.getvalue()
|
|
|
|
|
|
class FormDef(StorableObject):
|
|
# noqa pylint: disable=too-many-public-methods
|
|
_names = 'formdefs'
|
|
_indexes = ['url_name']
|
|
_hashed_indexes = ['backoffice_submission_roles']
|
|
backoffice_class = 'wcs.admin.forms.FormDefPage'
|
|
data_sql_prefix = 'formdata'
|
|
pickle_module_name = 'formdef'
|
|
xml_root_node = 'formdef'
|
|
backoffice_section = 'forms'
|
|
verbose_name = _('Form')
|
|
verbose_name_plural = _('Forms')
|
|
item_name = pgettext_lazy('item', 'form')
|
|
item_name_plural = pgettext_lazy('item', 'forms')
|
|
|
|
name = None
|
|
description = None
|
|
keywords = None
|
|
url_name = None
|
|
table_name = None # for SQL only
|
|
fields = None
|
|
category_id = None
|
|
workflow_id = None
|
|
workflow_options = None
|
|
workflow_roles = None
|
|
roles = None
|
|
required_authentication_contexts = None
|
|
backoffice_submission_roles = None
|
|
discussion = False
|
|
confirmation = True
|
|
detailed_emails = True
|
|
disabled = False
|
|
only_allow_one = False
|
|
enable_tracking_codes = False
|
|
tracking_code_verify_fields = None
|
|
disabled_redirection = None
|
|
always_advertise = False
|
|
publication_date = None
|
|
expiration_date = None
|
|
has_captcha = False
|
|
skip_from_360_view = False
|
|
include_download_all_button = False
|
|
appearance_keywords = None
|
|
digest_templates = None
|
|
lateral_template = None
|
|
submission_lateral_template = None
|
|
id_template = None
|
|
drafts_lifespan = None
|
|
user_support = None
|
|
|
|
geolocations = None
|
|
|
|
# store reverse relations
|
|
reverse_relations = None
|
|
|
|
# store fields in a separate pickle chunk
|
|
lightweight = True
|
|
|
|
# prefixes for formdata variables
|
|
var_prefixes = ['form']
|
|
|
|
# users are allowed to access formdata where they're submitter.
|
|
user_allowed_to_access_own_data = True
|
|
|
|
# declarations for serialization
|
|
TEXT_ATTRIBUTES = [
|
|
'name',
|
|
'url_name',
|
|
'description',
|
|
'keywords',
|
|
'publication_date',
|
|
'expiration_date',
|
|
'disabled_redirection',
|
|
'appearance_keywords',
|
|
'lateral_template',
|
|
'submission_lateral_template',
|
|
'id_template',
|
|
'drafts_lifespan',
|
|
'user_support',
|
|
]
|
|
BOOLEAN_ATTRIBUTES = [
|
|
'discussion',
|
|
'detailed_emails',
|
|
'disabled',
|
|
'only_allow_one',
|
|
'enable_tracking_codes',
|
|
'confirmation',
|
|
'always_advertise',
|
|
'include_download_all_button',
|
|
'has_captcha',
|
|
'skip_from_360_view',
|
|
]
|
|
|
|
category_class = Category
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.fields = []
|
|
|
|
def __eq__(self, other):
|
|
return bool(
|
|
isinstance(other, FormDef) and self.xml_root_node == other.xml_root_node and self.id == other.id
|
|
)
|
|
|
|
def migrate(self):
|
|
changed = False
|
|
|
|
if self.__dict__.get('fields') is Ellipsis:
|
|
# don't run migration on lightweight objects
|
|
return
|
|
|
|
if isinstance(self.category_id, int):
|
|
self.category_id = str(self.category_id)
|
|
changed = True
|
|
|
|
if isinstance(self.workflow_id, int):
|
|
self.workflow_id = str(self.workflow_id)
|
|
changed = True
|
|
|
|
if self.roles:
|
|
for role in self.roles:
|
|
if isinstance(role, int):
|
|
self.roles = [str(x) for x in self.roles]
|
|
changed = True
|
|
break
|
|
|
|
if self.workflow_roles:
|
|
workflow_roles_list = self.workflow_roles.items()
|
|
for role_id in self.workflow_roles.values():
|
|
if isinstance(role_id, int):
|
|
self.workflow_roles = {x: str(y) for x, y in workflow_roles_list}
|
|
changed = True
|
|
break
|
|
|
|
for f in self.fields or []:
|
|
changed |= f.migrate()
|
|
|
|
if changed:
|
|
self.store(comment=_('Automatic update'), snapshot_store_user=False)
|
|
|
|
@classmethod
|
|
def remove_object(cls, id):
|
|
super().remove_object(id)
|
|
from . import sql
|
|
|
|
sql.SearchableFormDef.update(removed_obj_type=cls.xml_root_node, removed_obj_id=id)
|
|
if cls == FormDef:
|
|
# recreate global views so they don't reference formdata from
|
|
# deleted formefs
|
|
conn, cur = sql.get_connection_and_cursor()
|
|
with sql.atomic():
|
|
sql.clean_global_views(conn, cur)
|
|
cur.close()
|
|
|
|
@property
|
|
def data_class_name(self):
|
|
return '_wcs_%s' % self.url_name.title()
|
|
|
|
def data_class(self, mode=None):
|
|
if 'formdef' not in sys.modules:
|
|
sys.modules['formdef'] = sys.modules[__name__]
|
|
if hasattr(sys.modules['formdef'], self.data_class_name):
|
|
data_class = getattr(sys.modules['formdef'], self.data_class_name)
|
|
# only use existing data class if it has a reference to this actual
|
|
# formdef
|
|
if data_class._formdef is self:
|
|
return data_class
|
|
|
|
from . import sql
|
|
|
|
table_name = sql.get_formdef_table_name(self)
|
|
cls = types.ClassType(
|
|
self.data_class_name, (sql.SqlFormData,), {'_formdef': self, '_table_name': table_name}
|
|
)
|
|
setattr(sys.modules['formdef'], self.data_class_name, cls)
|
|
setattr(sys.modules['wcs.formdef'], self.data_class_name, cls)
|
|
|
|
return cls
|
|
|
|
def get_new_field_id(self):
|
|
return str(uuid.uuid4())
|
|
|
|
def get_new_url_name(self):
|
|
new_url_name = simplify(self.name)[:250]
|
|
base_new_url_name = new_url_name
|
|
suffix_no = 0
|
|
while True:
|
|
try:
|
|
obj = self.get_on_index(new_url_name, 'url_name', ignore_migration=True)
|
|
except KeyError:
|
|
break
|
|
if obj.id == self.id:
|
|
break
|
|
suffix_no += 1
|
|
new_url_name = '%s-%s' % (base_new_url_name, suffix_no)
|
|
return new_url_name
|
|
|
|
@classmethod
|
|
def get_new_id(cls, create=False):
|
|
id = super().get_new_id(create=False)
|
|
id = cls.get_sql_new_id(id_start=int(id))
|
|
if create:
|
|
objects_dir = cls.get_objects_dir()
|
|
object_filename = os.path.join(objects_dir, fix_key(id))
|
|
try:
|
|
fd = os.open(object_filename, os.O_CREAT | os.O_EXCL)
|
|
except OSError:
|
|
return cls.get_new_id(create=True)
|
|
os.close(fd)
|
|
return str(id)
|
|
|
|
@classmethod
|
|
def get_sql_new_id(cls, id_start):
|
|
from . import sql
|
|
|
|
return sql.get_formdef_new_id(id_start=id_start)
|
|
|
|
def get_order_by(self, order_by):
|
|
if not order_by:
|
|
return order_by
|
|
direction = ''
|
|
if order_by.startswith('-'):
|
|
order_by = order_by[1:]
|
|
direction = '-'
|
|
for field in self.iter_fields(include_block_fields=True):
|
|
if getattr(field, 'block_field', None):
|
|
if field.key == 'items':
|
|
# not yet
|
|
continue
|
|
if order_by not in [field.contextual_varname, 'f%s' % field.contextual_id]:
|
|
continue
|
|
if field.contextual_varname == order_by:
|
|
order_by = 'f%s' % field.contextual_id
|
|
if getattr(field, 'block_field', None) and 'f%s' % field.contextual_id == order_by:
|
|
# field of block field, sort on the first element
|
|
order_by = "f%s->'data'->0->>'%s%s'" % (
|
|
field.block_field.id,
|
|
field.id,
|
|
'_display' if field.store_display_value else '',
|
|
)
|
|
elif field.store_display_value:
|
|
order_by += '_display'
|
|
break
|
|
if order_by == 'digest':
|
|
order_by = "digests->>'default'"
|
|
order_by = '%s%s' % (direction, order_by)
|
|
if order_by == 'criticality_level':
|
|
order_by = [order_by, 'receipt_time']
|
|
elif order_by == '-criticality_level':
|
|
order_by = [order_by, '-receipt_time']
|
|
return order_by
|
|
|
|
def has_admin_access(self, user):
|
|
# return True if user 1/ is global administrator for this type of object, or
|
|
# 2/ has one of the management roles defined in its category.
|
|
if get_publisher().get_backoffice_root().is_global_accessible(self.backoffice_section):
|
|
return True
|
|
if not user:
|
|
return False
|
|
if not self.category_id:
|
|
return False
|
|
management_roles = {x.id for x in getattr(self.category, 'management_roles') or []}
|
|
user_roles = set(user.get_roles())
|
|
return management_roles.intersection(user_roles)
|
|
|
|
@classonlymethod
|
|
def wipe(cls):
|
|
super().wipe()
|
|
cls.sql_wipe()
|
|
|
|
@classmethod
|
|
def sql_wipe(cls):
|
|
from . import sql
|
|
|
|
sql.formdef_wipe()
|
|
|
|
def store(self, comment=None, snapshot_store_user=True, application=None, *args, **kwargs):
|
|
assert not self.is_readonly()
|
|
if self.url_name is None:
|
|
# set url name if it's not yet there
|
|
self.url_name = self.get_new_url_name()
|
|
|
|
object_only = kwargs.pop('object_only', False)
|
|
|
|
if not object_only:
|
|
self.update_relations()
|
|
|
|
StorableObject.store(self, *args, **kwargs)
|
|
|
|
if object_only:
|
|
return
|
|
if get_publisher().snapshot_class:
|
|
get_publisher().snapshot_class.snap(
|
|
instance=self, comment=comment, store_user=snapshot_store_user, application=application
|
|
)
|
|
|
|
if get_publisher().has_postgresql_config():
|
|
self.update_storage()
|
|
self.store_related_custom_views()
|
|
self.update_searchable_formdefs_table()
|
|
|
|
def has_captcha_enabled(self):
|
|
return self.has_captcha and get_publisher().has_site_option('formdef-captcha-option')
|
|
|
|
def update_storage(self):
|
|
from . import sql
|
|
|
|
actions = sql.do_formdef_tables(self, rebuild_views=True, rebuild_global_views=True)
|
|
if actions:
|
|
cls = self.data_class()
|
|
for action in actions:
|
|
getattr(cls, action)()
|
|
|
|
def update_searchable_formdefs_table(self):
|
|
from . import sql
|
|
|
|
sql.SearchableFormDef.update(obj=self)
|
|
|
|
def update_relations(self):
|
|
from wcs.carddef import CardDef
|
|
|
|
self_ref = '%s:%s' % (self.xml_root_node, self.url_name)
|
|
self_relations_by_ref = self.build_relations_by_ref()
|
|
reverse_relations = []
|
|
|
|
# cross each formdef and cardef and check relations
|
|
for objdef in itertools.chain(
|
|
FormDef.select(ignore_errors=True, ignore_migration=True),
|
|
CardDef.select(ignore_errors=True, ignore_migration=True),
|
|
):
|
|
objdef_ref = '%s:%s' % (objdef.xml_root_node, objdef.url_name)
|
|
if objdef.xml_root_node == self.xml_root_node and objdef.id == self.id:
|
|
# don't build relations twice
|
|
objdef_relations_by_ref = self_relations_by_ref
|
|
else:
|
|
objdef_relations_by_ref = objdef.build_relations_by_ref()
|
|
reverse_relations += objdef_relations_by_ref.get(self_ref, [])
|
|
|
|
old_objdef_reverse_relations = copy.deepcopy(objdef.reverse_relations)
|
|
# remove relations with self in objdef's reverse_relations
|
|
new_objdef_reverse_relations = [
|
|
r for r in (objdef.reverse_relations or []) if r['obj'] != self_ref
|
|
]
|
|
# and update objdef's reverse_relations from self_relations_by_ref
|
|
new_objdef_reverse_relations += self_relations_by_ref.get(objdef_ref, [])
|
|
# sort objectdef's reverse_relations
|
|
new_objdef_reverse_relations = sorted(
|
|
new_objdef_reverse_relations, key=itemgetter('obj', 'varname', 'type')
|
|
)
|
|
if old_objdef_reverse_relations != new_objdef_reverse_relations:
|
|
objdef.reverse_relations = new_objdef_reverse_relations
|
|
objdef.store(object_only=True)
|
|
# sort self's reverse_relations and set
|
|
self.reverse_relations = sorted(reverse_relations, key=itemgetter('obj', 'varname', 'type'))
|
|
|
|
def build_relations_by_ref(self):
|
|
# build relations to other carddefs, to be stored in some object reverse field
|
|
self_ref = '%s:%s' % (self.xml_root_node, self.url_name)
|
|
relations_by_ref = collections.defaultdict(list)
|
|
|
|
def _check_field(field):
|
|
data_source = getattr(field, 'data_source', None)
|
|
if not data_source or not data_source.get('type', '').startswith('carddef:'):
|
|
return
|
|
# reverse relation of data_source['type'] to this object
|
|
obj_ref = ':'.join(data_source['type'].split(':')[:2]) # remove possible custom-view
|
|
relations_by_ref[obj_ref].append(
|
|
{
|
|
'varname': field.contextual_varname or '',
|
|
'type': field.key,
|
|
'obj': self_ref,
|
|
}
|
|
)
|
|
|
|
for field in self.iter_fields(include_block_fields=True):
|
|
if field.key in ['item', 'items', 'computed']:
|
|
_check_field(field)
|
|
|
|
# remove duplicated items
|
|
return {
|
|
k: list(map(dict, {tuple(sorted(d.items())) for d in v})) for k, v in relations_by_ref.items()
|
|
}
|
|
|
|
def store_related_custom_views(self):
|
|
for view in getattr(self, '_custom_views', []):
|
|
if not view.id:
|
|
existing_views = get_publisher().custom_view_class.select(
|
|
[
|
|
Equal('formdef_type', self.xml_root_node),
|
|
Equal('formdef_id', str(self.id)),
|
|
Equal('visibility', view.visibility),
|
|
Equal('slug', view.slug),
|
|
]
|
|
)
|
|
if existing_views:
|
|
view.id = existing_views[0].id
|
|
view.formdef = self
|
|
view.store()
|
|
|
|
def get_all_fields(self):
|
|
return (self.fields or []) + self.workflow.get_backoffice_fields()
|
|
|
|
def iter_fields(self, include_block_fields=False, with_backoffice_fields=True):
|
|
def _iter_fields(fields, block_field=None):
|
|
for field in fields:
|
|
# add contextual_id/contextual_varname attributes
|
|
# they are id/varname for normal fields
|
|
# but in case of blocks they are concatenation of block id/varname + field id/varname
|
|
field.contextual_id = field.id
|
|
field.contextual_varname = None
|
|
if block_field:
|
|
field.block_field = block_field
|
|
field.contextual_id = '%s-%s' % (field.block_field.id, field.id)
|
|
if field.varname and field.block_field.varname:
|
|
field.contextual_varname = '%s_%s' % (
|
|
field.block_field.varname,
|
|
field.varname,
|
|
)
|
|
else:
|
|
field.contextual_varname = field.varname
|
|
yield field
|
|
if field.key == 'block' and include_block_fields:
|
|
try:
|
|
field.block # load block
|
|
except KeyError:
|
|
# blockdef not found
|
|
continue
|
|
yield from _iter_fields(field.block.fields, block_field=field)
|
|
field._block = None # reset cache
|
|
|
|
if with_backoffice_fields:
|
|
fields = self.get_all_fields()
|
|
else:
|
|
fields = self.fields or []
|
|
yield from _iter_fields(fields)
|
|
|
|
def get_widget_fields(self):
|
|
return [field for field in self.fields or [] if isinstance(field, fields.WidgetField)]
|
|
|
|
@property
|
|
def default_digest_template(self):
|
|
return (self.digest_templates or {}).get('default')
|
|
|
|
def get_category(self):
|
|
if self.category_id:
|
|
try:
|
|
return self.category_class.get(self.category_id)
|
|
except KeyError:
|
|
return None
|
|
else:
|
|
return None
|
|
|
|
def set_category(self, category):
|
|
if category:
|
|
self.category_id = category.id
|
|
elif self.category_id:
|
|
self.category_id = None
|
|
|
|
category = property(get_category, set_category)
|
|
|
|
def get_drafts_lifespan(self):
|
|
return int(self.drafts_lifespan or DRAFTS_DEFAULT_LIFESPAN)
|
|
|
|
_workflow = None
|
|
|
|
def get_workflow(self):
|
|
if self._workflow and self._workflow.id == self.workflow_id:
|
|
return self._workflow
|
|
from wcs.workflows import Workflow
|
|
|
|
if self.workflow_id:
|
|
try:
|
|
workflow = Workflow.get(self.workflow_id)
|
|
except KeyError:
|
|
return Workflow.get_unknown_workflow()
|
|
self._workflow = self.get_workflow_with_options(workflow)
|
|
return self._workflow
|
|
else:
|
|
self._workflow = self.get_default_workflow()
|
|
return self._workflow
|
|
|
|
@classmethod
|
|
def get_default_workflow(cls):
|
|
from wcs.workflows import Workflow
|
|
|
|
return Workflow.get_default_workflow()
|
|
|
|
def get_workflow_with_options(self, workflow):
|
|
# this needs to be kept in sync with admin/forms.ptl,
|
|
# FormDefPage::workflow
|
|
if not self.workflow_options:
|
|
return workflow
|
|
for status in workflow.possible_status:
|
|
for item in status.items:
|
|
prefix = '%s*%s*' % (status.id, item.id)
|
|
for parameter in item.get_parameters():
|
|
value = self.workflow_options.get(prefix + parameter)
|
|
if value:
|
|
setattr(item, parameter, value)
|
|
return workflow
|
|
|
|
def set_workflow(self, workflow):
|
|
if workflow and workflow.id not in ['_carddef_default', '_default']:
|
|
self.workflow_id = workflow.id
|
|
self._workflow = workflow
|
|
elif self.workflow_id:
|
|
self.workflow_id = None
|
|
self._workflow = None
|
|
|
|
workflow = property(get_workflow, set_workflow)
|
|
|
|
def get_dependencies(self):
|
|
yield self.category
|
|
if self.workflow_id and self.workflow.id not in ['_carddef_default', '_default']:
|
|
yield self.workflow
|
|
for field in self.fields or []:
|
|
yield from field.get_dependencies()
|
|
role_class = get_publisher().role_class
|
|
for role_id in itertools.chain(self.roles or [], self.backoffice_submission_roles or []):
|
|
yield role_class.get(role_id, ignore_errors=True)
|
|
for role_id in (self.workflow_roles or {}).values():
|
|
yield role_class.get(role_id, ignore_errors=True)
|
|
for view in get_publisher().custom_view_class.select_shared_for_formdef(formdef=self):
|
|
yield from view.get_dependencies()
|
|
for template in list((self.digest_templates or {}).values()) + [
|
|
self.lateral_template,
|
|
self.submission_lateral_template,
|
|
]:
|
|
yield from get_dependencies_from_template(template)
|
|
|
|
@property
|
|
def keywords_list(self):
|
|
if not self.keywords:
|
|
return []
|
|
return [x.strip() for x in self.keywords.split(',')]
|
|
|
|
@property
|
|
def appearance_keywords_list(self):
|
|
if not get_publisher().has_site_option('formdef-appearance-keywords'):
|
|
return []
|
|
if not self.appearance_keywords:
|
|
return []
|
|
return [x.strip() for x in self.appearance_keywords.split()]
|
|
|
|
def get_variable_options(self):
|
|
variables = {}
|
|
if not self.workflow.variables_formdef:
|
|
return variables
|
|
workflow_options = self.workflow_options or {}
|
|
for field in self.workflow.variables_formdef.fields:
|
|
if not field.varname:
|
|
continue
|
|
option_name = 'form_option_' + field.varname
|
|
variables[option_name] = getattr(field, 'default_value', None)
|
|
if workflow_options.get(field.varname) is not None:
|
|
variables[option_name] = workflow_options.get(field.varname)
|
|
if field.store_display_value:
|
|
if '%s_display' % field.varname in workflow_options:
|
|
variables[option_name + '_raw'] = variables[option_name]
|
|
variables[option_name] = workflow_options.get('%s_display' % field.varname)
|
|
if field.store_structured_value:
|
|
if '%s_structured' % field.varname in workflow_options:
|
|
variables[option_name + '_structured'] = workflow_options.get(
|
|
'%s_structured' % field.varname
|
|
)
|
|
return variables
|
|
|
|
def get_variable_options_for_form(self):
|
|
variables = {}
|
|
if not self.workflow.variables_formdef:
|
|
return variables
|
|
if not self.workflow_options:
|
|
return {}
|
|
for field in self.workflow.variables_formdef.fields:
|
|
if not field.varname:
|
|
continue
|
|
variables[str(field.id)] = self.workflow_options.get(field.varname)
|
|
return variables
|
|
|
|
def set_variable_options(self, form):
|
|
data = self.workflow.variables_formdef.get_data(form)
|
|
variables = {}
|
|
for field in self.workflow.variables_formdef.fields:
|
|
if not field.varname:
|
|
continue
|
|
variables[field.varname] = data.get(field.id)
|
|
if field.store_display_value:
|
|
variables[field.varname + '_display'] = data.get(field.id + '_display')
|
|
if field.store_structured_value:
|
|
variables[field.varname + '_structured'] = data.get(field.id + '_structured')
|
|
if not self.workflow_options:
|
|
self.workflow_options = {}
|
|
self.workflow_options.update(variables)
|
|
|
|
@classmethod
|
|
def get_by_urlname(cls, url_name, ignore_migration=False, ignore_errors=False):
|
|
return cls.get_on_index(
|
|
url_name, 'url_name', ignore_migration=ignore_migration, ignore_errors=ignore_errors
|
|
)
|
|
|
|
get_by_slug = get_by_urlname
|
|
|
|
@property
|
|
def slug(self):
|
|
return self.url_name
|
|
|
|
@slug.setter
|
|
def slug(self, value):
|
|
self.url_name = value
|
|
|
|
def get_url(self, backoffice=False, preview=False, include_category=False, language=None):
|
|
if backoffice:
|
|
base_url = get_publisher().get_backoffice_url() + '/management'
|
|
elif preview:
|
|
base_url = get_publisher().get_frontoffice_url() + '/preview'
|
|
else:
|
|
base_url = get_publisher().get_frontoffice_url()
|
|
if language and get_publisher().has_i18n_enabled():
|
|
base_url += '/' + language
|
|
if include_category and self.category_id:
|
|
return '%s/%s/%s/' % (base_url, self.category.slug, self.url_name)
|
|
return '%s/%s/' % (base_url, self.url_name)
|
|
|
|
def get_api_url(self):
|
|
base_url = get_publisher().get_frontoffice_url()
|
|
return '%s/api/forms/%s/' % (base_url, self.url_name)
|
|
|
|
def get_admin_url(self):
|
|
base_url = get_publisher().get_backoffice_url()
|
|
return '%s/forms/%s/' % (base_url, self.id)
|
|
|
|
def get_backoffice_url(self):
|
|
return self.get_url(backoffice=True)
|
|
|
|
def get_preview_url(self):
|
|
return self.get_url(preview=True)
|
|
|
|
def get_field_admin_url(self, field):
|
|
return self.get_admin_url() + 'fields/%s/' % field.id
|
|
|
|
def get_submission_url(self, backoffice=False):
|
|
if backoffice:
|
|
return self.get_backoffice_submission_url()
|
|
return self.get_url()
|
|
|
|
def get_backoffice_submission_url(self):
|
|
base_url = get_publisher().get_backoffice_url() + '/submission'
|
|
return '%s/%s/' % (base_url, self.url_name)
|
|
|
|
def get_display_id_format(self):
|
|
return self.id_template or '{{formdef_id}}-{{form_number_raw}}'
|
|
|
|
def get_by_id_criteria(self, value):
|
|
if self.id_template:
|
|
return Equal('id_display', str(value))
|
|
try:
|
|
if int(value) >= 2**31:
|
|
# out of range for postgresql integer type; would raise DataError.
|
|
raise OverflowError
|
|
except ValueError:
|
|
# value not an integer, it could be id_display
|
|
return Equal('id_display', str(value))
|
|
return Equal('id', value)
|
|
|
|
def get_submission_lateral_block(self):
|
|
context = get_publisher().substitutions.get_context_variables(mode='lazy')
|
|
if self.submission_lateral_template is None:
|
|
new_value = None
|
|
else:
|
|
try:
|
|
new_value = Template(self.submission_lateral_template, autoescape=False).render(context)
|
|
except Exception as e:
|
|
get_publisher().record_error(
|
|
_('Could not render submission lateral template (%s)' % e),
|
|
formdef=self,
|
|
exception=e,
|
|
)
|
|
return None
|
|
|
|
return new_value
|
|
|
|
def create_form(self, page=None, displayed_fields=None, transient_formdata=None):
|
|
form = FormDefForm()
|
|
if self.appearance_keywords:
|
|
form.attrs['class'] = 'quixote %s' % self.appearance_keywords
|
|
if self.keywords:
|
|
form.attrs['data-keywords'] = ' '.join(self.keywords_list)
|
|
self.add_fields_to_form(
|
|
form, page=page, displayed_fields=displayed_fields, transient_formdata=transient_formdata
|
|
)
|
|
return form
|
|
|
|
def get_computed_fields_from_page(self, page):
|
|
on_page = page is None
|
|
for field in self.fields:
|
|
if field.key == 'page':
|
|
if on_page:
|
|
break
|
|
if page.id == field.id:
|
|
on_page = True
|
|
continue
|
|
if not on_page:
|
|
continue
|
|
if field.key == 'computed':
|
|
yield field
|
|
|
|
def add_fields_to_form(
|
|
self,
|
|
form,
|
|
page=None,
|
|
displayed_fields=None,
|
|
form_data=None, # a dictionary, to fill fields
|
|
transient_formdata=None,
|
|
): # a FormData
|
|
on_page = page is None
|
|
hidden_varnames = set()
|
|
for field in self.fields:
|
|
field.formdef = self
|
|
if field.key == 'page':
|
|
if on_page:
|
|
break
|
|
if page.id == field.id:
|
|
on_page = True
|
|
continue
|
|
if not on_page:
|
|
continue
|
|
visible = field.is_visible(form_data, self)
|
|
if not visible:
|
|
if not field.has_live_conditions(self, hidden_varnames=hidden_varnames):
|
|
# ignore field.varname when checking later conditions for liveness
|
|
if field.varname:
|
|
hidden_varnames.add(field.varname)
|
|
# no live conditions so field can be skipped
|
|
continue
|
|
if isinstance(displayed_fields, list):
|
|
displayed_fields.append(field)
|
|
value = None
|
|
if form_data:
|
|
value = form_data.get(field.id)
|
|
if not field.add_to_form:
|
|
continue
|
|
widget = field.add_to_form(form, value)
|
|
widget.is_hidden = not (visible)
|
|
widget.field = field
|
|
if transient_formdata and not widget.is_hidden:
|
|
transient_formdata.data.update(self.get_field_data(field, widget))
|
|
# invalidate cache as comment fields (and other things?) may
|
|
# have accessed variables in non-lazy mode and caused a cache
|
|
# with now-obsolete values.
|
|
get_publisher().substitutions.invalidate_cache()
|
|
widget._parsed = False
|
|
widget.error = None
|
|
|
|
def get_page(self, page_no):
|
|
return [x for x in self.fields if x.key == 'page'][page_no]
|
|
|
|
def page_count(self):
|
|
return len([x for x in self.fields if x.key == 'page']) or 1
|
|
|
|
def create_view_form(self, dict=None, use_tokens=True, visible=True):
|
|
dict = dict or {}
|
|
form = Form(enctype='multipart/form-data', use_tokens=use_tokens)
|
|
if not visible:
|
|
form.attrs['style'] = 'display: none;'
|
|
if self.keywords:
|
|
form.attrs['data-keywords'] = ' '.join(self.keywords_list)
|
|
|
|
form_fields = self.fields
|
|
if form_fields and form_fields[0].key != 'page':
|
|
# add fake initial page in case it's missing
|
|
form_fields = [fields.PageField(label='', type='page')] + form_fields
|
|
|
|
# 1st pass to group fields on different pages
|
|
pages = []
|
|
current_page = {}
|
|
for field in form_fields:
|
|
if field.key == 'page':
|
|
current_page = {'page': field, 'fields': []}
|
|
current_page['disabled'] = not field.is_visible(dict, self)
|
|
pages.append(current_page)
|
|
continue
|
|
|
|
if current_page['disabled']:
|
|
continue
|
|
|
|
if field.key == 'title' and (
|
|
not current_page['fields'] and current_page['page'].label == field.label
|
|
):
|
|
# don't include first title of a page if that title has the
|
|
# same text as the page.
|
|
continue
|
|
|
|
if field.key in ('title', 'subtitle', 'comment') and not field.include_in_validation_page:
|
|
# don't render field that wouldn't be displayed.
|
|
continue
|
|
|
|
if not field.is_visible(dict, self):
|
|
continue
|
|
|
|
current_page['fields'].append(field)
|
|
|
|
# 2nd pass to create view form
|
|
for page in pages:
|
|
visible_contents = False
|
|
if page['fields'] and any(x.include_in_validation_page for x in page['fields']):
|
|
visible_contents = True
|
|
form.widgets.append(HtmlWidget(htmltext('<div class="page">')))
|
|
if page['page'].label:
|
|
form.widgets.append(HtmlWidget(htmltext('<h3>%s</h3>') % page['page'].label))
|
|
form.widgets.append(HtmlWidget(htmltext('<div>')))
|
|
|
|
for field in page['fields']:
|
|
value = dict.get(field.id)
|
|
if not field.add_to_view_form:
|
|
continue
|
|
if not field.include_in_validation_page:
|
|
form.widgets.append(HtmlWidget(htmltext('<div style="display: none;">')))
|
|
field.add_to_view_form(form, value)
|
|
form.widgets.append(HtmlWidget(htmltext('</div>')))
|
|
else:
|
|
field.add_to_view_form(form, value)
|
|
|
|
if visible_contents:
|
|
form.widgets.append(HtmlWidget(htmltext('</div></div>')))
|
|
|
|
return form
|
|
|
|
def set_live_condition_sources(self, form, fields):
|
|
live_condition_fields = {}
|
|
|
|
fields_ids = {str(x.id) for x in fields}
|
|
|
|
for field in self.iter_fields(include_block_fields=True):
|
|
if (hasattr(field, 'block_field') and str(field.block_field.id) not in fields_ids) or (
|
|
not hasattr(field, 'block_field') and str(field.id) not in fields_ids
|
|
):
|
|
continue
|
|
if field.condition:
|
|
field.varnames = field.get_condition_varnames(formdef=self)
|
|
for varname in field.varnames:
|
|
if varname not in live_condition_fields:
|
|
live_condition_fields[varname] = []
|
|
live_condition_fields[varname].append(field)
|
|
if field.key in ('item', 'items') and field.data_source:
|
|
data_source = data_sources.get_object(field.data_source)
|
|
if data_source.type in ('json', 'jsonvalue', 'geojson') or data_source.type.startswith(
|
|
'carddef:'
|
|
):
|
|
varnames = data_source.get_referenced_varnames(formdef=self)
|
|
for varname in varnames:
|
|
if varname not in live_condition_fields:
|
|
live_condition_fields[varname] = []
|
|
live_condition_fields[varname].append(field)
|
|
if field.prefill and field.prefill.get('type') == 'string':
|
|
for varname in field.get_referenced_varnames(
|
|
formdef=self, value=field.prefill.get('value', '')
|
|
):
|
|
if varname not in live_condition_fields:
|
|
live_condition_fields[varname] = []
|
|
live_condition_fields[varname].append(field)
|
|
if field.key == 'comment':
|
|
for varname in field.get_referenced_varnames(formdef=self, value=field.label):
|
|
if varname not in live_condition_fields:
|
|
live_condition_fields[varname] = []
|
|
live_condition_fields[varname].append(field)
|
|
|
|
for field in fields:
|
|
if field.varname in live_condition_fields:
|
|
widget = form.get_widget('f%s' % field.id)
|
|
if widget:
|
|
widget.live_condition_source = True
|
|
widget.live_condition_fields = live_condition_fields[field.varname]
|
|
elif field.key == 'computed':
|
|
field.live_condition_source = True
|
|
field.live_condition_fields = live_condition_fields[field.varname]
|
|
|
|
@classmethod
|
|
def get_field_data(cls, field, widget, raise_on_error=False):
|
|
d = {}
|
|
d[field.id] = widget.parse()
|
|
if d.get(field.id) is not None and field.convert_value_from_str:
|
|
d[field.id] = field.convert_value_from_str(d[field.id])
|
|
field.set_value(d, d[field.id], raise_on_error=raise_on_error)
|
|
if getattr(widget, 'cleanup', None):
|
|
widget.cleanup()
|
|
return d
|
|
|
|
def get_data(self, form, raise_on_error=False, pages=None):
|
|
d = {}
|
|
current_page_id = None
|
|
for field in self.fields:
|
|
if field.key == 'page':
|
|
current_page_id = field.id
|
|
continue
|
|
if pages not in (None, [None]) and current_page_id not in [x.id for x in pages]:
|
|
continue
|
|
widget = form.get_widget('f%s' % field.id)
|
|
if widget:
|
|
d.update(self.get_field_data(field, widget, raise_on_error=raise_on_error))
|
|
elif pages is not None:
|
|
# reset
|
|
d[f'{field.id}'] = d[f'{field.id}_display'] = d[f'{field.id}_structured'] = None
|
|
return d
|
|
|
|
def export_to_json(self, include_id=False, indent=None, with_user_fields=False):
|
|
from wcs.carddef import CardDef
|
|
|
|
charset = get_publisher().site_charset
|
|
root = {}
|
|
root['name'] = force_str(self.name, charset)
|
|
if include_id and self.id:
|
|
root['id'] = str(self.id)
|
|
if self.category:
|
|
root['category'] = force_str(self.category.name, charset)
|
|
root['category_id'] = str(self.category.id)
|
|
if self.workflow:
|
|
root['workflow'] = self.workflow.get_json_export_dict(include_id=include_id)
|
|
|
|
more_attributes = ['tracking_code_verify_fields']
|
|
|
|
for attribute in self.TEXT_ATTRIBUTES + self.BOOLEAN_ATTRIBUTES + more_attributes:
|
|
if not hasattr(self, attribute):
|
|
continue
|
|
root[attribute] = getattr(self, attribute)
|
|
if isinstance(root[attribute], time.struct_time):
|
|
root[attribute] = time.strftime('%Y-%m-%dT%H:%M:%S', root[attribute])
|
|
|
|
root['fields'] = []
|
|
if self.fields:
|
|
for field in self.fields:
|
|
root['fields'].append(field.export_to_json(include_id=include_id))
|
|
|
|
if self.geolocations:
|
|
root['geolocations'] = self.geolocations.copy()
|
|
|
|
if self.workflow_options:
|
|
root['options'] = self.workflow_options.copy()
|
|
for k, v in list(root['options'].items()):
|
|
# convert time.struct_time to strings as python3 would
|
|
# serialize it as tuple.
|
|
if isinstance(v, time.struct_time):
|
|
root['options'][k] = time.strftime('%Y-%m-%dT%H:%M:%S', v)
|
|
|
|
if self.required_authentication_contexts:
|
|
root['required_authentication_contexts'] = self.required_authentication_contexts[:]
|
|
|
|
if isinstance(self, CardDef):
|
|
all_carddefs = CardDef.select(ignore_errors=True)
|
|
all_carddefs = [c for c in all_carddefs if c]
|
|
all_carddefs_by_slug = {c.url_name: c for c in all_carddefs}
|
|
|
|
def get_field_label(obj, field_varname):
|
|
card_slug = obj.split(':')[1]
|
|
carddef = all_carddefs_by_slug.get(card_slug)
|
|
if not carddef:
|
|
return
|
|
for field in carddef.iter_fields(include_block_fields=True):
|
|
if field.contextual_varname == field_varname:
|
|
if getattr(field, 'block_field', None):
|
|
return '%s - %s' % (field.block_field.label, field.label)
|
|
return field.label
|
|
|
|
card_relations = []
|
|
current_objdef_ref = '%s:%s' % (self.xml_root_node, self.url_name)
|
|
for objdef, relations in self.build_relations_by_ref().items():
|
|
if not objdef.startswith('carddef:'):
|
|
continue
|
|
try:
|
|
CardDef.get_by_urlname(objdef.split(':')[1])
|
|
except KeyError:
|
|
continue
|
|
for relation in relations:
|
|
if not relation['varname']:
|
|
continue
|
|
card_relations.append(
|
|
{
|
|
'varname': relation['varname'],
|
|
'label': get_field_label(current_objdef_ref, relation['varname']),
|
|
'type': relation['type'],
|
|
'obj': objdef,
|
|
'reverse': False,
|
|
}
|
|
)
|
|
|
|
for relation in self.reverse_relations or []:
|
|
if not relation['obj'].startswith('carddef:'):
|
|
continue
|
|
if not relation['varname']:
|
|
continue
|
|
rel = relation.copy()
|
|
rel.update(
|
|
{
|
|
'reverse': True,
|
|
'label': get_field_label(relation['obj'], relation['varname']),
|
|
}
|
|
)
|
|
card_relations.append(rel)
|
|
|
|
root['relations'] = sorted(card_relations, key=itemgetter('varname'))
|
|
if with_user_fields:
|
|
root['user'] = {
|
|
'fields': [
|
|
{
|
|
'varname': 'name',
|
|
'label': _('Full name'),
|
|
'type': 'string',
|
|
},
|
|
{
|
|
'varname': 'email',
|
|
'label': _('Email'),
|
|
'type': 'email',
|
|
},
|
|
]
|
|
}
|
|
user_formdef = get_publisher().user_class.get_formdef()
|
|
if user_formdef:
|
|
root['user']['fields'] += [
|
|
{
|
|
'varname': f.varname or '',
|
|
'label': f.label,
|
|
'type': f.key,
|
|
}
|
|
for f in user_formdef.fields
|
|
]
|
|
|
|
return json.dumps(root, indent=indent, sort_keys=True, cls=JSONEncoder)
|
|
|
|
@classmethod
|
|
def import_from_json(cls, fd, charset=None, include_id=False):
|
|
if charset is None:
|
|
charset = get_publisher().site_charset
|
|
formdef = cls()
|
|
|
|
def unicode2str(v):
|
|
if isinstance(v, dict):
|
|
return {unicode2str(k): unicode2str(v) for k, v in v.items()}
|
|
elif isinstance(v, list):
|
|
return [unicode2str(x) for x in v]
|
|
elif isinstance(v, str):
|
|
return force_str(v)
|
|
else:
|
|
return v
|
|
|
|
# we have to make sure all strings are str object, not unicode.
|
|
value = unicode2str(json.load(fd))
|
|
|
|
if include_id and 'id' in value:
|
|
formdef.id = value.get('id')
|
|
|
|
if include_id and 'category_id' in value:
|
|
formdef.category_id = value.get('category_id')
|
|
elif 'category' in value:
|
|
category = value.get('category')
|
|
for c in Category.select():
|
|
if c.name == category:
|
|
formdef.category_id = c.id
|
|
break
|
|
|
|
if include_id and 'workflow_id' in value:
|
|
formdef.workflow_id = value.get('workflow_id')
|
|
elif (
|
|
include_id
|
|
and 'workflow' in value
|
|
and isinstance(value['workflow'], dict)
|
|
and 'id' in value['workflow']
|
|
):
|
|
formdef.workflow_id = value['workflow'].get('id')
|
|
elif 'workflow' in value:
|
|
if isinstance(value['workflow'], str):
|
|
workflow = value.get('workflow')
|
|
else:
|
|
workflow = value['workflow'].get('name')
|
|
from wcs.workflows import Workflow
|
|
|
|
for w in Workflow.select():
|
|
if w.name == workflow:
|
|
formdef.workflow_id = w.id
|
|
break
|
|
|
|
more_attributes = ['tracking_code_verify_fields']
|
|
for attribute in cls.TEXT_ATTRIBUTES + cls.BOOLEAN_ATTRIBUTES + more_attributes:
|
|
if attribute in value:
|
|
setattr(formdef, attribute, value.get(attribute))
|
|
|
|
formdef.fields = []
|
|
for i, field in enumerate(value.get('fields', [])):
|
|
try:
|
|
field_o = fields.get_field_class_by_type(field.get('type'))()
|
|
except KeyError:
|
|
raise FormdefImportError(_('Unknown field type'), details=field.findtext('type'))
|
|
field_o.init_with_json(field, include_id=True)
|
|
if not field_o.id:
|
|
# this assumes all fields will have id, or none of them
|
|
field_o.id = str(i)
|
|
formdef.fields.append(field_o)
|
|
|
|
if value.get('options'):
|
|
formdef.workflow_options = value.get('options')
|
|
for option_key, option_value in formdef.workflow_options.items():
|
|
if isinstance(option_value, dict) and 'filename' in option_value:
|
|
new_value = PicklableUpload(
|
|
orig_filename=option_value['filename'],
|
|
content_type=option_value['content_type'],
|
|
)
|
|
new_value.receive([base64.decodebytes(force_bytes(option_value['content']))])
|
|
formdef.workflow_options[option_key] = new_value
|
|
|
|
if value.get('geolocations'):
|
|
formdef.geolocations = value.get('geolocations')
|
|
|
|
if value.get('required_authentication_contexts'):
|
|
formdef.required_authentication_contexts = [
|
|
str(x) for x in value.get('required_authentication_contexts')
|
|
]
|
|
|
|
return formdef
|
|
|
|
def export_to_xml(self, include_id=False):
|
|
charset = get_publisher().site_charset
|
|
root = ET.Element(self.xml_root_node)
|
|
if include_id and self.id:
|
|
root.attrib['id'] = str(self.id)
|
|
for text_attribute in list(self.TEXT_ATTRIBUTES):
|
|
if not hasattr(self, text_attribute) or not getattr(self, text_attribute):
|
|
continue
|
|
ET.SubElement(root, text_attribute).text = force_str(getattr(self, text_attribute), charset)
|
|
for boolean_attribute in self.BOOLEAN_ATTRIBUTES:
|
|
if not hasattr(self, boolean_attribute):
|
|
continue
|
|
value = getattr(self, boolean_attribute)
|
|
if value:
|
|
value = 'true'
|
|
else:
|
|
value = 'false'
|
|
ET.SubElement(root, boolean_attribute).text = value
|
|
|
|
self.category_class.object_category_xml_export(self, root, include_id=include_id)
|
|
|
|
workflow = None
|
|
if self.workflow_id:
|
|
from wcs.workflows import Workflow
|
|
|
|
workflow = Workflow.get(self.workflow_id, ignore_errors=True, ignore_migration=True)
|
|
if not workflow:
|
|
workflow = self.get_default_workflow()
|
|
elem = ET.SubElement(root, 'workflow')
|
|
elem.text = force_str(workflow.name, charset)
|
|
if workflow.slug:
|
|
elem.attrib['slug'] = str(workflow.slug)
|
|
if include_id:
|
|
elem.attrib['workflow_id'] = str(workflow.id)
|
|
|
|
if self.tracking_code_verify_fields is not None:
|
|
verify_fields = ET.SubElement(root, 'tracking_code_verify_fields')
|
|
for field_id in self.tracking_code_verify_fields:
|
|
ET.SubElement(verify_fields, 'field_id').text = str(field_id)
|
|
|
|
fields = ET.SubElement(root, 'fields')
|
|
for field in self.fields or []:
|
|
fields.append(field.export_to_xml(charset=charset, include_id=include_id))
|
|
|
|
from wcs.workflows import get_role_name_and_slug
|
|
|
|
def add_role_element(roles_root, role_id):
|
|
if not role_id:
|
|
return
|
|
try:
|
|
role_name, role_slug = get_role_name_and_slug(role_id)
|
|
except KeyError:
|
|
# skip broken/missing roles
|
|
return
|
|
sub = ET.SubElement(roles_root, 'role')
|
|
if role_slug:
|
|
sub.attrib['slug'] = role_slug
|
|
if include_id:
|
|
sub.attrib['role_id'] = str(role_id)
|
|
sub.text = role_name
|
|
return sub
|
|
|
|
roles_elements = [
|
|
('roles', 'user-roles'),
|
|
('backoffice_submission_roles', 'backoffice-submission-roles'),
|
|
]
|
|
for attr_name, node_name in roles_elements:
|
|
if not getattr(self, attr_name, None):
|
|
continue
|
|
roles = ET.SubElement(root, node_name)
|
|
for role_id in getattr(self, attr_name):
|
|
add_role_element(roles, role_id)
|
|
|
|
if self.workflow_roles:
|
|
roles = ET.SubElement(root, 'roles')
|
|
for role_key, role_id in self.workflow_roles.items():
|
|
sub = add_role_element(roles, role_id)
|
|
if sub is not None:
|
|
sub.attrib['role_key'] = role_key
|
|
|
|
def make_xml_value(element, value):
|
|
if isinstance(value, str):
|
|
element.text = value
|
|
elif hasattr(value, 'base_filename'):
|
|
element.attrib['type'] = 'file'
|
|
ET.SubElement(element, 'filename').text = value.base_filename
|
|
ET.SubElement(element, 'content_type').text = value.content_type or 'application/octet-stream'
|
|
ET.SubElement(element, 'content').text = force_str(base64.b64encode(value.get_content()))
|
|
elif isinstance(value, time.struct_time):
|
|
element.text = time.strftime('%Y-%m-%d', value)
|
|
element.attrib['type'] = 'date'
|
|
elif isinstance(value, bool):
|
|
element.text = 'true' if value else 'false'
|
|
element.attrib['type'] = 'bool'
|
|
elif isinstance(value, int):
|
|
element.attrib['type'] = 'int'
|
|
element.text = str(value)
|
|
elif isinstance(value, (set, tuple, list)):
|
|
element.attrib['type'] = 'list'
|
|
for child_value in value:
|
|
sub_element = ET.SubElement(element, 'item')
|
|
make_xml_value(sub_element, child_value)
|
|
elif isinstance(value, dict):
|
|
element.attrib['type'] = 'dict'
|
|
for child_key, child_value in value.items():
|
|
sub_element = ET.SubElement(element, child_key)
|
|
make_xml_value(sub_element, child_value)
|
|
else:
|
|
assert value is None, 'option variable of unknown type (%s)' % type(value)
|
|
|
|
options = ET.SubElement(root, 'options')
|
|
for option in sorted(self.workflow_options or []):
|
|
element = ET.SubElement(options, 'option')
|
|
element.attrib['varname'] = option
|
|
option_value = self.workflow_options.get(option)
|
|
make_xml_value(element, option_value)
|
|
|
|
custom_views_element = ET.SubElement(root, 'custom_views')
|
|
if hasattr(self, '_custom_views'):
|
|
# it has just been loaded, it's reexported as part as the overwrite
|
|
# confirmation dialog, do not get custom views from database.
|
|
custom_views = self._custom_views
|
|
else:
|
|
custom_views = []
|
|
for view in get_publisher().custom_view_class.select_shared_for_formdef(formdef=self):
|
|
custom_views.append(view)
|
|
for view in custom_views:
|
|
custom_view_node = view.export_to_xml(charset=charset, include_id=include_id)
|
|
if custom_view_node is not None:
|
|
custom_views_element.append(custom_view_node)
|
|
|
|
geolocations = ET.SubElement(root, 'geolocations')
|
|
for geoloc_key, geoloc_label in (self.geolocations or {}).items():
|
|
element = ET.SubElement(geolocations, 'geolocation')
|
|
element.attrib['key'] = geoloc_key
|
|
element.text = force_str(geoloc_label, charset)
|
|
|
|
if self.required_authentication_contexts:
|
|
element = ET.SubElement(root, 'required_authentication_contexts')
|
|
for auth_context in self.required_authentication_contexts:
|
|
ET.SubElement(element, 'method').text = force_str(auth_context)
|
|
|
|
if self.digest_templates:
|
|
digest_templates = ET.SubElement(root, 'digest_templates')
|
|
for key, value in self.digest_templates.items():
|
|
if not value:
|
|
continue
|
|
sub = ET.SubElement(digest_templates, 'template')
|
|
sub.attrib['key'] = key
|
|
sub.text = value
|
|
return root
|
|
|
|
@classmethod
|
|
def import_from_xml(cls, fd, charset=None, include_id=False, fix_on_error=False, check_datasources=True):
|
|
try:
|
|
tree = ET.parse(fd)
|
|
except Exception:
|
|
raise ValueError()
|
|
formdef = cls.import_from_xml_tree(
|
|
tree,
|
|
charset=charset,
|
|
include_id=include_id,
|
|
fix_on_error=fix_on_error,
|
|
check_datasources=check_datasources,
|
|
)
|
|
|
|
if formdef.url_name:
|
|
try:
|
|
cls.get_on_index(formdef.url_name, 'url_name', ignore_migration=True)
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
formdef._import_orig_slug = formdef.url_name
|
|
formdef.url_name = formdef.get_new_url_name()
|
|
|
|
# check if all field id are unique
|
|
known_field_ids = set()
|
|
for field in formdef.fields:
|
|
if field.id in known_field_ids:
|
|
raise FormdefImportRecoverableError(_('Duplicated field identifiers'))
|
|
known_field_ids.add(field.id)
|
|
|
|
return formdef
|
|
|
|
@classmethod
|
|
def import_from_xml_tree(
|
|
cls, tree, include_id=False, charset=None, fix_on_error=False, snapshot=False, check_datasources=True
|
|
):
|
|
from wcs.carddef import CardDef
|
|
|
|
if charset is None:
|
|
charset = get_publisher().site_charset
|
|
assert charset == 'utf-8'
|
|
formdef = cls()
|
|
if tree.find('name') is None or not tree.find('name').text:
|
|
raise FormdefImportError(_('Missing name'))
|
|
|
|
# if the tree we get is actually a ElementTree for real, we get its
|
|
# root element and go on happily.
|
|
if not ET.iselement(tree):
|
|
tree = tree.getroot()
|
|
|
|
if tree.tag != cls.xml_root_node:
|
|
raise FormdefImportError(
|
|
_('Provided XML file is invalid, it starts with a <%(seen)s> tag instead of <%(expected)s>')
|
|
% {'seen': tree.tag, 'expected': cls.xml_root_node}
|
|
)
|
|
|
|
if include_id and tree.attrib.get('id'):
|
|
formdef.id = tree.attrib.get('id')
|
|
for text_attribute in list(cls.TEXT_ATTRIBUTES):
|
|
value = tree.find(text_attribute)
|
|
if value is None or value.text is None:
|
|
continue
|
|
setattr(formdef, text_attribute, xml_node_text(value))
|
|
|
|
for boolean_attribute in cls.BOOLEAN_ATTRIBUTES:
|
|
value = tree.find(boolean_attribute)
|
|
if value is None:
|
|
continue
|
|
setattr(formdef, boolean_attribute, value.text == 'true')
|
|
|
|
formdef.fields = []
|
|
unknown_field_types = set()
|
|
unknown_fields_blocks = set()
|
|
for i, field in enumerate(tree.find('fields')):
|
|
field_type = field.findtext('type')
|
|
if field_type == 'block':
|
|
field_type = 'block:%s' % field.findtext('block_slug')
|
|
try:
|
|
field_o = fields.get_field_class_by_type(field_type)()
|
|
except KeyError:
|
|
if field_type.startswith('block:'):
|
|
unknown_fields_blocks.add(field_type.removeprefix('block:'))
|
|
else:
|
|
unknown_field_types.add(field_type)
|
|
continue
|
|
field_o.init_with_xml(field, charset, include_id=True)
|
|
if field_type.startswith('block:'):
|
|
field_o.block_slug = field_type.removeprefix('block:')
|
|
if fix_on_error or not field_o.id:
|
|
# this assumes all fields will have id, or none of them
|
|
field_o.id = str(i + 1)
|
|
formdef.fields.append(field_o)
|
|
|
|
if tree.find('tracking_code_verify_fields') is not None:
|
|
formdef.tracking_code_verify_fields = [
|
|
xml_node_text(verify_field_id)
|
|
for verify_field_id in tree.findall('tracking_code_verify_fields/field_id')
|
|
]
|
|
|
|
formdef.workflow_options = {}
|
|
|
|
def get_value_from_xml(element):
|
|
if element.attrib.get('type') == 'int':
|
|
return int(xml_node_text(element))
|
|
if element.attrib.get('type') == 'date':
|
|
return time.strptime(element.text, '%Y-%m-%d')
|
|
elif element.attrib.get('type') == 'bool':
|
|
return bool(element.text == 'true')
|
|
elif element.attrib.get('type') == 'file' or element.findall('filename'):
|
|
value = PicklableUpload(
|
|
orig_filename=xml_node_text(element.find('filename')),
|
|
content_type=xml_node_text(element.find('content_type')),
|
|
)
|
|
value.receive([base64.decodebytes(force_bytes(xml_node_text(element.find('content'))))])
|
|
return value
|
|
elif element.attrib.get('type') == 'list':
|
|
return [get_value_from_xml(x) for x in element.findall('item')]
|
|
elif element.attrib.get('type') == 'dict':
|
|
return {x.tag: get_value_from_xml(x) for x in element.findall('*')}
|
|
elif element.text:
|
|
return xml_node_text(element)
|
|
|
|
for option in tree.findall('options/option'):
|
|
formdef.workflow_options[option.attrib.get('varname')] = get_value_from_xml(option)
|
|
|
|
formdef._custom_views = []
|
|
for view in tree.findall('custom_views/%s' % get_publisher().custom_view_class.xml_root_node):
|
|
view_o = get_publisher().custom_view_class()
|
|
view_o.init_with_xml(view, charset, include_id=include_id)
|
|
formdef._custom_views.append(view_o)
|
|
|
|
cls.category_class.object_category_xml_import(formdef, tree, include_id=include_id)
|
|
|
|
if tree.find('workflow') is not None:
|
|
from wcs.workflows import Workflow
|
|
|
|
workflow_node = tree.find('workflow')
|
|
if include_id and workflow_node.attrib.get('workflow_id'):
|
|
workflow_id = workflow_node.attrib.get('workflow_id')
|
|
if Workflow.has_key(workflow_id):
|
|
formdef.workflow_id = workflow_id
|
|
else:
|
|
workflow_slug = workflow_node.attrib.get('slug')
|
|
if workflow_slug:
|
|
formdef.workflow = Workflow.get_by_slug(workflow_slug)
|
|
else:
|
|
workflow = xml_node_text(workflow_node)
|
|
for w in Workflow.select(ignore_errors=True, ignore_migration=True):
|
|
if w and w.name == workflow:
|
|
formdef.workflow_id = w.id
|
|
break
|
|
|
|
roles_elements = [
|
|
('roles', 'user-roles'),
|
|
('backoffice_submission_roles', 'backoffice-submission-roles'),
|
|
]
|
|
for attr_name, node_name in roles_elements:
|
|
if tree.find(node_name) is None:
|
|
continue
|
|
roles_node = tree.find(node_name)
|
|
roles = []
|
|
setattr(formdef, attr_name, roles)
|
|
for child in roles_node:
|
|
role_id = get_publisher().role_class.get_role_by_node(child, include_id=include_id)
|
|
if role_id:
|
|
roles.append(role_id)
|
|
|
|
if tree.find('roles') is not None:
|
|
roles_node = tree.find('roles')
|
|
formdef.workflow_roles = {}
|
|
for child in roles_node:
|
|
role_key = child.attrib['role_key']
|
|
role_id = get_publisher().role_class.get_role_by_node(child, include_id=include_id)
|
|
formdef.workflow_roles[role_key] = role_id
|
|
|
|
if tree.find('geolocations') is not None:
|
|
geolocations_node = tree.find('geolocations')
|
|
formdef.geolocations = {}
|
|
for child in geolocations_node:
|
|
geoloc_key = child.attrib['key']
|
|
geoloc_value = xml_node_text(child)
|
|
formdef.geolocations[geoloc_key] = geoloc_value
|
|
|
|
if tree.find('required_authentication_contexts') is not None:
|
|
node = tree.find('required_authentication_contexts')
|
|
formdef.required_authentication_contexts = []
|
|
for child in node:
|
|
formdef.required_authentication_contexts.append(str(child.text))
|
|
|
|
if tree.find('digest_templates') is not None:
|
|
digest_templates_node = tree.find('digest_templates')
|
|
formdef.digest_templates = {}
|
|
for child in digest_templates_node:
|
|
key = child.attrib['key']
|
|
value = xml_node_text(child)
|
|
formdef.digest_templates[key] = value
|
|
|
|
unknown_datasources = set()
|
|
if check_datasources:
|
|
# check if datasources are defined
|
|
for field in formdef.fields:
|
|
data_source = getattr(field, 'data_source', None)
|
|
if data_source:
|
|
data_source_id = data_source.get('type')
|
|
if isinstance(data_sources.get_object(data_source), data_sources.StubNamedDataSource):
|
|
unknown_datasources.add(data_source_id)
|
|
elif data_source_id and data_source_id.startswith('carddef:'):
|
|
parts = data_source_id.split(':')
|
|
# check if carddef exists
|
|
url_name = parts[1]
|
|
if formdef.xml_root_node == 'carddef' and formdef.url_name == url_name:
|
|
# reference to itself, it's ok
|
|
continue
|
|
try:
|
|
CardDef.get_by_urlname(url_name)
|
|
except KeyError:
|
|
unknown_datasources.add(data_source_id)
|
|
continue
|
|
|
|
if len(parts) == 2 or parts[2] == '_with_user_filter':
|
|
continue
|
|
|
|
lookup_criterias = [
|
|
Equal('formdef_type', 'carddef'),
|
|
Equal('visibility', 'datasource'),
|
|
Equal('slug', parts[2]),
|
|
]
|
|
try:
|
|
get_publisher().custom_view_class.select(lookup_criterias)[0]
|
|
except IndexError:
|
|
unknown_datasources.add(data_source_id)
|
|
|
|
if unknown_field_types or unknown_fields_blocks or unknown_datasources:
|
|
details = collections.defaultdict(set)
|
|
if unknown_field_types:
|
|
details[_('Unknown field types')].update(unknown_field_types)
|
|
if unknown_fields_blocks:
|
|
details[_('Unknown fields blocks')].update(unknown_fields_blocks)
|
|
if unknown_datasources:
|
|
details[_('Unknown datasources')].update(unknown_datasources)
|
|
raise FormdefImportUnknownReferencedError(_('Unknown referenced objects'), details=details)
|
|
|
|
return formdef
|
|
|
|
def get_detailed_email_form(self, formdata, url):
|
|
r = ''
|
|
if formdata.user_id and formdata.user:
|
|
r = '%s\n %s\n\n' % (_('User name:'), formdata.user.name)
|
|
return r + formdata.get_rst_summary(url)
|
|
|
|
def get_user_prefilled_data(self, formdata, field_id):
|
|
# look up in submitted form for one that would hold the user
|
|
# email (the one set to be prefilled by user email)
|
|
|
|
if not formdata.data:
|
|
return None
|
|
|
|
def is_user_field(field):
|
|
if not getattr(field, 'prefill', None):
|
|
return False
|
|
if field.prefill.get('type') != 'user':
|
|
return False
|
|
if field.prefill.get('value') != field_id:
|
|
return False
|
|
return True
|
|
|
|
# check first in "normal" fields
|
|
for field in self.fields:
|
|
if not is_user_field(field):
|
|
continue
|
|
|
|
v = formdata.data.get(field.id)
|
|
if v:
|
|
return v
|
|
|
|
# then check in block fields
|
|
for field in self.fields:
|
|
if field.key != 'block':
|
|
continue
|
|
for subfield in field.block.fields:
|
|
if not is_user_field(subfield):
|
|
continue
|
|
v = formdata.data.get(field.id)
|
|
if not (v and v.get('data')):
|
|
continue
|
|
for data in v.get('data'):
|
|
w = data.get(subfield.id)
|
|
if w:
|
|
return w
|
|
|
|
def get_submitter_email(self, formdata):
|
|
users_cfg = get_cfg('users', {})
|
|
field_email_id = users_cfg.get('field_email') or 'email'
|
|
value = self.get_user_prefilled_data(formdata, field_email_id)
|
|
if value:
|
|
return value
|
|
|
|
# if nothing was found, get email from user profile
|
|
if formdata.user and formdata.user.email and formdata.user.is_active:
|
|
return formdata.user.email
|
|
|
|
return None
|
|
|
|
def get_submitter_phone(self, formdata):
|
|
users_cfg = get_cfg('users', {})
|
|
field_phone_id = users_cfg.get('field_phone')
|
|
if not field_phone_id:
|
|
return None
|
|
|
|
value = self.get_user_prefilled_data(formdata, field_phone_id)
|
|
if value:
|
|
return value
|
|
|
|
# if nothing was found, get email from user profile
|
|
if formdata.user and formdata.user.is_active:
|
|
return formdata.user.get_formatted_phone()
|
|
|
|
return None
|
|
|
|
def get_static_substitution_variables(self, minimal=False):
|
|
d = {
|
|
'form_name': self.name,
|
|
'form_slug': self.url_name,
|
|
'form_class_name': self.__class__.__name__, # reserved for logged errors
|
|
}
|
|
if not minimal:
|
|
from wcs.variables import LazyFormDef
|
|
|
|
d['form_objects'] = LazyFormDef(self).objects
|
|
if self.category:
|
|
d.update(self.category.get_substitution_variables(minimal=minimal))
|
|
d.update(self.get_variable_options())
|
|
return d
|
|
|
|
def get_substitution_variables(self, minimal=False):
|
|
from wcs.variables import LazyFormDef
|
|
|
|
from .qommon.substitution import CompatibilityNamesDict
|
|
|
|
return CompatibilityNamesDict({'form': LazyFormDef(self)})
|
|
|
|
def get_detailed_evolution(self, formdata):
|
|
if not formdata.evolution:
|
|
return None
|
|
|
|
details = []
|
|
evo = formdata.evolution[-1]
|
|
if evo.who:
|
|
evo_who = None
|
|
if evo.who == '_submitter':
|
|
if formdata.user_id:
|
|
evo_who = formdata.user_id
|
|
else:
|
|
evo_who = evo.who
|
|
if evo_who:
|
|
user_who = get_publisher().user_class.get(evo_who, ignore_errors=True)
|
|
if user_who:
|
|
details.append(_('User name'))
|
|
details.append(' %s' % user_who.name)
|
|
if evo.status:
|
|
details.append(_('Status'))
|
|
details.append(' %s' % formdata.get_status_label())
|
|
comment = evo.get_plain_text_comment()
|
|
if comment:
|
|
details.append('\n%s\n' % comment)
|
|
return '\n\n----\n\n' + '\n'.join([str(x) for x in details])
|
|
|
|
def is_of_concern_for_role_id(self, role_id):
|
|
if not self.workflow_roles:
|
|
return False
|
|
return role_id in self.workflow_roles.values()
|
|
|
|
def is_of_concern_for_user(self, user, formdata=None):
|
|
if not self.workflow_roles:
|
|
self.workflow_roles = {}
|
|
|
|
user_roles = set(user.get_roles())
|
|
|
|
# if the formdef itself has some function attributed to the user, grant
|
|
# access.
|
|
for role_id in self.workflow_roles.values():
|
|
if role_id in user_roles:
|
|
return True
|
|
|
|
# if there was some redispatching of function, values will be different
|
|
# in formdata, check them.
|
|
if formdata and formdata.workflow_roles:
|
|
for role_id in formdata.workflow_roles.values():
|
|
if role_id is None:
|
|
continue
|
|
if isinstance(role_id, list):
|
|
role_ids = set(role_id)
|
|
else:
|
|
role_ids = {role_id}
|
|
if user_roles.intersection(role_ids):
|
|
return True
|
|
|
|
# if no formdata was given, lookup if there are some existing formdata
|
|
# where the user has access.
|
|
if not formdata:
|
|
data_class = self.data_class()
|
|
for role_id in user.get_roles():
|
|
if data_class.get_ids_with_indexed_value('workflow_roles', role_id):
|
|
return True
|
|
|
|
return False
|
|
|
|
def is_user_allowed_read(self, user, formdata=None):
|
|
if not user:
|
|
if formdata and get_session() and get_session().is_anonymous_submitter(formdata):
|
|
return True
|
|
return False
|
|
if user.is_admin:
|
|
return True
|
|
|
|
user_roles = set(user.get_roles())
|
|
user_roles.add(logged_users_role().id)
|
|
|
|
def ensure_role_are_strings(roles):
|
|
# makes sure all roles are defined as strings, as different origins
|
|
# (formdef, user, workflow status...) may define them differently.
|
|
return {str(x) for x in roles if x}
|
|
|
|
user_roles = ensure_role_are_strings(user_roles)
|
|
|
|
if self.user_allowed_to_access_own_data and formdata and formdata.is_submitter(user):
|
|
return True
|
|
if self.is_of_concern_for_user(user):
|
|
if not formdata:
|
|
return True
|
|
|
|
if formdata:
|
|
# current status
|
|
concerned_roles = ensure_role_are_strings(formdata.get_concerned_roles())
|
|
if '_submitter' in concerned_roles and formdata.is_submitter(user):
|
|
return True
|
|
if user_roles.intersection(concerned_roles):
|
|
return True
|
|
|
|
return False
|
|
|
|
def is_user_allowed_read_status_and_history(self, user, formdata=None):
|
|
if user and user.is_admin:
|
|
return True
|
|
|
|
if not self.workflow_roles:
|
|
self.workflow_roles = {}
|
|
form_roles = [x for x in self.workflow_roles.values() if x]
|
|
if formdata and formdata.workflow_roles:
|
|
for x in formdata.workflow_roles.values():
|
|
if isinstance(x, list):
|
|
form_roles.extend(x)
|
|
elif x:
|
|
form_roles.append(x)
|
|
return self.is_user_allowed_read(user, formdata=formdata)
|
|
|
|
def is_disabled(self):
|
|
if self.disabled:
|
|
return True
|
|
if self.publication_date:
|
|
try:
|
|
publication_datetime = get_as_datetime(self.publication_date)
|
|
except ValueError:
|
|
return False
|
|
if publication_datetime > datetime.datetime.now():
|
|
return True
|
|
if self.expiration_date:
|
|
try:
|
|
expiration_datetime = get_as_datetime(self.expiration_date)
|
|
except ValueError:
|
|
return False
|
|
if expiration_datetime < datetime.datetime.now():
|
|
return True
|
|
return False
|
|
|
|
class _EmptyClass: # helper for instance creation without calling __init__
|
|
pass
|
|
|
|
def __copy__(self, memo=None, deepcopy=False):
|
|
formdef_copy = self._EmptyClass()
|
|
formdef_copy.__class__ = self.__class__
|
|
if deepcopy:
|
|
formdef_copy.__dict__ = copy.deepcopy(self.__dict__, memo=memo)
|
|
else:
|
|
formdef_copy.__dict__ = copy.copy(self.__dict__)
|
|
return formdef_copy
|
|
|
|
def __deepcopy__(self, memo=None):
|
|
return self.__copy__(memo=memo, deepcopy=True)
|
|
|
|
# don't pickle computed attributes
|
|
def __getstate__(self):
|
|
odict = copy.copy(self.__dict__)
|
|
if '_workflow' in odict:
|
|
del odict['_workflow']
|
|
if '_start_page' in odict:
|
|
del odict['_start_page']
|
|
if self.lightweight and 'fields' in odict:
|
|
# will be stored independently
|
|
del odict['fields']
|
|
if '_custom_views' in odict:
|
|
del odict['_custom_views']
|
|
if '_import_orig_slug' in odict:
|
|
del odict['_import_orig_slug']
|
|
return odict
|
|
|
|
def __setstate__(self, dict):
|
|
super().__setstate__(dict)
|
|
self.__dict__ = dict
|
|
self._workflow = None
|
|
self._start_page = None
|
|
if hasattr(self, 'snapshot_object'):
|
|
# don't restore snapshot object that would have been stored erroneously
|
|
delattr(self, 'snapshot_object')
|
|
|
|
@classmethod
|
|
def storage_load(cls, fd, **kwargs):
|
|
o = super().storage_load(fd)
|
|
if kwargs.get('lightweight'):
|
|
o.fields = Ellipsis
|
|
return o
|
|
if cls.lightweight:
|
|
try:
|
|
o.fields = pickle.load(fd, **PICKLE_KWARGS)
|
|
except EOFError:
|
|
pass # old format
|
|
return o
|
|
|
|
@classmethod
|
|
def storage_dumps(cls, object):
|
|
if getattr(object, 'fields', None) is Ellipsis:
|
|
raise RuntimeError('storing a lightweight object is not allowed')
|
|
# use two separate pickle chunks to store the formdef, the first field
|
|
# is everything but fields (excluded via __getstate__) while the second
|
|
# chunk contains the fields.
|
|
return pickle.dumps(object, protocol=2) + pickle.dumps(object.fields, protocol=2)
|
|
|
|
def change_workflow(self, new_workflow, status_mapping=None):
|
|
old_workflow = self.get_workflow()
|
|
|
|
formdata_count = self.data_class().count()
|
|
status_changes = False
|
|
if formdata_count:
|
|
assert status_mapping, 'status mapping is required if there are formdatas'
|
|
|
|
mapping = {'draft': 'draft'}
|
|
if '_all' in status_mapping: # remapping everything to same status
|
|
for formdata in self.data_class().select_iterator(
|
|
[NotEqual('status', 'draft')], ignore_errors=True, itersize=200
|
|
):
|
|
mapping[formdata.status] = 'wf-%s' % status_mapping['_all']
|
|
else:
|
|
assert all(
|
|
status.id in status_mapping for status in old_workflow.possible_status
|
|
), 'a status was not mapped'
|
|
for old_status, new_status in status_mapping.items():
|
|
mapping['wf-%s' % old_status] = 'wf-%s' % new_status
|
|
|
|
status_changes = any(x[0] != x[1] for x in mapping.items())
|
|
if status_changes:
|
|
# if there are status changes, update all formdatas (except drafts)
|
|
from . import sql
|
|
|
|
sql.formdef_remap_statuses(self, mapping)
|
|
|
|
self.workflow = new_workflow
|
|
if new_workflow.has_action('geolocate') and not self.geolocations:
|
|
self.geolocations = {'base': str(_('Geolocation'))}
|
|
removed_functions = set()
|
|
for function_key in list((self.workflow_roles or {}).keys()):
|
|
if function_key not in new_workflow.roles:
|
|
del self.workflow_roles[function_key]
|
|
removed_functions.add(function_key)
|
|
self.store(comment=_('Workflow change'))
|
|
if formdata_count:
|
|
# instruct formdef to update its security rules
|
|
self.data_class().rebuild_security()
|
|
if removed_functions or status_changes:
|
|
# status changes require to update jump markers and change in functions
|
|
# requires to update all formdatas to remove old keys
|
|
mapping_without_prefix = {x.removeprefix('wf-'): y for x, y in mapping.items()}
|
|
for formdata in self.data_class().select_iterator(ignore_errors=True, itersize=200):
|
|
changed = False
|
|
for function_key in removed_functions:
|
|
if function_key in (formdata.workflow_roles or {}):
|
|
del formdata.workflow_roles[function_key]
|
|
changed = True
|
|
if (
|
|
status_changes
|
|
and formdata.workflow_data
|
|
and '_markers_stack' in formdata.workflow_data
|
|
):
|
|
current_markers_stack = formdata.workflow_data['_markers_stack']
|
|
formdata.workflow_data['_markers_stack'] = [
|
|
{'status_id': mapping_without_prefix.get(x['status_id'])}
|
|
for x in current_markers_stack
|
|
]
|
|
if formdata.workflow_data['_markers_stack'] != current_markers_stack:
|
|
changed = True
|
|
if changed:
|
|
formdata.store()
|
|
|
|
def i18n_scan(self):
|
|
location = '%s/%s/' % (self.backoffice_section, self.id)
|
|
yield location, None, self.name
|
|
yield location, None, self.description
|
|
for field in self.fields or []:
|
|
yield from field.i18n_scan(base_location=location + 'fields/')
|
|
|
|
|
|
EmailsDirectory.register(
|
|
'new_user',
|
|
_('Notification of creation to user'),
|
|
enabled=False,
|
|
category=_('Workflow'),
|
|
default_subject=_('New form ({{ form_name }})'),
|
|
default_body=_(
|
|
'''\
|
|
Hello,
|
|
|
|
This mail is a reminder about the form you just submitted.
|
|
{% if form_user %}
|
|
You can consult it with this link: {{ form_url }}
|
|
{% endif %}
|
|
|
|
{% if form_details %}
|
|
For reference, here are the details:
|
|
|
|
{{ form_details }}
|
|
{% endif %}
|
|
'''
|
|
),
|
|
)
|
|
|
|
EmailsDirectory.register(
|
|
'change_user',
|
|
_('Notification of change to user'),
|
|
category=_('Workflow'),
|
|
default_subject=_('Form status change ({{ form_name }})'),
|
|
default_body=_(
|
|
'''\
|
|
Hello,
|
|
|
|
{% if form_status_changed %}
|
|
Status of the form you submitted just changed (from "{{ form_previous_status }}" to "{{ form_status }}").
|
|
{% endif %}
|
|
|
|
{% if form_user %}
|
|
You can consult it with this link: {{ form_url }}
|
|
{% endif %}
|
|
|
|
{% if form_comment %}New comment: {{ form_comment }}{% endif %}
|
|
|
|
{% if form_evolution %}
|
|
{{ form_evolution }}
|
|
{% endif %}
|
|
'''
|
|
),
|
|
)
|
|
|
|
|
|
EmailsDirectory.register(
|
|
'new_receiver',
|
|
_('Notification of creation to receiver'),
|
|
enabled=False,
|
|
category=_('Workflow'),
|
|
default_subject=_('New form ({{ form_name }})'),
|
|
default_body=_(
|
|
'''\
|
|
Hello,
|
|
|
|
A new form has been submitted, you can see it with this link:
|
|
{{ form_url_backoffice }}
|
|
|
|
{% if form_details %}
|
|
For reference, here are the details:
|
|
|
|
{{ form_details }}
|
|
{% endif %}
|
|
'''
|
|
),
|
|
)
|
|
|
|
|
|
EmailsDirectory.register(
|
|
'change_receiver',
|
|
_('Notification of change to receiver'),
|
|
category=_('Workflow'),
|
|
default_subject=_('Form status change ({{ form_name }})'),
|
|
default_body=_(
|
|
'''\
|
|
Hello,
|
|
|
|
A form just changed, you can consult it with this link:
|
|
{{ form_url_backoffice }}
|
|
|
|
{% if form_status_changed %}
|
|
Status of the form just changed (from "{{ form_previous_status }}" to "{{ form_status }}").
|
|
{% endif %}
|
|
|
|
{% if form_comment %}New comment: {{ form_comment }}{% endif %}
|
|
|
|
{% if form_evolution %}
|
|
{{ form_evolution }}
|
|
{% endif %}
|
|
'''
|
|
),
|
|
)
|
|
|
|
Substitutions.register('form_name', category=_('Form'), comment=_('Form Name'))
|
|
|
|
|
|
def clean_drafts(publisher, **kwargs):
|
|
import wcs.qommon.storage as st
|
|
from wcs.carddef import CardDef
|
|
|
|
job = kwargs.pop('job', None)
|
|
for formdef in FormDef.select() + CardDef.select():
|
|
with job.log_long_job(
|
|
'%s %s' % (formdef.xml_root_node, formdef.url_name)
|
|
) if job else contextlib.ExitStack():
|
|
removal_date = datetime.date.today() - datetime.timedelta(days=formdef.get_drafts_lifespan())
|
|
for formdata in formdef.data_class().select(
|
|
[st.Equal('status', 'draft'), st.Less('receipt_time', removal_date.timetuple())]
|
|
):
|
|
formdata.remove_self()
|
|
|
|
|
|
def clean_unused_files(publisher, **kwargs):
|
|
unused_files_behaviour = publisher.get_site_option('unused-files-behaviour')
|
|
if unused_files_behaviour not in ('move', 'remove'):
|
|
return
|
|
|
|
known_filenames = set()
|
|
known_filenames.update([x for x in glob.glob(os.path.join(publisher.app_dir, 'uploads/*'))])
|
|
known_filenames.update([x for x in glob.glob(os.path.join(publisher.app_dir, 'attachments/*/*'))])
|
|
|
|
def accumulate_filenames():
|
|
from wcs.applications import Application
|
|
from wcs.carddef import CardDef
|
|
|
|
for formdef in FormDef.select(ignore_migration=True) + CardDef.select(ignore_migration=True):
|
|
for option_data in (formdef.workflow_options or {}).values():
|
|
if is_upload(option_data):
|
|
yield option_data.get_fs_filename()
|
|
for formdata in formdef.data_class().select_iterator(ignore_errors=True, itersize=200):
|
|
for field_data in formdata.get_all_file_data(with_history=True):
|
|
if is_upload(field_data):
|
|
yield field_data.get_fs_filename()
|
|
elif is_attachment(field_data):
|
|
yield field_data.filename
|
|
for user in publisher.user_class.select():
|
|
for field_data in (user.form_data or {}).values():
|
|
if is_upload(field_data):
|
|
yield field_data.get_fs_filename()
|
|
|
|
for application in Application.select():
|
|
if is_upload(application.icon):
|
|
yield application.icon.get_fs_filename()
|
|
|
|
used_filenames = set()
|
|
for filename in accumulate_filenames():
|
|
if not filename: # alternative storage
|
|
continue
|
|
if not os.path.isabs(filename):
|
|
filename = os.path.join(publisher.app_dir, filename)
|
|
used_filenames.add(filename)
|
|
|
|
unused_filenames = known_filenames - used_filenames
|
|
for filename in unused_filenames:
|
|
try:
|
|
if unused_files_behaviour == 'move':
|
|
new_filename = os.path.join(
|
|
publisher.app_dir, 'unused-files', filename[len(publisher.app_dir) + 1 :]
|
|
)
|
|
if os.path.exists(new_filename):
|
|
os.unlink(filename)
|
|
else:
|
|
new_dirname = os.path.dirname(new_filename)
|
|
if not os.path.exists(new_dirname):
|
|
os.makedirs(new_dirname)
|
|
os.rename(filename, new_filename)
|
|
else:
|
|
os.unlink(filename)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
def update_storage_all_formdefs(publisher, **kwargs):
|
|
from wcs.carddef import CardDef
|
|
|
|
for formdef in itertools.chain(FormDef.select(), CardDef.select()):
|
|
formdef.update_storage()
|
|
|
|
|
|
def get_formdefs_of_all_kinds(**kwargs):
|
|
from wcs.admin.settings import UserFieldsFormDef
|
|
from wcs.blocks import BlockDef
|
|
from wcs.carddef import CardDef
|
|
from wcs.wf.form import FormWorkflowStatusItem
|
|
from wcs.workflows import Workflow
|
|
|
|
select_kwargs = {
|
|
'ignore_errors': True,
|
|
'ignore_migration': True,
|
|
}
|
|
select_kwargs.update(kwargs)
|
|
formdefs = [UserFieldsFormDef()]
|
|
formdefs += FormDef.select(**select_kwargs)
|
|
formdefs += BlockDef.select(**select_kwargs)
|
|
formdefs += CardDef.select(**select_kwargs)
|
|
for workflow in Workflow.select(**select_kwargs):
|
|
for status in workflow.possible_status:
|
|
for item in status.items:
|
|
if isinstance(item, FormWorkflowStatusItem) and item.formdef:
|
|
formdefs.append(item.formdef)
|
|
if workflow.variables_formdef:
|
|
formdefs.append(workflow.variables_formdef)
|
|
if workflow.backoffice_fields_formdef:
|
|
formdefs.append(workflow.backoffice_fields_formdef)
|
|
return formdefs
|
|
|
|
|
|
def register_cronjobs():
|
|
# every night:
|
|
# * update storage of all formdefs
|
|
get_publisher_class().register_cronjob(
|
|
CronJob(update_storage_all_formdefs, name='update_storage', hours=[2], minutes=[0])
|
|
)
|
|
# * and look for:
|
|
# * expired drafts
|
|
get_publisher_class().register_cronjob(CronJob(clean_drafts, name='clean_drafts', hours=[2], minutes=[0]))
|
|
# * and unused files
|
|
get_publisher_class().register_cronjob(
|
|
CronJob(clean_unused_files, name='clean_unused_files', hours=[2], minutes=[0])
|
|
)
|
|
|
|
|
|
class UpdateDigestAfterJob(AfterJob):
|
|
label = _('Updating digests')
|
|
|
|
def __init__(self, formdefs):
|
|
super().__init__(formdefs=[(x.__class__, x.id) for x in formdefs])
|
|
|
|
def execute(self):
|
|
for formdef_class, formdef_id in self.kwargs['formdefs']:
|
|
formdef = formdef_class.get(formdef_id)
|
|
for formdata in formdef.data_class().select(order_by='id'):
|
|
formdata.store()
|
|
|
|
|
|
class UpdateStatisticsDataAfterJob(UpdateDigestAfterJob):
|
|
label = _('Updating statistics data')
|