wcs/wcs/formdef.py


# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import base64
import collections
import contextlib
import copy
import datetime
import glob
import itertools
import json
import os
import pickle
import sys
import time
import types
import xml.etree.ElementTree as ET
from operator import itemgetter
from django.utils.encoding import force_bytes, force_text
from quixote import get_publisher, get_session
from quixote.html import htmltext
from quixote.http_request import Upload
from . import data_sources, fields
from .categories import Category
from .formdata import FormData
from .qommon import PICKLE_KWARGS, _, force_str, get_cfg, pgettext_lazy
from .qommon.admin.emails import EmailsDirectory
from .qommon.afterjobs import AfterJob
from .qommon.cron import CronJob
from .qommon.form import Form, HtmlWidget, UploadedFile
from .qommon.misc import JSONEncoder, get_as_datetime, is_attachment, is_upload, simplify, xml_node_text
from .qommon.publisher import get_publisher_class
from .qommon.storage import Equal, StorableObject, classonlymethod, fix_key
from .qommon.substitution import Substitutions
from .qommon.template import Template
from .roles import logged_users_role
DRAFTS_DEFAULT_LIFESPAN = 100 # days
if not hasattr(types, 'ClassType'):
types.ClassType = type
class FormdefImportError(Exception):
def __init__(self, msg, msg_args=None, details=None):
self.msg = msg
self.msg_args = msg_args or ()
self.details = details
class UnknownReferencedErrorMixin:
def __init__(self, msg, msg_args=None, details=None):
self.msg = msg
self.msg_args = msg_args or ()
self._details = details
@property
def details(self):
if not self._details:
return None
details = []
for kind in sorted(self._details.keys()):
details.append('%s: %s' % (kind, ', '.join(sorted(self._details[kind]))))
return '; '.join(details)
def render(self):
result = htmltext('<ul>')
for kind in sorted(self._details.keys()):
result += htmltext('<li>%s: %s</li>' % (kind, ', '.join(sorted(self._details[kind]))))
result += htmltext('</ul>')
return result
class FormdefImportUnknownReferencedError(UnknownReferencedErrorMixin, FormdefImportError):
pass
class FormdefImportRecoverableError(FormdefImportError):
pass
class FormDefDoesNotExist(AttributeError):
error_message = _('No such form: %s')
def get_error_message(self):
return self.error_message % self
class FormField:
# only used to unpickle form fields from older (<200603) versions
def __setstate__(self, dict):
type = dict['type']
self.real_field = fields.get_field_class_by_type(type)(**dict)
def lax_int(s):
try:
return int(s)
except (ValueError, TypeError):
return -1
class FormDef(StorableObject):
# noqa pylint: disable=too-many-public-methods
_names = 'formdefs'
_indexes = ['url_name']
_hashed_indexes = ['backoffice_submission_roles']
backoffice_class = 'wcs.admin.forms.FormDefPage'
data_sql_prefix = 'formdata'
pickle_module_name = 'formdef'
xml_root_node = 'formdef'
backoffice_section = 'forms'
verbose_name = _('Form')
verbose_name_plural = _('Forms')
item_name = pgettext_lazy('item', 'form')
item_name_plural = pgettext_lazy('item', 'forms')
name = None
description = None
keywords = None
url_name = None
internal_identifier = None # used to have a stable pickle object class name
table_name = None # for SQL only
fields = None
category_id = None
workflow_id = None
workflow_options = None
workflow_roles = None
roles = None
required_authentication_contexts = None
backoffice_submission_roles = None
discussion = False
confirmation = True
detailed_emails = True
disabled = False
only_allow_one = False
enable_tracking_codes = False
tracking_code_verify_fields = None
disabled_redirection = None
always_advertise = False
publication_date = None
expiration_date = None
has_captcha = False
skip_from_360_view = False
include_download_all_button = False
appearance_keywords = None
digest_templates = None
lateral_template = None
submission_lateral_template = None
drafts_lifespan = None
user_support = None
geolocations = None
max_field_id = None
# store reverse relations
reverse_relations = None
# store fields in a separate pickle chunk
lightweight = True
# prefixes for formdata variables
var_prefixes = ['form']
# declarations for serialization
TEXT_ATTRIBUTES = [
'name',
'url_name',
'description',
'keywords',
'publication_date',
'expiration_date',
'internal_identifier',
'disabled_redirection',
'appearance_keywords',
'lateral_template',
'submission_lateral_template',
'drafts_lifespan',
'user_support',
]
BOOLEAN_ATTRIBUTES = [
'discussion',
'detailed_emails',
'disabled',
'only_allow_one',
'enable_tracking_codes',
'confirmation',
'always_advertise',
'include_download_all_button',
'has_captcha',
'skip_from_360_view',
]
category_class = Category
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields = []
def __eq__(self, other):
return bool(
isinstance(other, FormDef) and self.xml_root_node == other.xml_root_node and self.id == other.id
)
def migrate(self):
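        # apply in-place migrations for objects stored by older versions
        # (string ids, missing max_field_id/internal_identifier, per-field
        # migrations) and store the result if anything changed.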
changed = False
if self.__dict__.get('fields') is Ellipsis:
# don't run migration on lightweight objects
return
if self.max_field_id is None and self.fields:
self.max_field_id = max(lax_int(x.id) for x in self.fields)
changed = True
if isinstance(self.category_id, int):
self.category_id = str(self.category_id)
changed = True
if isinstance(self.workflow_id, int):
self.workflow_id = str(self.workflow_id)
changed = True
if self.roles:
for role in self.roles:
if isinstance(role, int):
self.roles = [str(x) for x in self.roles]
changed = True
break
if self.workflow_roles:
workflow_roles_list = self.workflow_roles.items()
for role_id in self.workflow_roles.values():
if isinstance(role_id, int):
self.workflow_roles = {x: str(y) for x, y in workflow_roles_list}
changed = True
break
if not self.internal_identifier:
self.internal_identifier = self.url_name
changed = True
for f in self.fields or []:
changed |= f.migrate()
if changed:
self.store(comment=_('Automatic update'), snapshot_store_user=False)
@classmethod
def remove_object(cls, id):
super().remove_object(id)
if cls == FormDef:
# recreate global views so they don't reference formdata from
            # deleted formdefs
from . import sql
conn, cur = sql.get_connection_and_cursor()
sql.clean_global_views(conn, cur)
conn.commit()
cur.close()
@property
def data_class_name(self):
return '_wcs_%s' % self.url_name.title()
def data_class(self, mode=None):
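        # return a FormData subclass bound to this formdef; the class is
        # created on the fly, cached on the "formdef" module alias, and backed
        # by SQL storage unless mode is 'files'.
        # Illustrative use (the slug is hypothetical):
        #   FormDef.get_by_urlname('demand').data_class().count()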
if 'formdef' not in sys.modules:
sys.modules['formdef'] = sys.modules[__name__]
if hasattr(sys.modules['formdef'], self.data_class_name):
data_class = getattr(sys.modules['formdef'], self.data_class_name)
# only use existing data class if it has a reference to this actual
# formdef
if data_class._formdef is self:
return data_class
        if mode != 'files' or mode == 'sql':
from . import sql
table_name = sql.get_formdef_table_name(self)
cls = types.ClassType(
self.data_class_name, (sql.SqlFormData,), {'_formdef': self, '_table_name': table_name}
)
else:
cls = types.ClassType(
self.data_class_name,
(FormData,),
{'_names': 'form-%s' % self.internal_identifier, '_formdef': self},
)
setattr(sys.modules['formdef'], self.data_class_name, cls)
setattr(sys.modules['wcs.formdef'], self.data_class_name, cls)
return cls
def get_new_field_id(self):
if self.max_field_id is None:
field_id = 1
else:
field_id = self.max_field_id + 1
self.max_field_id = field_id
return str(field_id)
def get_new_url_name(self):
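        # derive a slug from the form name and append -1, -2, ... until it no
        # longer collides with another formdef's url_name.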
new_url_name = simplify(self.name)[:250]
base_new_url_name = new_url_name
suffix_no = 0
while True:
try:
obj = self.get_on_index(new_url_name, 'url_name', ignore_migration=True)
except KeyError:
break
if obj.id == self.id:
break
suffix_no += 1
new_url_name = '%s-%s' % (base_new_url_name, suffix_no)
return new_url_name
def get_new_internal_identifier(self):
new_internal_identifier = simplify(self.name)
base_new_internal_identifier = new_internal_identifier
suffix_no = 0
while True:
try:
formdef = self.get_by_urlname(new_internal_identifier, ignore_migration=True)
except KeyError:
break
if formdef.id == self.id:
break
suffix_no += 1
new_internal_identifier = '%s-%s' % (base_new_internal_identifier, suffix_no)
return new_internal_identifier
@classmethod
def get_new_id(cls, create=False):
id = super().get_new_id(create=False)
id = cls.get_sql_new_id(id_start=int(id))
if create:
objects_dir = cls.get_objects_dir()
object_filename = os.path.join(objects_dir, fix_key(id))
try:
fd = os.open(object_filename, os.O_CREAT | os.O_EXCL)
except OSError:
return cls.get_new_id(create=True)
os.close(fd)
return str(id)
@classmethod
def get_sql_new_id(cls, id_start):
from . import sql
return sql.get_formdef_new_id(id_start=id_start)
def get_order_by(self, order_by):
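        # map a user-facing sort key (field varname or 'fNN' identifier) to the
        # matching SQL column or JSON path expression, handling fields inside
        # blocks and *_display columns, and keeping a leading '-' for
        # descending order.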
if not order_by:
return order_by
direction = ''
if order_by.startswith('-'):
order_by = order_by[1:]
direction = '-'
for field in self.iter_fields(include_block_fields=True):
if getattr(field, 'block_field', None):
if field.key == 'items':
                    # sorting on "items" fields inside blocks is not supported yet
continue
if order_by not in [field.contextual_varname, 'f%s' % field.contextual_id]:
continue
if field.contextual_varname == order_by:
order_by = "f%s" % field.contextual_id
if getattr(field, 'block_field', None) and 'f%s' % field.contextual_id == order_by:
# field of block field, sort on the first element
order_by = "f%s->'data'->0->>'%s%s'" % (
field.block_field.id,
field.id,
"_display" if field.store_display_value else "",
)
elif field.store_display_value:
order_by += "_display"
break
return '%s%s' % (direction, order_by)
def has_user_access(self, user):
if get_publisher().get_backoffice_root().is_global_accessible(self.backoffice_section):
return True
if not user:
return False
if not self.category_id:
return False
management_roles = {x.id for x in getattr(self.category, 'management_roles') or []}
user_roles = set(user.get_roles())
return management_roles.intersection(user_roles)
@classonlymethod
def wipe(cls):
super().wipe()
cls.sql_wipe()
@classmethod
def sql_wipe(cls):
from . import sql
sql.formdef_wipe()
def store(self, comment=None, snapshot_store_user=True, *args, **kwargs):
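        # ensure url_name/internal_identifier are set and store the object;
        # unless object_only is passed, reverse relations, snapshots, SQL
        # tables and related custom views are also updated.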
assert not self.is_readonly()
if self.url_name is None:
# set url name if it's not yet there
self.url_name = self.get_new_url_name()
object_only = kwargs.pop('object_only', False)
new_internal_identifier = self.get_new_internal_identifier()
if not self.internal_identifier:
self.internal_identifier = new_internal_identifier
if new_internal_identifier != self.internal_identifier:
            # title changed; the internal identifier will only be changed if
            # the formdef is currently being imported (self.id is None) or if
            # there are no submitted forms yet (or if the site uses the SQL
            # storage, as the internal identifier is not used in that mode).
if self.id is None or not self.data_class().exists():
self.internal_identifier = new_internal_identifier
if not object_only:
self.update_relations()
StorableObject.store(self, *args, **kwargs)
if object_only:
return
if get_publisher().snapshot_class:
get_publisher().snapshot_class.snap(
instance=self, comment=comment, store_user=snapshot_store_user
)
if get_publisher().has_postgresql_config():
self.update_storage()
self.store_related_custom_views()
def update_storage(self):
from . import sql
actions = sql.do_formdef_tables(self, rebuild_views=True, rebuild_global_views=True)
if actions:
cls = self.data_class()
for action in actions:
getattr(cls, action)()
def update_relations(self):
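        # recompute reverse relations between this formdef/carddef and every
        # other one, storing the other objects whose relations changed.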
from wcs.carddef import CardDef
self_ref = '%s:%s' % (self.xml_root_node, self.url_name)
self_relations_by_ref = self.build_relations_by_ref()
reverse_relations = []
# cross each formdef and cardef and check relations
for objdef in itertools.chain(
FormDef.select(ignore_errors=True, ignore_migration=True),
CardDef.select(ignore_errors=True, ignore_migration=True),
):
objdef_ref = '%s:%s' % (objdef.xml_root_node, objdef.url_name)
if objdef.xml_root_node == self.xml_root_node and objdef.id == self.id:
# don't build relations twice
objdef_relations_by_ref = self_relations_by_ref
else:
objdef_relations_by_ref = objdef.build_relations_by_ref()
reverse_relations += objdef_relations_by_ref.get(self_ref, [])
old_objdef_reverse_relations = copy.deepcopy(objdef.reverse_relations)
# remove relations with self in objdef's reverse_relations
new_objdef_reverse_relations = [
r for r in (objdef.reverse_relations or []) if r['obj'] != self_ref
]
# and update objdef's reverse_relations from self_relations_by_ref
new_objdef_reverse_relations += self_relations_by_ref.get(objdef_ref, [])
# sort objectdef's reverse_relations
new_objdef_reverse_relations = sorted(
new_objdef_reverse_relations, key=itemgetter('obj', 'varname', 'type')
)
if old_objdef_reverse_relations != new_objdef_reverse_relations:
objdef.reverse_relations = new_objdef_reverse_relations
objdef.store(object_only=True)
# sort self's reverse_relations and set
self.reverse_relations = sorted(reverse_relations, key=itemgetter('obj', 'varname', 'type'))
def build_relations_by_ref(self):
# build relations to other carddefs, to be stored in some object reverse field
self_ref = '%s:%s' % (self.xml_root_node, self.url_name)
relations_by_ref = collections.defaultdict(list)
def _check_field(field):
data_source = getattr(field, 'data_source', None)
if not data_source or not data_source.get('type', '').startswith('carddef:'):
return
# reverse relation of data_source['type'] to this object
obj_ref = ':'.join(data_source['type'].split(':')[:2]) # remove possible custom-view
relations_by_ref[obj_ref].append(
{
'varname': field.contextual_varname or '',
'type': field.key,
'obj': self_ref,
}
)
for field in self.iter_fields(include_block_fields=True):
if field.key in ['item', 'items', 'computed']:
_check_field(field)
# remove duplicated items
return {
k: list(map(dict, {tuple(sorted(d.items())) for d in v})) for k, v in relations_by_ref.items()
}
def store_related_custom_views(self):
for view in getattr(self, '_custom_views', []):
if not view.id:
existing_views = get_publisher().custom_view_class.select(
[
Equal('formdef_type', self.xml_root_node),
Equal('formdef_id', str(self.id)),
Equal('visibility', view.visibility),
Equal('slug', view.slug),
]
)
if existing_views:
view.id = existing_views[0].id
view.formdef = self
view.store()
def get_all_fields(self):
return (self.fields or []) + self.workflow.get_backoffice_fields()
def iter_fields(self, include_block_fields=False):
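        # yield all fields (own fields plus workflow backoffice fields),
        # optionally descending into block fields; each yielded field gets
        # contextual_id/contextual_varname attributes set.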
def _iter_fields(fields, block_field=None):
for field in fields:
# add contextual_id/contextual_varname attributes
# they are id/varname for normal fields
# but in case of blocks they are concatenation of block id/varname + field id/varname
field.contextual_id = field.id
field.contextual_varname = None
if block_field:
field.block_field = block_field
field.contextual_id = '%s-%s' % (field.block_field.id, field.id)
if field.varname and field.block_field.varname:
field.contextual_varname = '%s_%s' % (
field.block_field.varname,
field.varname,
)
else:
field.contextual_varname = field.varname
yield field
if field.key == 'block' and include_block_fields:
try:
field.block # load block
except KeyError:
# blockdef not found
continue
yield from _iter_fields(field.block.fields, block_field=field)
field._block = None # reset cache
yield from _iter_fields(self.get_all_fields())
def get_widget_fields(self):
return [field for field in self.fields or [] if isinstance(field, fields.WidgetField)]
@property
def default_digest_template(self):
return (self.digest_templates or {}).get('default')
def get_category(self):
if self.category_id:
try:
return self.category_class.get(self.category_id)
except KeyError:
return None
else:
return None
def set_category(self, category):
if category:
self.category_id = category.id
elif self.category_id:
self.category_id = None
category = property(get_category, set_category)
def get_drafts_lifespan(self):
return int(self.drafts_lifespan or DRAFTS_DEFAULT_LIFESPAN)
_workflow = None
def get_workflow(self):
if self._workflow:
return self._workflow
from wcs.workflows import Workflow
if self.workflow_id:
try:
workflow = Workflow.get(self.workflow_id)
except KeyError:
return Workflow.get_unknown_workflow()
self._workflow = self.get_workflow_with_options(workflow)
return self._workflow
else:
return self.get_default_workflow()
@classmethod
def get_default_workflow(cls):
from wcs.workflows import Workflow
return Workflow.get_default_workflow()
def get_workflow_with_options(self, workflow):
# this needs to be kept in sync with admin/forms.ptl,
# FormDefPage::workflow
if not self.workflow_options:
return workflow
for status in workflow.possible_status:
for item in status.items:
prefix = '%s*%s*' % (status.id, item.id)
for parameter in item.get_parameters():
value = self.workflow_options.get(prefix + parameter)
if value:
setattr(item, parameter, value)
return workflow
def set_workflow(self, workflow):
if workflow and workflow.id not in ['_carddef_default', '_default']:
self.workflow_id = workflow.id
self._workflow = workflow
elif self.workflow_id:
self.workflow_id = None
self._workflow = None
workflow = property(get_workflow, set_workflow)
def get_dependencies(self):
yield self.category
if self.workflow_id and self.workflow.id not in ['_carddef_default', '_default']:
yield self.workflow
for field in self.fields or []:
yield from field.get_dependencies()
role_class = get_publisher().role_class
for role_id in itertools.chain(self.roles or [], self.backoffice_submission_roles or []):
yield role_class.get(role_id, ignore_errors=True)
for role_id in (self.workflow_roles or {}).values():
yield role_class.get(role_id, ignore_errors=True)
@property
def keywords_list(self):
if not self.keywords:
return []
return [x.strip() for x in self.keywords.split(',')]
@property
def appearance_keywords_list(self):
if not get_publisher().has_site_option('formdef-appearance-keywords'):
return []
if not self.appearance_keywords:
return []
return [x.strip() for x in self.appearance_keywords.split()]
def get_variable_options(self):
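        # expose workflow option values as form_option_<varname> substitution
        # variables, with *_raw and *_structured companions when the field
        # stores display or structured values.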
variables = {}
if not self.workflow.variables_formdef:
return variables
if not self.workflow_options:
return variables
for field in self.workflow.variables_formdef.fields:
if not field.varname:
continue
option_name = 'form_option_' + field.varname
variables[option_name] = self.workflow_options.get(field.varname)
if field.store_display_value:
if '%s_display' % field.varname in self.workflow_options:
variables[option_name + '_raw'] = variables[option_name]
variables[option_name] = self.workflow_options.get('%s_display' % field.varname)
if field.store_structured_value:
if '%s_structured' % field.varname in self.workflow_options:
variables[option_name + '_structured'] = self.workflow_options.get(
'%s_structured' % field.varname
)
return variables
def get_variable_options_for_form(self):
variables = {}
if not self.workflow.variables_formdef:
return variables
if not self.workflow_options:
return {}
for field in self.workflow.variables_formdef.fields:
if not field.varname:
continue
variables[str(field.id)] = self.workflow_options.get(field.varname)
return variables
def set_variable_options(self, form):
data = self.workflow.variables_formdef.get_data(form)
variables = {}
for field in self.workflow.variables_formdef.fields:
if not field.varname:
continue
variables[field.varname] = data.get(field.id)
if field.store_display_value:
variables[field.varname + '_display'] = data.get(field.id + '_display')
if field.store_structured_value:
variables[field.varname + '_structured'] = data.get(field.id + '_structured')
if not self.workflow_options:
self.workflow_options = {}
self.workflow_options.update(variables)
@classmethod
def get_by_urlname(cls, url_name, ignore_migration=False, ignore_errors=False):
return cls.get_on_index(
url_name, 'url_name', ignore_migration=ignore_migration, ignore_errors=ignore_errors
)
get_by_slug = get_by_urlname
@property
def slug(self):
return self.url_name
@slug.setter
def slug(self, value):
self.url_name = value
def get_url(self, backoffice=False, preview=False, include_category=False, language=None):
if backoffice:
base_url = get_publisher().get_backoffice_url() + '/management'
elif preview:
base_url = get_publisher().get_frontoffice_url() + '/preview'
else:
base_url = get_publisher().get_frontoffice_url()
if language and get_publisher().has_i18n_enabled():
base_url += '/' + language
if include_category and self.category_id:
return '%s/%s/%s/' % (base_url, self.category.slug, self.url_name)
return '%s/%s/' % (base_url, self.url_name)
def get_api_url(self):
base_url = get_publisher().get_frontoffice_url()
return '%s/api/forms/%s/' % (base_url, self.url_name)
def get_admin_url(self):
base_url = get_publisher().get_backoffice_url()
return '%s/forms/%s/' % (base_url, self.id)
def get_field_admin_url(self, field):
return self.get_admin_url() + 'fields/%s/' % field.id
def get_backoffice_submission_url(self):
base_url = get_publisher().get_backoffice_url() + '/submission'
return '%s/%s/' % (base_url, self.url_name)
def get_display_id_format(self):
return '[formdef_id]-[form_number_raw]'
def get_submission_lateral_block(self):
context = get_publisher().substitutions.get_context_variables(mode='lazy')
if self.submission_lateral_template is None:
new_value = None
else:
try:
new_value = Template(self.submission_lateral_template, autoescape=False).render(context)
except Exception as e:
get_publisher().record_error(
_('Could not render submission lateral template (%s)' % e),
formdef=self,
exception=e,
)
return None
return new_value
def create_form(self, page=None, displayed_fields=None, transient_formdata=None):
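        # build the quixote Form used to render and parse the form (multipart,
        # custom error notice, optional CSS class and data-keywords
        # attributes), then add the fields of the requested page.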
form = Form(enctype="multipart/form-data", use_tokens=False)
if self.appearance_keywords:
form.attrs['class'] = 'quixote %s' % self.appearance_keywords
if self.keywords:
form.attrs['data-keywords'] = ' '.join(self.keywords_list)
form.ERROR_NOTICE = _(
'There were errors processing the form and '
'you cannot go to the next page. Do '
'check below that you filled all fields correctly.'
)
self.add_fields_to_form(
form, page=page, displayed_fields=displayed_fields, transient_formdata=transient_formdata
)
return form
def get_computed_fields_from_page(self, page):
on_page = page is None
for field in self.fields:
if field.key == 'page':
if on_page:
break
if page.id == field.id:
on_page = True
continue
if not on_page:
continue
if field.key == 'computed':
yield field
def add_fields_to_form(
self,
form,
page=None,
displayed_fields=None,
form_data=None, # a dictionary, to fill fields
transient_formdata=None,
): # a FormData
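        # walk the fields of the requested page, skip invisible fields that
        # have no live conditions, add widgets to the form and, when a
        # transient_formdata is given, feed it with the values of visible
        # widgets.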
on_page = page is None
hidden_varnames = set()
for field in self.fields:
field.formdef = self
if field.type == 'page':
if on_page:
break
if page.id == field.id:
on_page = True
continue
if not on_page:
continue
visible = field.is_visible(form_data, self)
if not visible:
if not field.has_live_conditions(self, hidden_varnames=hidden_varnames):
# ignore field.varname when checking later conditions for liveness
if field.varname:
hidden_varnames.add(field.varname)
# no live conditions so field can be skipped
continue
if isinstance(displayed_fields, list):
displayed_fields.append(field)
value = None
if form_data:
value = form_data.get(field.id)
if not field.add_to_form:
continue
widget = field.add_to_form(form, value)
            widget.is_hidden = not visible
widget.field = field
if transient_formdata and not widget.is_hidden:
transient_formdata.data.update(self.get_field_data(field, widget))
# invalidate cache as comment fields (and other things?) may
# have accessed variables in non-lazy mode and caused a cache
# with now-obsolete values.
get_publisher().substitutions.invalidate_cache()
widget._parsed = False
widget.error = None
def get_page(self, page_no):
return [x for x in self.fields if x.type == 'page'][page_no]
def page_count(self):
return len([x for x in self.fields if x.type == 'page']) or 1
def create_view_form(self, dict=None, use_tokens=True, visible=True):
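        # build the read-only summary ("validation page") form: fields are
        # grouped by page, disabled pages are skipped, and fields excluded from
        # the validation page are either dropped or rendered in hidden divs.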
dict = dict or {}
form = Form(enctype='multipart/form-data', use_tokens=use_tokens)
if not visible:
form.attrs['style'] = 'display: none;'
if self.keywords:
form.attrs['data-keywords'] = ' '.join(self.keywords_list)
form_fields = self.fields
if form_fields and form_fields[0].type != 'page':
# add fake initial page in case it's missing
form_fields = [fields.PageField(label='', type='page')] + form_fields
# 1st pass to group fields on different pages
pages = []
current_page = {}
for field in form_fields:
if field.type == 'page':
current_page = {'page': field, 'fields': []}
current_page['disabled'] = not field.is_visible(dict, self)
pages.append(current_page)
continue
if current_page['disabled']:
continue
if field.type == 'title' and (
not current_page['fields'] and current_page['page'].label == field.label
):
# don't include first title of a page if that title has the
# same text as the page.
continue
if field.type in ('title', 'subtitle', 'comment') and not field.include_in_validation_page:
                # don't render fields that wouldn't be displayed.
continue
if not field.is_visible(dict, self):
continue
current_page['fields'].append(field)
# 2nd pass to create view form
for page in pages:
visible_contents = False
if page['fields'] and any(x.include_in_validation_page for x in page['fields']):
visible_contents = True
form.widgets.append(HtmlWidget(htmltext('<div class="page">')))
if page['page'].label:
form.widgets.append(HtmlWidget(htmltext('<h3>%s</h3>') % page['page'].label))
form.widgets.append(HtmlWidget(htmltext('<div>')))
for field in page['fields']:
value = dict.get(field.id)
if not field.add_to_view_form:
continue
if not field.include_in_validation_page:
form.widgets.append(HtmlWidget(htmltext('<div style="display: none;">')))
field.add_to_view_form(form, value)
form.widgets.append(HtmlWidget(htmltext('</div>')))
else:
field.add_to_view_form(form, value)
if visible_contents:
form.widgets.append(HtmlWidget(htmltext('</div></div>')))
return form
def set_live_condition_sources(self, form, fields):
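        # flag widgets whose value is referenced by other fields' conditions,
        # data sources (json/geojson/carddef), string prefills or comment
        # labels, so live updates can be triggered client side.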
live_condition_fields = {}
fields_ids = {str(x.id) for x in fields}
for field in self.iter_fields(include_block_fields=True):
if (hasattr(field, 'block_field') and str(field.block_field.id) not in fields_ids) or (
not hasattr(field, 'block_field') and str(field.id) not in fields_ids
):
continue
if field.condition:
field.varnames = field.get_condition_varnames(formdef=self)
for varname in field.varnames:
if varname not in live_condition_fields:
live_condition_fields[varname] = []
live_condition_fields[varname].append(field)
if field.key in ('item', 'items') and field.data_source:
data_source = data_sources.get_object(field.data_source)
if data_source.type not in ('json', 'geojson') and not data_source.type.startswith(
'carddef:'
):
continue
varnames = data_source.get_referenced_varnames(formdef=self)
for varname in varnames:
if varname not in live_condition_fields:
live_condition_fields[varname] = []
live_condition_fields[varname].append(field)
if field.prefill and field.prefill.get('type') == 'string':
for varname in field.get_referenced_varnames(
formdef=self, value=field.prefill.get('value', '')
):
if varname not in live_condition_fields:
live_condition_fields[varname] = []
live_condition_fields[varname].append(field)
if field.key == 'comment':
for varname in field.get_referenced_varnames(formdef=self, value=field.label):
if varname not in live_condition_fields:
live_condition_fields[varname] = []
live_condition_fields[varname].append(field)
for field in fields:
if field.varname in live_condition_fields:
widget = form.get_widget('f%s' % field.id)
if widget:
widget.live_condition_source = True
widget.live_condition_fields = live_condition_fields[field.varname]
@classmethod
def get_field_data(cls, field, widget, raise_on_error=False):
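        # parse the widget value, convert it from its string form if needed,
        # and let the field finalize it (set_value may add companion keys such
        # as *_display); returns a dict keyed on the field id.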
d = {}
d[field.id] = widget.parse()
if d.get(field.id) is not None and field.convert_value_from_str:
d[field.id] = field.convert_value_from_str(d[field.id])
field.set_value(d, d[field.id], raise_on_error=raise_on_error)
if getattr(widget, 'cleanup', None):
widget.cleanup()
return d
def get_data(self, form, raise_on_error=False):
d = {}
for field in self.fields:
widget = form.get_widget('f%s' % field.id)
if widget:
d.update(self.get_field_data(field, widget, raise_on_error=raise_on_error))
return d
def export_to_json(self, include_id=False, indent=None, with_user_fields=False):
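        # serialize the formdef (attributes, fields, workflow, options,
        # geolocations...) to a JSON string; card definitions also get their
        # relations and, when with_user_fields is set, a description of the
        # user fields.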
from wcs.carddef import CardDef
charset = get_publisher().site_charset
root = {}
root['name'] = force_text(self.name, charset)
if include_id and self.id:
root['id'] = str(self.id)
if self.category:
root['category'] = force_text(self.category.name, charset)
root['category_id'] = str(self.category.id)
if self.workflow:
root['workflow'] = self.workflow.get_json_export_dict(include_id=include_id)
if self.max_field_id is None and self.fields:
self.max_field_id = max(lax_int(x.id) for x in self.fields)
more_attributes = ['tracking_code_verify_fields']
if self.max_field_id:
more_attributes.append('max_field_id')
for attribute in self.TEXT_ATTRIBUTES + self.BOOLEAN_ATTRIBUTES + more_attributes:
if not hasattr(self, attribute):
continue
root[attribute] = getattr(self, attribute)
if isinstance(root[attribute], time.struct_time):
root[attribute] = time.strftime('%Y-%m-%dT%H:%M:%S', root[attribute])
root['fields'] = []
if self.fields:
for field in self.fields:
root['fields'].append(field.export_to_json(include_id=include_id))
if self.geolocations:
root['geolocations'] = self.geolocations.copy()
if self.workflow_options:
root['options'] = self.workflow_options.copy()
for k, v in list(root['options'].items()):
                # convert time.struct_time values to strings, as Python 3
                # would serialize them as tuples.
if isinstance(v, time.struct_time):
root['options'][k] = time.strftime('%Y-%m-%dT%H:%M:%S', v)
if self.required_authentication_contexts:
root['required_authentication_contexts'] = self.required_authentication_contexts[:]
if isinstance(self, CardDef):
all_carddefs = CardDef.select(ignore_errors=True)
all_carddefs = [c for c in all_carddefs if c]
all_carddefs_by_slug = {c.url_name: c for c in all_carddefs}
def get_field_label(obj, field_varname):
card_slug = obj.split(':')[1]
carddef = all_carddefs_by_slug.get(card_slug)
if not carddef:
return
for field in carddef.iter_fields(include_block_fields=True):
if field.contextual_varname == field_varname:
if getattr(field, 'block_field', None):
return '%s - %s' % (field.block_field.label, field.label)
return field.label
card_relations = []
current_objdef_ref = '%s:%s' % (self.xml_root_node, self.url_name)
for objdef, relations in self.build_relations_by_ref().items():
if not objdef.startswith('carddef:'):
continue
try:
CardDef.get_by_urlname(objdef.split(':')[1])
except KeyError:
continue
for relation in relations:
if not relation['varname']:
continue
card_relations.append(
{
'varname': relation['varname'],
'label': get_field_label(current_objdef_ref, relation['varname']),
'type': relation['type'],
'obj': objdef,
'reverse': False,
}
)
for relation in self.reverse_relations or []:
if not relation['obj'].startswith('carddef:'):
continue
if not relation['varname']:
continue
rel = relation.copy()
rel.update(
{
'reverse': True,
'label': get_field_label(relation['obj'], relation['varname']),
}
)
card_relations.append(rel)
root['relations'] = sorted(card_relations, key=itemgetter('varname'))
if with_user_fields:
root['user'] = {
'fields': [
{
'varname': 'name',
'label': _('Full name'),
'type': 'string',
},
{
'varname': 'email',
'label': _('Email'),
'type': 'email',
},
]
}
user_formdef = get_publisher().user_class.get_formdef()
if user_formdef:
root['user']['fields'] += [
{
'varname': f.varname or '',
'label': f.label,
'type': f.type,
}
for f in user_formdef.fields
]
return json.dumps(root, indent=indent, sort_keys=True, cls=JSONEncoder)
@classmethod
def import_from_json(cls, fd, charset=None, include_id=False):
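        # counterpart of export_to_json: rebuild a formdef from a JSON stream,
        # resolving category and workflow by id or name, and restoring
        # workflow options, including uploaded files.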
if charset is None:
charset = get_publisher().site_charset
formdef = cls()
def unicode2str(v):
if isinstance(v, dict):
return {unicode2str(k): unicode2str(v) for k, v in v.items()}
elif isinstance(v, list):
return [unicode2str(x) for x in v]
elif isinstance(v, str):
return force_str(v)
else:
return v
        # we have to make sure all strings are str objects, not unicode.
value = unicode2str(json.load(fd))
if include_id and 'id' in value:
formdef.id = value.get('id')
if include_id and 'category_id' in value:
formdef.category_id = value.get('category_id')
elif 'category' in value:
category = value.get('category')
for c in Category.select():
if c.name == category:
formdef.category_id = c.id
break
if include_id and 'workflow_id' in value:
formdef.workflow_id = value.get('workflow_id')
elif (
include_id
and 'workflow' in value
and isinstance(value['workflow'], dict)
and 'id' in value['workflow']
):
formdef.workflow_id = value['workflow'].get('id')
elif 'workflow' in value:
if isinstance(value['workflow'], str):
workflow = value.get('workflow')
else:
workflow = value['workflow'].get('name')
from wcs.workflows import Workflow
for w in Workflow.select():
if w.name == workflow:
formdef.workflow_id = w.id
break
more_attributes = ['max_field_id', 'tracking_code_verify_fields']
for attribute in cls.TEXT_ATTRIBUTES + cls.BOOLEAN_ATTRIBUTES + more_attributes:
if attribute in value:
setattr(formdef, attribute, value.get(attribute))
formdef.fields = []
for i, field in enumerate(value.get('fields', [])):
try:
field_o = fields.get_field_class_by_type(field.get('type'))()
except KeyError:
                raise FormdefImportError(_('Unknown field type'), details=field.get('type'))
field_o.init_with_json(field, include_id=True)
if not field_o.id:
                # this assumes either all fields have an id, or none of them do
field_o.id = str(i)
formdef.fields.append(field_o)
if formdef.fields and not formdef.max_field_id:
formdef.max_field_id = max(lax_int(x.id) for x in formdef.fields)
if value.get('options'):
formdef.workflow_options = value.get('options')
for option_key, option_value in formdef.workflow_options.items():
if isinstance(option_value, dict) and 'filename' in option_value:
filename = option_value['filename']
upload = Upload(filename, content_type=option_value['content_type'])
new_value = UploadedFile(get_publisher().app_dir, filename, upload)
new_value.set_content(base64.decodebytes(force_bytes(option_value['content'])))
formdef.workflow_options[option_key] = new_value
if value.get('geolocations'):
formdef.geolocations = value.get('geolocations')
if value.get('required_authentication_contexts'):
formdef.required_authentication_contexts = [
str(x) for x in value.get('required_authentication_contexts')
]
return formdef
def export_to_xml(self, include_id=False):
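        # serialize the formdef to an ElementTree element, including the
        # workflow reference, fields, roles, workflow options, custom views,
        # geolocations and digest templates.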
charset = get_publisher().site_charset
root = ET.Element(self.xml_root_node)
if include_id and self.id:
root.attrib['id'] = str(self.id)
for text_attribute in list(self.TEXT_ATTRIBUTES):
if not hasattr(self, text_attribute) or not getattr(self, text_attribute):
continue
ET.SubElement(root, text_attribute).text = force_text(getattr(self, text_attribute), charset)
for boolean_attribute in self.BOOLEAN_ATTRIBUTES:
if not hasattr(self, boolean_attribute):
continue
value = getattr(self, boolean_attribute)
if value:
value = 'true'
else:
value = 'false'
ET.SubElement(root, boolean_attribute).text = value
self.category_class.object_category_xml_export(self, root, include_id=include_id)
workflow = None
if self.workflow_id:
from wcs.workflows import Workflow
workflow = Workflow.get(self.workflow_id, ignore_errors=True, ignore_migration=True)
if not workflow:
workflow = self.get_default_workflow()
elem = ET.SubElement(root, 'workflow')
elem.text = force_text(workflow.name, charset)
if workflow.slug:
elem.attrib['slug'] = str(workflow.slug)
if include_id:
elem.attrib['workflow_id'] = str(workflow.id)
if self.max_field_id is None and self.fields:
self.max_field_id = max(lax_int(x.id) for x in self.fields)
if self.max_field_id:
ET.SubElement(root, 'max_field_id').text = str(self.max_field_id)
if self.tracking_code_verify_fields is not None:
verify_fields = ET.SubElement(root, 'tracking_code_verify_fields')
for field_id in self.tracking_code_verify_fields:
ET.SubElement(verify_fields, 'field_id').text = str(field_id)
fields = ET.SubElement(root, 'fields')
for field in self.fields or []:
fields.append(field.export_to_xml(charset=charset, include_id=include_id))
from wcs.workflows import get_role_name_and_slug
def add_role_element(roles_root, role_id):
if not role_id:
return
role_name, role_slug = get_role_name_and_slug(role_id)
sub = ET.SubElement(roles_root, 'role')
if role_slug:
sub.attrib['slug'] = role_slug
if include_id:
sub.attrib['role_id'] = str(role_id)
sub.text = role_name
return sub
roles_elements = [
('roles', 'user-roles'),
('backoffice_submission_roles', 'backoffice-submission-roles'),
]
for attr_name, node_name in roles_elements:
if not getattr(self, attr_name, None):
continue
roles = ET.SubElement(root, node_name)
for role_id in getattr(self, attr_name):
add_role_element(roles, role_id)
if self.workflow_roles:
roles = ET.SubElement(root, 'roles')
for role_key, role_id in self.workflow_roles.items():
sub = add_role_element(roles, role_id)
if sub is not None:
sub.attrib['role_key'] = role_key
options = ET.SubElement(root, 'options')
for option in sorted(self.workflow_options or []):
element = ET.SubElement(options, 'option')
element.attrib['varname'] = option
option_value = self.workflow_options.get(option)
if isinstance(option_value, str):
element.text = force_text(self.workflow_options.get(option, ''), charset)
elif hasattr(option_value, 'base_filename'):
ET.SubElement(element, 'filename').text = option_value.base_filename
ET.SubElement(element, 'content_type').text = (
option_value.content_type or 'application/octet-stream'
)
ET.SubElement(element, 'content').text = force_text(
base64.b64encode(option_value.get_content())
)
elif isinstance(option_value, time.struct_time):
element.text = time.strftime('%Y-%m-%d', option_value)
element.attrib['type'] = 'date'
else:
pass # TODO: extend support to other types
custom_views_element = ET.SubElement(root, 'custom_views')
if hasattr(self, '_custom_views'):
            # it has just been loaded and is being reexported as part of the
            # overwrite confirmation dialog; do not get custom views from the
            # database.
custom_views = self._custom_views
else:
custom_views = []
for view in get_publisher().custom_view_class.select():
if view.match(user=None, formdef=self):
custom_views.append(view)
for view in custom_views:
custom_views_element.append(view.export_to_xml(charset=charset))
geolocations = ET.SubElement(root, 'geolocations')
for geoloc_key, geoloc_label in (self.geolocations or {}).items():
element = ET.SubElement(geolocations, 'geolocation')
element.attrib['key'] = geoloc_key
element.text = force_text(geoloc_label, charset)
if self.required_authentication_contexts:
element = ET.SubElement(root, 'required_authentication_contexts')
for auth_context in self.required_authentication_contexts:
ET.SubElement(element, 'method').text = force_text(auth_context)
if self.digest_templates:
digest_templates = ET.SubElement(root, 'digest_templates')
for key, value in self.digest_templates.items():
if not value:
continue
sub = ET.SubElement(digest_templates, 'template')
sub.attrib['key'] = key
sub.text = value
return root
@classmethod
def import_from_xml(cls, fd, charset=None, include_id=False, fix_on_error=False, check_datasources=True):
try:
tree = ET.parse(fd)
except Exception:
raise ValueError()
formdef = cls.import_from_xml_tree(
tree,
charset=charset,
include_id=include_id,
fix_on_error=fix_on_error,
check_datasources=check_datasources,
)
if formdef.url_name:
try:
cls.get_on_index(formdef.url_name, 'url_name', ignore_migration=True)
except KeyError:
pass
else:
formdef.url_name = formdef.get_new_url_name()
# fix max_field_id if necessary
if formdef.max_field_id is not None:
max_field_id = max(lax_int(x.id) for x in formdef.fields)
formdef.max_field_id = max(max_field_id, formdef.max_field_id)
# check if all field id are unique
known_field_ids = set()
for field in formdef.fields:
if field.id in known_field_ids:
raise FormdefImportRecoverableError(_('Duplicated field identifiers'))
known_field_ids.add(field.id)
return formdef
@classmethod
def import_from_xml_tree(
cls, tree, include_id=False, charset=None, fix_on_error=False, snapshot=False, check_datasources=True
):
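        # rebuild a formdef from an XML tree; raises FormdefImportError when
        # the root node or the name is missing, and
        # FormdefImportUnknownReferencedError for unknown field types, blocks
        # or data sources.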
from wcs.carddef import CardDef
if charset is None:
charset = get_publisher().site_charset
assert charset == 'utf-8'
formdef = cls()
if tree.find('name') is None or not tree.find('name').text:
raise FormdefImportError(_('Missing name'))
# if the tree we get is actually a ElementTree for real, we get its
# root element and go on happily.
if not ET.iselement(tree):
tree = tree.getroot()
if tree.tag != cls.xml_root_node:
raise FormdefImportError(
_('Provided XML file is invalid, it starts with a <%(seen)s> tag instead of <%(expected)s>')
% {'seen': tree.tag, 'expected': cls.xml_root_node}
)
if include_id and tree.attrib.get('id'):
formdef.id = tree.attrib.get('id')
for text_attribute in list(cls.TEXT_ATTRIBUTES):
value = tree.find(text_attribute)
if value is None or value.text is None:
continue
setattr(formdef, text_attribute, xml_node_text(value))
for boolean_attribute in cls.BOOLEAN_ATTRIBUTES:
value = tree.find(boolean_attribute)
if value is None:
continue
setattr(formdef, boolean_attribute, value.text == 'true')
formdef.fields = []
unknown_field_types = set()
unknown_fields_blocks = set()
for i, field in enumerate(tree.find('fields')):
try:
field_o = fields.get_field_class_by_type(field.findtext('type'))()
except KeyError:
field_type = field.findtext('type')
if field_type.startswith('block:'):
unknown_fields_blocks.add(field_type[6:])
else:
unknown_field_types.add(field_type)
continue
field_o.init_with_xml(field, charset, include_id=True)
if fix_on_error or not field_o.id:
                # this assumes either all fields have an id, or none of them do
field_o.id = str(i + 1)
formdef.fields.append(field_o)
if formdef.fields:
value = tree.find('max_field_id')
if value is not None:
formdef.max_field_id = int(value.text)
else:
formdef.max_field_id = max(lax_int(x.id) for x in formdef.fields)
if tree.find('tracking_code_verify_fields') is not None:
formdef.tracking_code_verify_fields = [
xml_node_text(verify_field_id)
for verify_field_id in tree.findall('tracking_code_verify_fields/field_id')
]
formdef.workflow_options = {}
for option in tree.findall('options/option'):
option_value = None
if option.attrib.get('type') == 'date':
option_value = time.strptime(option.text, '%Y-%m-%d')
elif option.text:
option_value = xml_node_text(option)
elif option.findall('filename'):
filename = xml_node_text(option.find('filename'))
upload = Upload(filename, content_type=xml_node_text(option.find('content_type')))
option_value = UploadedFile(get_publisher().app_dir, filename, upload)
option_value.set_content(base64.decodebytes(force_bytes(option.find('content').text)))
formdef.workflow_options[option.attrib.get('varname')] = option_value
formdef._custom_views = []
for view in tree.findall('custom_views/%s' % get_publisher().custom_view_class.xml_root_node):
view_o = get_publisher().custom_view_class()
view_o.init_with_xml(view, charset)
formdef._custom_views.append(view_o)
cls.category_class.object_category_xml_import(formdef, tree, include_id=include_id)
if tree.find('workflow') is not None:
from wcs.workflows import Workflow
workflow_node = tree.find('workflow')
if include_id and workflow_node.attrib.get('workflow_id'):
workflow_id = workflow_node.attrib.get('workflow_id')
if Workflow.has_key(workflow_id):
formdef.workflow_id = workflow_id
else:
workflow_slug = workflow_node.attrib.get('slug')
if workflow_slug:
formdef.workflow = Workflow.get_by_slug(workflow_slug)
else:
workflow = xml_node_text(workflow_node)
for w in Workflow.select(ignore_errors=True, ignore_migration=True):
if w and w.name == workflow:
formdef.workflow_id = w.id
break
def get_role_by_node(role_node):
role_id = None
value = xml_node_text(role_node)
if value.startswith('_') or value == 'logged-users':
return value
if include_id:
role_id = role_node.attrib.get('role_id')
if role_id and get_publisher().role_class.get(role_id, ignore_errors=True):
return role_id
role_slug = role_node.attrib.get('slug')
role = get_publisher().role_class.resolve(uuid=None, slug=role_slug, name=value)
if role:
return role.id
return None
roles_elements = [
('roles', 'user-roles'),
('backoffice_submission_roles', 'backoffice-submission-roles'),
]
for attr_name, node_name in roles_elements:
if tree.find(node_name) is None:
continue
roles_node = tree.find(node_name)
roles = []
setattr(formdef, attr_name, roles)
for child in roles_node:
role_id = get_role_by_node(child)
if role_id:
roles.append(role_id)
if tree.find('roles') is not None:
roles_node = tree.find('roles')
formdef.workflow_roles = {}
for child in roles_node:
role_key = child.attrib['role_key']
role_id = get_role_by_node(child)
formdef.workflow_roles[role_key] = role_id
if tree.find('geolocations') is not None:
geolocations_node = tree.find('geolocations')
formdef.geolocations = {}
for child in geolocations_node:
geoloc_key = child.attrib['key']
geoloc_value = xml_node_text(child)
formdef.geolocations[geoloc_key] = geoloc_value
if tree.find('required_authentication_contexts') is not None:
node = tree.find('required_authentication_contexts')
formdef.required_authentication_contexts = []
for child in node:
formdef.required_authentication_contexts.append(str(child.text))
if tree.find('digest_templates') is not None:
digest_templates_node = tree.find('digest_templates')
formdef.digest_templates = {}
for child in digest_templates_node:
key = child.attrib['key']
value = xml_node_text(child)
formdef.digest_templates[key] = value
unknown_datasources = set()
if check_datasources:
# check if datasources are defined
for field in formdef.fields:
data_source = getattr(field, 'data_source', None)
if data_source:
data_source_id = data_source.get('type')
if isinstance(data_sources.get_object(data_source), data_sources.StubNamedDataSource):
unknown_datasources.add(data_source_id)
elif data_source_id and data_source_id.startswith('carddef:'):
parts = data_source_id.split(':')
# check if carddef exists
url_name = parts[1]
if formdef.xml_root_node == 'carddef' and formdef.url_name == url_name:
# reference to itself, it's ok
continue
try:
CardDef.get_by_urlname(url_name)
except KeyError:
unknown_datasources.add(data_source_id)
continue
if len(parts) == 2 or parts[2] == '_with_user_filter':
continue
lookup_criterias = [
Equal('formdef_type', 'carddef'),
Equal('visibility', 'datasource'),
Equal('slug', parts[2]),
]
try:
get_publisher().custom_view_class.select(lookup_criterias)[0]
except IndexError:
unknown_datasources.add(data_source_id)
if unknown_field_types or unknown_fields_blocks or unknown_datasources:
details = collections.defaultdict(set)
if unknown_field_types:
details[_('Unknown field types')].update(unknown_field_types)
if unknown_fields_blocks:
details[_('Unknown fields blocks')].update(unknown_fields_blocks)
if unknown_datasources:
details[_('Unknown datasources')].update(unknown_datasources)
raise FormdefImportUnknownReferencedError(_('Unknown referenced objects'), details=details)
return formdef
def get_detailed_email_form(self, formdata, url):
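        # build a plain-text summary of the submitted values (skipping titles,
        # comments, pages, computed fields and rich text fields) for use in
        # notification emails.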
details = []
if formdata.user_id and formdata.user:
details.append(_('User name:'))
details.append(' %s' % formdata.user.name)
details.append('')
data = formdata.data
for field in self.fields:
if isinstance(
field,
(
fields.SubtitleField,
fields.TitleField,
fields.CommentField,
fields.PageField,
fields.ComputedField,
),
):
continue
if isinstance(field, fields.TextField) and field.display_mode == 'rich':
continue
if data is None:
continue
if data.get(field.id) is None:
continue
if data.get(field.id + '_display'):
value = data.get(field.id + '_display')
else:
value = data.get(field.id)
details.append(_('%s:') % field.label)
if field.type in ('text', 'file'):
                # XXX: how to support preformatted text in a dl in docutils?
details.append((' %s' % value).replace('\n', '\n '))
else:
details.append('%s' % field.get_rst_view_value(value, indent=' '))
details.append('')
return '\n'.join([str(x) for x in details])
def get_submitter_email(self, formdata):
users_cfg = get_cfg('users', {})
field_email = users_cfg.get('field_email') or 'email'
        # look in the submitted form for a field that would hold the user
        # email (the one set to be prefilled with the user email)
def is_user_field(field):
if not getattr(field, 'prefill', None):
return False
if field.prefill.get('type') != 'user':
return False
if field.prefill.get('value') != field_email:
return False
return True
if formdata.data:
# check first in "normal" fields
for field in formdata.formdef.fields:
if not is_user_field(field):
continue
v = formdata.data.get(field.id)
if v:
return v
# then check in block fields
for field in formdata.formdef.fields:
if field.key != 'block':
continue
for subfield in field.block.fields:
if not is_user_field(subfield):
continue
v = formdata.data.get(field.id)
if not (v and v.get('data')):
continue
for data in v.get('data'):
w = data.get(subfield.id)
if w:
return w
# if nothing was found, get email from user profile
if formdata.user and formdata.user.email and formdata.user.is_active:
return formdata.user.email
return None
def get_static_substitution_variables(self, minimal=False):
d = {
'form_name': self.name,
'form_slug': self.url_name,
'form_class_name': self.__class__.__name__, # reserved for logged errors
}
if not minimal:
from wcs.variables import LazyFormDef
d['form_objects'] = LazyFormDef(self).objects
if self.category:
d.update(self.category.get_substitution_variables(minimal=minimal))
d.update(self.get_variable_options())
return d
def get_substitution_variables(self, minimal=False):
from wcs.variables import LazyFormDef
from .qommon.substitution import CompatibilityNamesDict
return CompatibilityNamesDict({'form': LazyFormDef(self)})
def get_detailed_evolution(self, formdata):
if not formdata.evolution:
return None
details = []
evo = formdata.evolution[-1]
if evo.who:
evo_who = None
if evo.who == '_submitter':
if formdata.user_id:
evo_who = formdata.user_id
else:
evo_who = evo.who
if evo_who:
user_who = get_publisher().user_class.get(evo_who, ignore_errors=True)
if user_who:
details.append(_('User name'))
details.append(' %s' % user_who.name)
if evo.status:
details.append(_('Status'))
details.append(' %s' % formdata.get_status_label())
comment = evo.get_plain_text_comment()
if comment:
details.append('\n%s\n' % comment)
return '\n\n----\n\n' + '\n'.join([str(x) for x in details])
def is_of_concern_for_role_id(self, role_id):
if not self.workflow_roles:
return False
return role_id in self.workflow_roles.values()
def is_of_concern_for_user(self, user, formdata=None):
if not self.workflow_roles:
self.workflow_roles = {}
user_roles = set(user.get_roles())
# if the formdef itself has some function attributed to the user, grant
# access.
for role_id in self.workflow_roles.values():
if role_id in user_roles:
return True
# if there was some redispatching of function, values will be different
# in formdata, check them.
if formdata and formdata.workflow_roles:
for role_id in formdata.workflow_roles.values():
if role_id is None:
continue
if isinstance(role_id, list):
role_ids = set(role_id)
else:
role_ids = {role_id}
if user_roles.intersection(role_ids):
return True
        # if no formdata was given, look up whether there is some existing
        # formdata the user has access to.
if not formdata:
data_class = self.data_class()
for role_id in user.get_roles():
if data_class.get_ids_with_indexed_value('workflow_roles', role_id):
return True
return False
def is_user_allowed_read(self, user, formdata=None):
if not user:
if formdata and get_session() and get_session().is_anonymous_submitter(formdata):
return True
return False
if user.is_admin:
return True
user_roles = set(user.get_roles())
user_roles.add(logged_users_role().id)
def ensure_role_are_strings(roles):
# makes sure all roles are defined as strings, as different origins
# (formdef, user, workflow status...) may define them differently.
return {str(x) for x in roles if x}
user_roles = ensure_role_are_strings(user_roles)
if formdata and formdata.is_submitter(user):
return True
if self.is_of_concern_for_user(user):
if not formdata:
return True
if formdata:
# current status
concerned_roles = ensure_role_are_strings(formdata.get_concerned_roles())
if '_submitter' in concerned_roles and formdata.is_submitter(user):
return True
if user_roles.intersection(concerned_roles):
return True
return False
def is_user_allowed_read_status_and_history(self, user, formdata=None):
if user and user.is_admin:
return True
if not self.workflow_roles:
self.workflow_roles = {}
form_roles = [x for x in self.workflow_roles.values() if x]
if formdata and formdata.workflow_roles:
for x in formdata.workflow_roles.values():
if isinstance(x, list):
form_roles.extend(x)
elif x:
form_roles.append(x)
return self.is_user_allowed_read(user, formdata=formdata)
def is_disabled(self):
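        # a formdef is disabled when explicitly flagged as such or when outside
        # its publication/expiration window; unparsable dates are ignored.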
if self.disabled:
return True
if self.publication_date:
try:
publication_datetime = get_as_datetime(self.publication_date)
except ValueError:
return False
if publication_datetime > datetime.datetime.now():
return True
if self.expiration_date:
try:
expiration_datetime = get_as_datetime(self.expiration_date)
except ValueError:
return False
if expiration_datetime < datetime.datetime.now():
return True
return False
@classmethod
def update_filetype(cls, filetype_id, previous_filetype, new_filetype):
# look for file fields in all formdefs, to update them with the
# new mimetypes.
if previous_filetype == new_filetype:
return
for formdef in cls.select():
changed = False
for field in formdef.fields:
if not hasattr(field, 'document_type'):
continue
if not field.document_type:
continue
if field.document_type['id'] == filetype_id:
previous_filetype = field.document_type.copy()
del previous_filetype['id']
if previous_filetype == new_filetype:
continue
field.document_type = new_filetype.copy()
field.document_type['id'] = filetype_id
changed = True
if changed:
formdef.store()
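# Usage sketch (hypothetical values; the exact dictionary keys are an
# assumption based on the comparison done above): after a file type is
# edited in the settings, the change is propagated to every file field
# referencing it, and only formdefs whose fields actually differ from the
# new definition get stored again:
#
#   FormDef.update_filetype(
#       filetype_id='1',
#       previous_filetype={'label': 'PDF files', 'mimetypes': ['application/pdf']},
#       new_filetype={'label': 'PDF files', 'mimetypes': ['application/pdf', 'image/*']},
#   )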
class _EmptyClass: # helper for instance creation without calling __init__
pass
def __copy__(self, memo=None, deepcopy=False):
formdef_copy = self._EmptyClass()
formdef_copy.__class__ = self.__class__
if deepcopy:
formdef_copy.__dict__ = copy.deepcopy(self.__dict__, memo=memo)
else:
formdef_copy.__dict__ = copy.copy(self.__dict__)
return formdef_copy
def __deepcopy__(self, memo=None):
return self.__copy__(memo=memo, deepcopy=True)
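# Note (added for clarity): _EmptyClass lets __copy__ build a new instance
# without running __init__, so the copy carries exactly the attributes of
# the source object.  Illustration:
#
#   import copy
#   clone = copy.copy(formdef)        # shallow: shares mutable attributes
#   clone = copy.deepcopy(formdef)    # deep: fields are duplicated as well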
# don't pickle computed attributes
def __getstate__(self):
odict = copy.copy(self.__dict__)
if '_workflow' in odict:
del odict['_workflow']
if '_start_page' in odict:
del odict['_start_page']
if self.lightweight and 'fields' in odict:
# will be stored independently
del odict['fields']
if '_custom_views' in odict:
del odict['_custom_views']
return odict
def __setstate__(self, dict):
self.__dict__ = dict
self._workflow = None
self._start_page = None
if hasattr(self, 'snapshot_object'):
# don't restore a snapshot object that may have been stored erroneously
delattr(self, 'snapshot_object')
@classmethod
def storage_load(cls, fd, **kwargs):
o = super().storage_load(fd)
if kwargs.get('lightweight'):
o.fields = Ellipsis
return o
if cls.lightweight:
try:
o.fields = pickle.load(fd, **PICKLE_KWARGS)
except EOFError:
pass # old format
return o
@classmethod
def storage_dumps(cls, object):
if getattr(object, 'fields', None) is Ellipsis:
raise RuntimeError('storing a lightweight object is not allowed')
# use two separate pickle chunks to store the formdef, the first chunk
# is everything but fields (excluded via __getstate__) while the second
# chunk contains the fields.
return pickle.dumps(object, protocol=2) + pickle.dumps(object.fields, protocol=2)
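# Storage layout sketch (illustration, not part of the original code): a
# stored formdef is two consecutive pickles; loading with lightweight=True
# stops after the first one and leaves `fields` set to Ellipsis:
#
#   with open(formdef_filename, 'rb') as fd:     # hypothetical filename
#       light = FormDef.storage_load(fd, lightweight=True)
#   assert light.fields is Ellipsis              # second chunk never read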
def change_workflow(self, new_workflow, status_mapping=None):
old_workflow = self.get_workflow()
formdata_count = self.data_class().count()
if formdata_count:
assert status_mapping, 'status mapping is required if there are formdatas'
assert all(
status.id in status_mapping for status in old_workflow.possible_status
), 'a status was not mapped'
mapping = {}
for old_status, new_status in status_mapping.items():
mapping['wf-%s' % old_status] = 'wf-%s' % new_status
mapping['draft'] = 'draft'
if any(x[0] != x[1] for x in mapping.items()):
# if there are status changes, update all formdatas (except drafts)
from . import sql
sql.formdef_remap_statuses(self, mapping)
self.workflow = new_workflow
if new_workflow.has_action('geolocate') and not self.geolocations:
self.geolocations = {'base': str(_('Geolocation'))}
self.store(comment=_('Workflow change'))
if formdata_count:
# instruct formdef to update its security rules
self.data_class().rebuild_security()
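# Usage sketch (status ids are hypothetical): switching a form that already
# has formdatas to another workflow requires mapping every status of the
# old workflow to a status of the new one:
#
#   formdef.change_workflow(
#       new_workflow,
#       status_mapping={'just_submitted': 'new', 'accepted': 'done'},
#   )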
def i18n_scan(self):
location = '%s/%s/' % (self.backoffice_section, self.id)
yield location, None, self.name
yield location, None, self.description
for field in self.fields or []:
yield from field.i18n_scan(base_location=location + 'fields/')
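# Consumer sketch (assumption about the i18n machinery): the generated
# (location, context, string) tuples feed the translation catalogue, e.g.:
#
#   for location, context, string in formdef.i18n_scan():
#       ...  # register `string` for translation under `location`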
EmailsDirectory.register(
'new_user',
_('Notification of creation to user'),
enabled=False,
category=_('Workflow'),
default_subject=_('New form ({{ form_name }})'),
default_body=_(
'''\
Hello,
This mail is a reminder about the form you just submitted.
{% if form_user %}
You can consult it with this link: {{ form_url }}
{% endif %}
{% if form_details %}
For reference, here are the details:
{{ form_details }}
{% endif %}
'''
),
)
EmailsDirectory.register(
'change_user',
_('Notification of change to user'),
category=_('Workflow'),
default_subject=_('Form status change ({{ form_name }})'),
default_body=_(
'''\
Hello,
{% if form_status_changed %}
Status of the form you submitted just changed (from "{{ form_previous_status }}" to "{{ form_status }}").
{% endif %}
{% if form_user %}
You can consult it with this link: {{ form_url }}
{% endif %}
{% if form_comment %}New comment: {{ form_comment }}{% endif %}
{% if form_evolution %}
{{ form_evolution }}
{% endif %}
'''
),
)
EmailsDirectory.register(
'new_receiver',
_('Notification of creation to receiver'),
enabled=False,
category=_('Workflow'),
default_subject=_('New form ({{ form_name }})'),
default_body=_(
'''\
Hello,
A new form has been submitted, you can see it with this link:
{{ form_url_backoffice }}
{% if form_details %}
For reference, here are the details:
{{ form_details }}
{% endif %}
'''
),
)
EmailsDirectory.register(
'change_receiver',
_('Notification of change to receiver'),
category=_('Workflow'),
default_subject=_('Form status change ({{ form_name }})'),
default_body=_(
'''\
Hello,
A form just changed, you can consult it with this link:
{{ form_url_backoffice }}
{% if form_status_changed %}
Status of the form just changed (from "{{ form_previous_status }}" to "{{ form_status }}").
{% endif %}
{% if form_comment %}New comment: {{ form_comment }}{% endif %}
{% if form_evolution %}
{{ form_evolution }}
{% endif %}
'''
),
)
Substitutions.register('form_name', category=_('Form'), comment=_('Form Name'))
def clean_drafts(publisher, **kwargs):
import wcs.qommon.storage as st
from wcs.carddef import CardDef
job = kwargs.pop('job', None)
for formdef in FormDef.select() + CardDef.select():
with job.log_long_job(
'%s %s' % (formdef.xml_root_node, formdef.url_name)
) if job else contextlib.ExitStack():
removal_date = datetime.date.today() - datetime.timedelta(days=formdef.get_drafts_lifespan())
for formdata in formdef.data_class().select(
[st.Equal('status', 'draft'), st.Less('receipt_time', removal_date.timetuple())]
):
formdata.remove_self()
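# Behaviour sketch (illustrative): drafts older than the formdef's draft
# lifespan (DRAFTS_DEFAULT_LIFESPAN days unless configured otherwise) are
# removed; e.g. with a 100-day lifespan:
#
#   removal_date = datetime.date.today() - datetime.timedelta(days=100)
#   # any draft whose receipt_time predates removal_date gets remove_self()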
def clean_unused_files(publisher, **kwargs):
unused_files_behaviour = publisher.get_site_option('unused-files-behaviour')
if unused_files_behaviour not in ('move', 'remove'):
return
known_filenames = set()
known_filenames.update([x for x in glob.glob(os.path.join(publisher.app_dir, 'uploads/*'))])
known_filenames.update([x for x in glob.glob(os.path.join(publisher.app_dir, 'attachments/*/*'))])
def accumulate_filenames():
from wcs.carddef import CardDef
for formdef in FormDef.select(ignore_migration=True) + CardDef.select(ignore_migration=True):
for option_data in (formdef.workflow_options or {}).values():
if is_upload(option_data):
yield option_data.get_fs_filename()
for formdata in formdef.data_class().select_iterator(ignore_errors=True, itersize=200):
for field_data in formdata.get_all_file_data(with_history=True):
if is_upload(field_data):
yield field_data.get_fs_filename()
elif is_attachment(field_data):
yield field_data.filename
for user in publisher.user_class.select():
for field_data in (user.form_data or {}).values():
if is_upload(field_data):
yield field_data.get_fs_filename()
used_filenames = set()
for filename in accumulate_filenames():
if not filename: # file held by an alternative storage backend, no local filename
continue
if not os.path.isabs(filename):
filename = os.path.join(publisher.app_dir, filename)
used_filenames.add(filename)
unused_filenames = known_filenames - used_filenames
for filename in unused_filenames:
try:
if unused_files_behaviour == 'move':
new_filename = os.path.join(
publisher.app_dir, 'unused-files', filename[len(publisher.app_dir) + 1 :]
)
if os.path.exists(new_filename):
os.unlink(filename)
else:
new_dirname = os.path.dirname(new_filename)
if not os.path.exists(new_dirname):
os.makedirs(new_dirname)
os.rename(filename, new_filename)
else:
os.unlink(filename)
except OSError:
pass
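# Configuration sketch (hypothetical snippet): the cleanup is a no-op unless
# the `unused-files-behaviour` site option is set, typically in the tenant's
# site-options.cfg:
#
#   [options]
#   unused-files-behaviour = move
#
# `move` relocates orphaned files under <app_dir>/unused-files/ keeping
# their relative path, `remove` deletes them outright.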
def get_formdefs_of_all_kinds(**kwargs):
from wcs.admin.settings import UserFieldsFormDef
from wcs.blocks import BlockDef
from wcs.carddef import CardDef
from wcs.wf.form import FormWorkflowStatusItem
from wcs.workflows import Workflow
select_kwargs = {
'ignore_errors': True,
'ignore_migration': True,
}
select_kwargs.update(kwargs)
formdefs = [UserFieldsFormDef()]
formdefs += FormDef.select(**select_kwargs)
formdefs += BlockDef.select(**select_kwargs)
formdefs += CardDef.select(**select_kwargs)
for workflow in Workflow.select(**select_kwargs):
for status in workflow.possible_status:
for item in status.items:
if isinstance(item, FormWorkflowStatusItem) and item.formdef:
formdefs.append(item.formdef)
if workflow.variables_formdef:
formdefs.append(workflow.variables_formdef)
if workflow.backoffice_fields_formdef:
formdefs.append(workflow.backoffice_fields_formdef)
return formdefs
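# Usage sketch (illustrative): callers needing to inspect every field
# definition of a site -- user fields, forms, cards, blocks and
# workflow-embedded forms -- iterate over the aggregated list:
#
#   for formdef in get_formdefs_of_all_kinds():
#       for field in formdef.fields or []:
#           ...  # e.g. collect data source or file type usage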
def register_cronjobs():
# once a day, look for:
# * expired drafts
get_publisher_class().register_cronjob(CronJob(clean_drafts, name='clean_drafts', hours=[2], minutes=[0]))
# * unused files
get_publisher_class().register_cronjob(
CronJob(clean_unused_files, name='clean_unused_files', hours=[2], minutes=[0])
)
class UpdateDigestAfterJob(AfterJob):
label = _('Updating digests')
def __init__(self, formdefs):
super().__init__(formdefs=[(x.__class__, x.id) for x in formdefs])
def execute(self):
for formdef_class, formdef_id in self.kwargs['formdefs']:
formdef = formdef_class.get(formdef_id)
for formdata in formdef.data_class().select(order_by='id'):
formdata.store()
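# Usage sketch (illustrative, scheduling is an assumption): after a change
# impacting digests, re-storing every formdata recomputes them; the job can
# be run synchronously for testing, real code would enqueue it through the
# publisher's afterjob mechanism:
#
#   UpdateDigestAfterJob(formdefs=[formdef]).execute()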