wcs/wcs/formdef.py

1727 lines
66 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import base64
import copy
import glob
import itertools
import pickle
import sys
import time
import types
import json
import xml.etree.ElementTree as ET
import datetime
from django.utils import six
from django.utils.encoding import force_bytes, force_text
from quixote import get_request, get_publisher
from quixote.http_request import Upload
from .qommon import _, N_, force_str, PICKLE_KWARGS
from .qommon.storage import StorableObject, fix_key, Equal
from .qommon.cron import CronJob
from .qommon.form import *
from .qommon.misc import simplify, get_as_datetime, xml_node_text
from .qommon import get_cfg
from .qommon.substitution import Substitutions
from .qommon.publisher import get_publisher_class
from .formdata import FormData
from .roles import Role, logged_users_role
from .categories import Category
from . import fields
from . import data_sources
# data_class() builds its classes through types.ClassType; provide it as an
# alias of ``type`` when the running interpreter does not define it.
try:
    types.ClassType
except AttributeError:
    types.ClassType = type
class FormdefImportError(Exception):
    """Error raised when a form definition cannot be imported.

    ``msg`` is the error message and ``details`` an optional string giving
    more context (for example the unknown field type).
    """

    def __init__(self, msg, details=None):
        # hand the message to Exception so that str(exc), exc.args and
        # tracebacks carry it instead of being empty.
        super().__init__(msg)
        self.msg = msg
        self.details = details


class FormdefImportRecoverableError(FormdefImportError):
    """Import error subclass; raised in this module for duplicated field ids."""
class FormField(object):
    # legacy placeholder: only used to unpickle form fields stored by
    # older (<200603) versions

    def __setstate__(self, dict):
        # build the specific field object matching the pickled 'type' and
        # keep it aside, migrate() later swaps it in.
        field_class = fields.get_field_class_by_type(dict['type'])
        self.real_field = field_class(**dict)
def lax_int(s):
    """Convert *s* to an integer, returning -1 when it cannot be converted."""
    try:
        value = int(s)
    except (ValueError, TypeError):
        value = -1
    return value
# Definition of a form; objects are persisted through StorableObject.
class FormDef(StorableObject):
# storage settings (presumably consumed by StorableObject -- confirm)
_names = 'formdefs'
_indexes = ['url_name']
_hashed_indexes = ['backoffice_submission_roles']
data_sql_prefix = 'formdata'
pickle_module_name = 'formdef'
# tag expected as root node by the XML import/export methods
xml_root_node = 'formdef'
verbose_name = N_('Form')
verbose_name_plural = N_('Forms')
# class-level defaults for the attributes of a form definition
name = None
description = None
keywords = None
url_name = None
internal_identifier = None # mostly for pickle
table_name = None # for SQL only
fields = None
category_id = None
workflow_id = None
workflow_options = None
workflow_roles = None
roles = None
required_authentication_contexts = None
backoffice_submission_roles = None
discussion = False
confirmation = True
detailed_emails = True
disabled = False
only_allow_one = False
enable_tracking_codes = False
disabled_redirection = None
always_advertise = False
publication_date = None
expiration_date = None
has_captcha = False
skip_from_360_view = False
include_download_all_button = False
appearance_keywords = None
digest_template = None
geolocations = None
last_modification_time = None
last_modification_user_id = None
max_field_id = None
# store fields in a separate pickle chunk
lightweight = True
# prefix for formdata variables
var_prefix = 'form'
# declarations for serialization: attributes listed here are handled
# generically by the JSON and XML import/export methods, as text or as
# booleans respectively.
TEXT_ATTRIBUTES = ['name', 'url_name', 'description', 'keywords',
'publication_date', 'expiration_date', 'internal_identifier',
'disabled_redirection', 'appearance_keywords',
'digest_template']
BOOLEAN_ATTRIBUTES = ['discussion', 'detailed_emails', 'disabled',
'only_allow_one', 'enable_tracking_codes', 'confirmation',
'always_advertise', 'include_download_all_button',
'has_captcha', 'skip_from_360_view']
def __init__(self, *args, **kwargs):
    """Initialise the storable object and start with an empty field list."""
    super().__init__(*args, **kwargs)
    self.fields = []
# Upgrade, in place, an object loaded from an older storage format; the
# object is stored again at the end if any of the steps changed it.
def migrate(self):
changed = False
if self.__dict__.get('fields') is Ellipsis:
# don't run migration on lightweight objects
return
# legacy attribute names: 'receiver' -> receiver_id, 'category' -> category_id
if 'receiver' in self.__dict__:
self.receiver_id = self.__dict__['receiver']
del self.__dict__['receiver']
changed = True
if 'category' in self.__dict__:
self.category_id = self.__dict__['category']
del self.__dict__['category']
changed = True
# missing url_name: a non-numeric id is reused, otherwise a new url name
# is computed
if not self.url_name:
try:
int(self.id)
except ValueError:
self.url_name = self.id
else:
self.url_name = self.get_new_url_name()
changed = True
# fields stored as plain dictionaries: 'name' becomes 'label', fields get
# positional ids and existing formdata is rekeyed from labels to ids.
# NOTE(review): FormField defines no __init__ in this module, so
# FormField(**x) looks like it cannot succeed anymore -- confirm whether
# this path is still reachable.
if self.fields and type(self.fields[0]) is dict:
for f in self.fields:
if 'name' in f:
f['label'] = f['name']
del f['name']
self.fields = [FormField(**x) for x in self.fields]
for i, f in enumerate(self.fields):
f.id = str(i)
for formdata in self.data_class().select():
for f in self.fields:
if not f.label in formdata.data:
continue
formdata.data[f.id] = formdata.data[f.label]
del formdata.data[f.label]
formdata.store()
changed = True
if self.fields and isinstance(self.fields[0], FormField):
# migration from generic FormField to specific Field classes
# (200603)
self.fields = [x.real_field for x in self.fields]
if 'public' in self.__dict__:
del self.__dict__['public']
changed = True
if 'receiver_id' in self.__dict__:
# migration from a simple receiver role to workflow roles
if not self.workflow_roles:
self.workflow_roles = {}
self.workflow_roles['_receiver'] = self.__dict__['receiver_id']
del self.__dict__['receiver_id']
changed = True
# set the SQL table name when the site has the 'postgresql' option
if not self.table_name and get_publisher().has_site_option('postgresql'):
from . import sql
self.table_name = sql.get_formdef_table_name(self)
changed = True
# initialise max_field_id from the existing field ids (non-integer ids
# count as -1, see lax_int)
if self.max_field_id is None and self.fields:
self.max_field_id = max([lax_int(x.id) for x in self.fields])
changed = True
# identifiers stored as integers are converted to strings
if type(self.category_id) is int:
self.category_id = str(self.category_id)
changed = True
if type(self.workflow_id) is int:
self.workflow_id = str(self.workflow_id)
changed = True
if self.roles:
for role in self.roles:
if type(role) is int:
self.roles = [str(x) for x in self.roles]
changed = True
break
if type(self.last_modification_user_id) is int:
self.last_modification_user_id = str(self.last_modification_user_id)
changed = True
if self.workflow_roles:
workflow_roles_list = self.workflow_roles.items()
for role_key, role_id in self.workflow_roles.items():
if type(role_id) is int:
self.workflow_roles = dict([(x, str(y)) for x, y in workflow_roles_list])
changed = True
break
# the internal identifier defaults to the url name
if not self.internal_identifier:
self.internal_identifier = self.url_name
changed = True
# let every field run its own migration; the |= expects migrate() to
# return a boolean
for f in self.fields or []:
changed |= f.migrate()
if changed:
self.store()
@classmethod
def remove_object(cls, id):
    """Remove the stored object and, with PostgreSQL, refresh the global views."""
    super().remove_object(id)
    if not get_publisher().is_using_postgresql():
        return
    # recreate global views so they don't reference formdata from
    # deleted formdefs
    from . import sql
    connection, cursor = sql.get_connection_and_cursor()
    sql.do_global_views(connection, cursor)
    connection.commit()
    cursor.close()
@property
def data_class_name(self):
    """Name of the generated data class: ``_wcs_`` + title-cased url name."""
    return '_wcs_' + self.url_name.title()
def data_class(self, mode=None):
    """Return the class handling the data submitted for this form.

    The class is created dynamically and registered as an attribute of
    both the 'formdef' and 'wcs.formdef' modules.  It is SQL-backed when
    the publisher uses PostgreSQL (unless ``mode`` is 'files') or when
    ``mode`` is 'sql'; otherwise it derives from FormData.
    """
    # make sure a module is registered under the plain 'formdef' name
    if 'formdef' not in sys.modules:
        sys.modules['formdef'] = sys.modules[__name__]
    registry = sys.modules['formdef']
    class_name = self.data_class_name

    if hasattr(registry, class_name):
        existing = getattr(registry, class_name)
        # only use existing data class if it has a reference to this actual
        # formdef
        if existing._formdef is self:
            return existing

    use_sql = (get_publisher().is_using_postgresql() and mode != 'files') or mode == 'sql'
    if use_sql:
        from . import sql
        attrs = {'_formdef': self,
                 '_table_name': sql.get_formdef_table_name(self)}
        cls = types.ClassType(class_name, (sql.SqlFormData,), attrs)
        actions = sql.do_formdef_tables(self)
    else:
        attrs = {'_names': 'form-%s' % self.internal_identifier,
                 '_formdef': self}
        cls = types.ClassType(class_name, (FormData,), attrs)
        actions = []

    setattr(registry, class_name, cls)
    setattr(sys.modules['wcs.formdef'], class_name, cls)

    # run the follow-up actions reported by the table setup, by method name
    for action in actions or []:
        getattr(cls, action)()
    return cls
def get_new_field_id(self):
    """Allocate the next field id, record it in max_field_id, return it as str."""
    self.max_field_id = 1 if self.max_field_id is None else self.max_field_id + 1
    return str(self.max_field_id)
def get_new_url_name(self):
    """Return a url name derived from the form name and not used by another object.

    The simplified name is tried first, then ``<name>-1``, ``<name>-2``...
    until no object other than this one is indexed under the candidate.
    """
    base_url_name = simplify(self.name)
    candidate = base_url_name
    for suffix_no in itertools.count(1):
        try:
            existing = self.get_on_index(candidate, 'url_name', ignore_migration=True)
        except KeyError:
            # nobody uses this url name
            break
        if existing.id == self.id:
            # it is used by this very object
            break
        candidate = '%s-%s' % (base_url_name, suffix_no)
    return candidate
def get_new_internal_identifier(self):
    """Return an internal identifier derived from the form name.

    Candidates are the simplified name, then ``<name>-1``, ``<name>-2``...
    Availability is checked with get_by_urlname(): a candidate is kept when
    no object, or only this one, is found under it.
    """
    base_identifier = simplify(self.name)
    candidate = base_identifier
    for suffix_no in itertools.count(1):
        try:
            formdef = self.get_by_urlname(candidate, ignore_migration=True)
        except KeyError:
            break
        if formdef.id == self.id:
            break
        candidate = '%s-%s' % (base_identifier, suffix_no)
    return candidate
@classmethod
def get_new_id(cls, create=False):
    """Return a new object id, as a string.

    The id comes from the parent class and, when PostgreSQL is used, is
    then passed as starting point to get_sql_new_id().  With ``create``
    set, an empty object file is created to reserve the id; if a file
    already exists under that id the whole allocation is retried.
    """
    # imported here as the module does not explicitly import os at its top
    import os

    new_id = super().get_new_id(create=False)
    if get_publisher().is_using_postgresql():
        new_id = cls.get_sql_new_id(id_start=int(new_id))
    if create:
        object_filename = os.path.join(cls.get_objects_dir(), fix_key(new_id))
        try:
            # O_EXCL makes the call fail if the file already exists
            fd = os.open(object_filename, os.O_CREAT | os.O_EXCL)
        except FileExistsError:
            # id taken in the meantime, try again.  Other OS errors
            # (missing directory, permissions...) are left to propagate as
            # retrying them would only recurse until RecursionError.
            return cls.get_new_id(create=True)
        os.close(fd)
    return str(new_id)
@classmethod
def get_sql_new_id(cls, id_start):
    """Ask the sql module for a new formdef id, starting from ``id_start``."""
    from . import sql
    new_id = sql.get_formdef_new_id(id_start=id_start)
    return new_id
@classmethod
def wipe(cls):
    """Wipe stored objects and, with PostgreSQL, the SQL side as well."""
    super().wipe()
    using_sql = get_publisher().is_using_postgresql()
    if using_sql:
        cls.sql_wipe()
@classmethod
def sql_wipe(cls):
    """Delegate the SQL part of wipe() to sql.formdef_wipe()."""
    from . import sql as sql_module
    sql_module.formdef_wipe()
def store(self, comment=None):
    """Store the form definition.

    Before storing, missing url name / internal identifier are computed
    and the last modification time and user are recorded.  Afterwards a
    snapshot is taken if the publisher has a snapshot class (``comment``
    is passed along) and SQL tables and views are rebuilt when PostgreSQL
    is used.  Returns the value returned by StorableObject.store().
    """
    assert not self.is_readonly()
    if self.url_name is None:
        # set url name if it's not yet there
        self.url_name = self.get_new_url_name()

    candidate_identifier = self.get_new_internal_identifier()
    if not self.internal_identifier:
        self.internal_identifier = candidate_identifier
    # title changed, internal identifier will be changed only if
    # the formdef is currently being imported (self.id is None)
    # or if there are not yet any submitted forms
    if candidate_identifier != self.internal_identifier and (
            self.id is None or self.data_class().count() == 0):
        self.internal_identifier = candidate_identifier

    self.last_modification_time = time.localtime()
    request = get_request()
    if request and request.user:
        self.last_modification_user_id = str(request.user.id)
    else:
        self.last_modification_user_id = None

    result = StorableObject.store(self)

    snapshot_class = get_publisher().snapshot_class
    if snapshot_class:
        snapshot_class.snap(instance=self, comment=comment)
    if get_publisher().is_using_postgresql():
        from . import sql
        sql.do_formdef_tables(self, rebuild_views=True,
                              rebuild_global_views=True)
    return result
def get_all_fields(self):
    """Return the form fields followed by the workflow backoffice fields."""
    own_fields = self.fields or []
    return own_fields + self.workflow.get_backoffice_fields()
def get_widget_fields(self):
    """Return the form fields that are instances of fields.WidgetField."""
    widget_fields = []
    for field in self.fields or []:
        if isinstance(field, fields.WidgetField):
            widget_fields.append(field)
    return widget_fields
def rebuild(self):
    """Rebuild SQL tables and views of this form; no-op without PostgreSQL."""
    if not get_publisher().is_using_postgresql():
        return
    from . import sql
    sql.do_formdef_tables(self, rebuild_views=True, rebuild_global_views=True)
def get_category(self):
    """Return the Category of the form, or None if unset or unknown."""
    if not self.category_id:
        return None
    try:
        return Category.get(self.category_id)
    except KeyError:
        return None
def set_category(self, category):
    """Point the form to ``category``; a false value resets category_id to None."""
    if category:
        self.category_id = category.id
        return
    if self.category_id:
        self.category_id = None


# expose the category object as an attribute backed by category_id
category = property(get_category, set_category)
# cache for the workflow object, filled by get_workflow() and set_workflow()
_workflow = None


def get_workflow(self):
    """Return the workflow of the form.

    A cached workflow is returned as is.  Without workflow_id the default
    workflow is returned; an id matching no workflow gives the "unknown
    workflow" object.  Neither of those two is cached.
    """
    if self._workflow:
        return self._workflow
    from wcs.workflows import Workflow
    if not self.workflow_id:
        return self.get_default_workflow()
    try:
        workflow = Workflow.get(self.workflow_id)
    except KeyError:
        return Workflow.get_unknown_workflow()
    self._workflow = self.get_workflow_with_options(workflow)
    return self._workflow
@classmethod
def get_default_workflow(cls):
    """Return the workflow used by forms that have no workflow_id."""
    from wcs.workflows import Workflow
    default_workflow = Workflow.get_default_workflow()
    return default_workflow
def get_workflow_with_options(self, workflow):
    """Apply the workflow options of the form to ``workflow`` and return it.

    Options are looked up under ``<status id>*<item id>*<parameter>`` keys;
    every non-empty value is set as attribute on the matching item.  The
    workflow object is modified in place.
    """
    # this needs to be kept in sync with admin/forms.ptl,
    # FormDefPage::workflow
    options = self.workflow_options
    if not options:
        return workflow
    for status in workflow.possible_status:
        for item in status.items:
            key_prefix = '%s*%s*' % (status.id, item.id)
            for parameter in item.get_parameters():
                option_value = options.get(key_prefix + parameter)
                if option_value:
                    setattr(item, parameter, option_value)
    return workflow
def set_workflow(self, workflow):
    """Attach ``workflow`` to the form; a false value resets workflow_id to None."""
    if workflow:
        self.workflow_id = workflow.id
        self._workflow = workflow
        return
    # NOTE(review): the cached _workflow is left untouched on reset -- confirm
    # this is intended.
    if self.workflow_id:
        self.workflow_id = None


# expose the workflow object as an attribute backed by workflow_id
workflow = property(get_workflow, set_workflow)
@property
def keywords_list(self):
    """Keywords as a list: the comma-separated string split and stripped."""
    return [keyword.strip() for keyword in self.keywords.split(',')] if self.keywords else []
@property
def appearance_keywords_list(self):
    """Appearance keywords as a list (whitespace-separated).

    The list is empty when the 'formdef-appearance-keywords' site option
    is not set or when the form has no appearance keywords.
    """
    enabled = get_publisher().has_site_option('formdef-appearance-keywords')
    if not enabled or not self.appearance_keywords:
        return []
    return [keyword.strip() for keyword in self.appearance_keywords.split()]
def get_variable_options(self):
    """Return workflow options as a dictionary of ``form_option_*`` variables.

    For every workflow variable field with a varname, the stored option is
    exposed as ``form_option_<varname>``.  When a display value is stored
    it takes that place and the raw value moves to ``..._raw``; a stored
    structured value is added as ``..._structured``.
    """
    variables = {}
    variables_formdef = self.workflow.variables_formdef
    options = self.workflow_options
    if not variables_formdef or not options:
        return variables
    for field in variables_formdef.fields:
        varname = field.varname
        if not varname:
            continue
        option_name = 'form_option_' + varname
        variables[option_name] = options.get(varname)
        display_key = '%s_display' % varname
        if field.store_display_value and display_key in options:
            variables[option_name + '_raw'] = variables[option_name]
            variables[option_name] = options.get(display_key)
        structured_key = '%s_structured' % varname
        if field.store_structured_value and structured_key in options:
            variables[option_name + '_structured'] = options.get(structured_key)
    return variables
def get_variable_options_for_form(self):
    """Return workflow options keyed by the id of their workflow variable field.

    Fields without varname are skipped; the result is empty when the
    workflow has no variables or the form has no options.
    """
    variables_formdef = self.workflow.variables_formdef
    if not variables_formdef:
        return {}
    options = self.workflow_options
    if not options:
        return {}
    return {
        str(field.id): options.get(field.varname)
        for field in variables_formdef.fields
        if field.varname
    }
def set_variable_options(self, form):
    """Update workflow options from the submitted workflow variables form.

    Values are read with the get_data() of the workflow variables formdef
    and merged into self.workflow_options, keyed by field varname (with
    ``_display`` / ``_structured`` companions when the field stores them).
    """
    variables_formdef = self.workflow.variables_formdef
    data = variables_formdef.get_data(form)
    collected = {}
    for field in variables_formdef.fields:
        varname = field.varname
        if not varname:
            continue
        collected[varname] = data.get(field.id)
        if field.store_display_value:
            collected[varname + '_display'] = data.get(field.id + '_display')
        if field.store_structured_value:
            collected[varname + '_structured'] = data.get(field.id + '_structured')
    if not self.workflow_options:
        self.workflow_options = {}
    self.workflow_options.update(collected)
@classmethod
def get_by_urlname(cls, url_name, ignore_migration=False):
    """Look the object up through the 'url_name' index."""
    index_name = 'url_name'
    return cls.get_on_index(url_name, index_name, ignore_migration=ignore_migration)
def get_url(self, backoffice=False, preview=False):
    """Return the URL of the form.

    ``backoffice`` gives the backoffice management URL and takes
    precedence over ``preview``, which gives the frontoffice preview URL;
    by default the frontoffice URL is returned.
    """
    publisher = get_publisher()
    if backoffice:
        base_url = publisher.get_backoffice_url() + '/management'
    elif preview:
        base_url = publisher.get_frontoffice_url() + '/preview'
    else:
        base_url = publisher.get_frontoffice_url()
    return '%s/%s/' % (base_url, self.url_name)
def get_api_url(self):
    """Return the API URL of the form, under the frontoffice URL."""
    frontoffice_url = get_publisher().get_frontoffice_url()
    return '%s/api/forms/%s/' % (frontoffice_url, self.url_name)
def get_admin_url(self):
    """Return the backoffice URL of the form definition (built from its id)."""
    backoffice_url = get_publisher().get_backoffice_url()
    return '%s/forms/%s/' % (backoffice_url, self.id)
def get_backoffice_submission_url(self):
    """Return the backoffice submission URL of the form."""
    submission_url = get_publisher().get_backoffice_url() + '/submission'
    return '%s/%s/' % (submission_url, self.url_name)
def get_display_id_format(self):
    """Return the template string used for display ids."""
    display_id_format = '[formdef_id]-[form_number_raw]'
    return display_id_format
def create_form(self, page=None, displayed_fields=None, transient_formdata=None):
    """Create the Form object for ``page`` and add the fields to it.

    Appearance keywords end up in the ``class`` attribute and keywords in
    the ``data-keywords`` attribute of the form.  The other arguments are
    passed as is to add_fields_to_form().
    """
    form = Form(enctype="multipart/form-data", use_tokens=False)
    attrs = form.attrs
    if self.appearance_keywords:
        attrs['class'] = 'quixote %s' % self.appearance_keywords
    if self.keywords:
        attrs['data-keywords'] = ' '.join(self.keywords_list)
    form.ERROR_NOTICE = _('There were errors processing the form and '
                          'you cannot go to the next page. Do '
                          'check below that you filled all fields correctly.')
    self.add_fields_to_form(
        form,
        page=page,
        displayed_fields=displayed_fields,
        transient_formdata=transient_formdata)
    return form
# Add widgets to `form` for the fields of `page` (or for the fields located
# before the first page field when `page` is None).
#
# Fields appended to the form are also appended to `displayed_fields` when
# it is a list.  `form_data` provides initial values and is given to
# is_visible(); `transient_formdata`, when given, has its data updated with
# the values parsed from the widgets that are not hidden.
def add_fields_to_form(self,
form,
page=None,
displayed_fields=None,
form_data=None, # a dictionary, to fill fields
transient_formdata=None): # a FormData
# NOTE(review): current_page is never read in this method.
current_page = 0
on_page = (page is None)
for field in self.fields:
field.formdef = self
if field.type == 'page':
# a page field ends the current page, or starts the requested one
if on_page:
break
if page.id == field.id:
on_page = True
continue
if not on_page:
continue
visible = field.is_visible(form_data, self)
if not visible:
if not field.has_live_conditions(self):
# no live conditions so field can be skipped
continue
if type(displayed_fields) is list:
displayed_fields.append(field)
value = None
if form_data:
value = form_data.get(field.id)
widget = field.add_to_form(form, value)
# fields that are not visible but have live conditions are kept in the
# form, flagged as hidden
widget.is_hidden = not(visible)
widget.field = field
if transient_formdata and not widget.is_hidden:
transient_formdata.data.update(self.get_field_data(field, widget))
# invalidate cache as comment fields (and other things?) may
# have accessed variables in non-lazy mode and caused a cache
# with now-obsolete values.
get_publisher().substitutions.invalidate_cache()
# reset the parsing state of the widget (presumably altered by the
# parse() done in get_field_data -- confirm)
widget._parsed = False
widget.error = None
def get_page(self, page_no):
    """Return the page field at position ``page_no`` among the page fields."""
    pages = [field for field in self.fields if field.type == 'page']
    return pages[page_no]
# Create a form displaying the values found in `dict` (keyed by field id),
# with fields grouped by page.  `use_tokens` is passed to the Form
# constructor; with a false `visible` the form gets a "display: none" style.
#
# NOTE(review): `dict={}` is a mutable default shared between calls (and it
# shadows the builtin).  It is only read in this method but is also handed
# to field.is_visible() -- confirm it is never modified there.
def create_view_form(self, dict={}, use_tokens=True, visible=True):
form = Form(enctype='multipart/form-data', use_tokens=use_tokens)
if not visible:
form.attrs['style'] = 'display: none;'
if self.keywords:
form.attrs['data-keywords'] = ' '.join(self.keywords_list)
form_fields = self.fields
if form_fields and form_fields[0].type != 'page':
# add fake initial page in case it's missing
form_fields = [fields.PageField(label='', type='page')] + form_fields
# 1st pass to group fields on different pages
pages = []
current_page = {}
for field in form_fields:
if field.type == 'page':
current_page = {'page': field, 'fields': []}
# fields of a page that is not visible are all skipped below
current_page['disabled'] = not field.is_visible(dict, self)
pages.append(current_page)
continue
if current_page['disabled']:
continue
if field.type == 'title' and (
not current_page['fields'] and
current_page['page'].label == field.label):
# don't include first title of a page if that title has the
# same text as the page.
continue
if field.type in ('title', 'subtitle', 'comment') and not field.include_in_validation_page:
# don't render field that wouldn't be displayed.
continue
if not field.is_visible(dict, self):
continue
current_page['fields'].append(field)
# 2nd pass to create view form
for page in pages:
# page markup is only emitted when at least one field of the page is to
# be included in the validation page
visible_contents = False
if page['fields'] and any([x.include_in_validation_page for x in page['fields']]):
visible_contents = True
form.widgets.append(HtmlWidget(htmltext('<div class="page">')))
if page['page'].label:
form.widgets.append(HtmlWidget(htmltext('<h3>%s</h3>') % page['page'].label))
form.widgets.append(HtmlWidget(htmltext('<div>')))
for field in page['fields']:
value = dict.get(field.id)
if not field.include_in_validation_page:
# still added to the form, but wrapped in a hidden <div>
form.widgets.append(HtmlWidget(htmltext('<div style="display: none;">')))
field.add_to_view_form(form, value)
form.widgets.append(HtmlWidget(htmltext('</div>')))
else:
field.add_to_view_form(form, value)
if visible_contents:
form.widgets.append(HtmlWidget(htmltext('</div></div>')))
return form
def set_live_condition_sources(self, form, fields):
    """Flag the widgets whose value other fields depend on.

    A field depends on a variable when the variable is referenced by its
    condition, by the value of its JSON data source (item fields), by its
    locked string prefill or by its label (comment fields).  The widget of
    every field whose varname is referenced gets ``live_condition_source``
    set and the list of dependent fields in ``live_condition_fields``.
    """
    dependents = {}

    def watch(varnames, field):
        # record field as depending on each of the given variable names
        for varname in varnames:
            dependents.setdefault(varname, []).append(field)

    for field in fields:
        if field.condition:
            field.varnames = field.get_condition_varnames(formdef=self)
            watch(field.varnames, field)
        if field.key == 'item' and field.data_source:
            real_data_source = data_sources.get_real(field.data_source)
            if real_data_source.get('type') != 'json':
                # note: this also skips the prefill check below
                continue
            watch(field.get_referenced_varnames(
                      formdef=self,
                      value=real_data_source.get('value')),
                  field)
        prefill = field.prefill
        if prefill and field.prefill.get('locked') and field.prefill.get('type') == 'string':
            watch(field.get_referenced_varnames(
                      formdef=self, value=field.prefill.get('value', '')),
                  field)
        if field.key == 'comment':
            watch(field.get_referenced_varnames(formdef=self, value=field.label),
                  field)

    for field in fields:
        if field.varname not in dependents:
            continue
        widget = form.get_widget('f%s' % field.id)
        if widget:
            widget.live_condition_source = True
            widget.live_condition_fields = dependents[field.varname]
@classmethod
def get_field_data(cls, field, widget):
    """Return the data for ``field`` parsed from ``widget``, as a dictionary.

    The dictionary maps the field id to the parsed value (converted with
    convert_value_from_str when the field defines it) and may contain
    ``<id>_display`` and ``<id>_structured`` entries when the field stores
    such values.  The widget cleanup() is called at the end if it has one.
    """
    field_id = field.id
    data = {field_id: widget.parse()}
    if data.get(field_id) is not None and field.convert_value_from_str:
        data[field_id] = field.convert_value_from_str(data[field_id])

    if data.get(field_id) is not None and field.store_display_value:
        display_value = field.store_display_value(data, field_id)
        display_key = '%s_display' % field_id
        if display_value is not None:
            data[display_key] = display_value
        else:
            data.pop(display_key, None)

    if data.get(field_id) is not None and field.store_structured_value:
        structured_value = field.store_structured_value(data, field_id)
        structured_key = '%s_structured' % field_id
        if structured_value is not None:
            data[structured_key] = structured_value
        else:
            data.pop(structured_key, None)

    cleanup = getattr(widget, 'cleanup', None)
    if cleanup:
        cleanup()
    return data
def get_data(self, form):
    """Collect the data of all fields that have a widget in ``form``."""
    collected = {}
    for field in self.fields:
        widget = form.get_widget('f%s' % field.id)
        if not widget:
            continue
        collected.update(self.get_field_data(field, widget))
    return collected
# Serialize the form definition as a JSON string (keys sorted).
#
# `include_id` adds object identifiers to the export and is passed down to
# the workflow and field exports, `indent` is passed to json.dumps() and
# `anonymise` to the export of every field.
def export_to_json(self, include_id=False, indent=None, anonymise=True):
charset = get_publisher().site_charset
root = {}
root['name'] = force_text(self.name, charset)
if include_id and self.id:
root['id'] = str(self.id)
if self.category:
root['category'] = force_text(self.category.name, charset)
root['category_id'] = str(self.category.id)
if self.workflow:
root['workflow'] = self.workflow.get_json_export_dict(include_id=include_id)
# note: this initialises max_field_id on the object itself as a side effect
if self.max_field_id is None and self.fields:
self.max_field_id = max([lax_int(x.id) for x in self.fields])
more_attributes = []
if self.max_field_id:
more_attributes.append('max_field_id')
if self.last_modification_time:
more_attributes.append('last_modification_time')
if include_id:
more_attributes.append('last_modification_user_id')
for attribute in self.TEXT_ATTRIBUTES + self.BOOLEAN_ATTRIBUTES + more_attributes:
if not hasattr(self, attribute):
continue
root[attribute] = getattr(self, attribute)
# time.struct_time values are exported as ISO-like strings
if type(root[attribute]) is time.struct_time:
root[attribute] = time.strftime('%Y-%m-%dT%H:%M:%S',
root[attribute])
root['fields'] = []
if self.fields:
for field in self.fields:
root['fields'].append(field.export_to_json(include_id=include_id, anonymise=anonymise))
if self.geolocations:
root['geolocations'] = self.geolocations.copy()
if self.workflow_options:
root['options'] = self.workflow_options.copy()
for k, v in list(root['options'].items()):
# convert time.struct_time to strings as python3 would
# serialize it as tuple.
if isinstance(v, time.struct_time):
root['options'][k] = time.strftime('%Y-%m-%dT%H:%M:%S', v)
if self.required_authentication_contexts:
root['required_authentication_contexts'] = self.required_authentication_contexts[:]
# NOTE(review): `misc` is not imported explicitly in this module,
# presumably it comes from the `.qommon.form` star import -- confirm.
return json.dumps(root, indent=indent, sort_keys=True, cls=misc.JSONEncoder)
@classmethod
def import_from_json(cls, fd, charset=None, include_id=False):
    """Create a form definition from the JSON document read from ``fd``.

    With ``include_id`` the object, category and workflow identifiers
    found in the document are reused; otherwise category and workflow are
    looked up by name.  Returns the new, unstored, object.

    Raises FormdefImportError when a field has an unknown type.
    """
    if charset is None:
        charset = get_publisher().site_charset
    formdef = cls()

    def unicode2str(v):
        # recursively apply force_str to all strings of the structure
        if isinstance(v, dict):
            return {unicode2str(key): unicode2str(item) for key, item in v.items()}
        elif isinstance(v, list):
            return [unicode2str(x) for x in v]
        elif isinstance(v, six.string_types):
            return force_str(v)
        else:
            return v

    # we have to make sure all strings are str object, not unicode.
    value = unicode2str(json.load(fd))

    if include_id and 'id' in value:
        formdef.id = value.get('id')

    if include_id and 'category_id' in value:
        formdef.category_id = value.get('category_id')
    elif 'category' in value:
        category = value.get('category')
        for c in Category.select():
            if c.name == category:
                formdef.category_id = c.id
                break

    if include_id and 'workflow_id' in value:
        formdef.workflow_id = value.get('workflow_id')
    elif (include_id
            and 'workflow' in value
            and isinstance(value['workflow'], dict)
            and 'id' in value['workflow']):
        formdef.workflow_id = value['workflow'].get('id')
    elif 'workflow' in value:
        # the workflow is given either as a name or as a dictionary
        if isinstance(value['workflow'], six.string_types):
            workflow = value.get('workflow')
        else:
            workflow = value['workflow'].get('name')
        from wcs.workflows import Workflow
        for w in Workflow.select():
            if w.name == workflow:
                formdef.workflow_id = w.id
                break

    more_attributes = ['max_field_id', 'last_modification_time',
                       'last_modification_user_id']
    for attribute in cls.TEXT_ATTRIBUTES + cls.BOOLEAN_ATTRIBUTES + more_attributes:
        if attribute in value:
            setattr(formdef, attribute, value.get(attribute))

    # fixup last_modification_time to the proper type
    if formdef.last_modification_time:
        formdef.last_modification_time = time.strptime(
            formdef.last_modification_time, '%Y-%m-%dT%H:%M:%S')

    formdef.fields = []
    for i, field in enumerate(value.get('fields', [])):
        try:
            field_o = fields.get_field_class_by_type(field.get('type'))()
        except KeyError:
            # field is a dictionary here (not an XML element), the type
            # has to be read with .get()
            raise FormdefImportError(N_('Unknown field type'),
                                     details=field.get('type'))
        field_o.init_with_json(field, include_id=True)
        if not field_o.id:
            # this assumes all fields will have id, or none of them
            field_o.id = str(i)
        formdef.fields.append(field_o)

    if formdef.fields and not formdef.max_field_id:
        formdef.max_field_id = max([lax_int(x.id) for x in formdef.fields])

    if value.get('options'):
        formdef.workflow_options = value.get('options')
        for option_key, option_value in formdef.workflow_options.items():
            # dictionaries with a filename are turned back into uploaded
            # files, their content being base64-encoded in the export
            if isinstance(option_value, dict) and 'filename' in option_value:
                filename = option_value['filename']
                upload = Upload(filename, content_type=option_value['content_type'])
                new_value = UploadedFile(get_publisher().app_dir, filename, upload)
                new_value.set_content(base64.decodebytes(force_bytes(option_value['content'])))
                formdef.workflow_options[option_key] = new_value

    if value.get('geolocations'):
        formdef.geolocations = value.get('geolocations')

    if value.get('required_authentication_contexts'):
        formdef.required_authentication_contexts = [
            str(x) for x in value.get('required_authentication_contexts')]

    return formdef
# Serialize the form definition as an XML element and return it.
#
# With `include_id`, identifiers (object, category, workflow, roles, last
# modification user) are added as attributes, and passed down to fields.
def export_to_xml(self, include_id=False):
charset = get_publisher().site_charset
root = ET.Element(self.xml_root_node)
if include_id and self.id:
root.attrib['id'] = str(self.id)
# text attributes are only exported when they have a value
for text_attribute in list(self.TEXT_ATTRIBUTES):
if not hasattr(self, text_attribute) or not getattr(self, text_attribute):
continue
ET.SubElement(root, text_attribute).text = force_text(
getattr(self, text_attribute), charset)
# boolean attributes are exported as 'true' / 'false'
for boolean_attribute in self.BOOLEAN_ATTRIBUTES:
if not hasattr(self, boolean_attribute):
continue
value = getattr(self, boolean_attribute)
if value:
value = 'true'
else:
value = 'false'
ET.SubElement(root, boolean_attribute).text = value
# category and workflow are exported by name
if self.category:
elem = ET.SubElement(root, 'category')
elem.text = force_text(self.category.name, charset)
if include_id:
elem.attrib['category_id'] = str(self.category.id)
if self.workflow:
elem = ET.SubElement(root, 'workflow')
elem.text = force_text(self.workflow.name, charset)
if include_id:
elem.attrib['workflow_id'] = str(self.workflow.id)
# note: this initialises max_field_id on the object itself as a side effect
if self.max_field_id is None and self.fields:
self.max_field_id = max([lax_int(x.id) for x in self.fields])
if self.max_field_id:
ET.SubElement(root, 'max_field_id').text = str(self.max_field_id)
if self.last_modification_time:
elem = ET.SubElement(root, 'last_modification')
elem.text = time.strftime('%Y-%m-%d %H:%M:%S', self.last_modification_time)
if include_id:
elem.attrib['user_id'] = str(self.last_modification_user_id)
# note: this local name shadows the `fields` module until the end of the
# method
fields = ET.SubElement(root, 'fields')
for field in self.fields or []:
fields.append(field.export_to_xml(charset=charset, include_id=include_id))
# (attribute name, XML node name) of the role lists to export
roles_elements = [
('roles', 'user-roles'),
('backoffice_submission_roles', 'backoffice-submission-roles')
]
for attr_name, node_name in roles_elements:
if not getattr(self, attr_name, None):
continue
roles = ET.SubElement(root, node_name)
for role_id in getattr(self, attr_name):
if role_id is None:
continue
role_id = str(role_id)
# ids starting with '_' and 'logged-users' are exported as is, others
# are exported as the role name, or as the id for unknown roles
if role_id.startswith('_') or role_id == 'logged-users':
role = force_text(role_id, charset)
else:
try:
role = force_text(Role.get(role_id).name, charset)
except KeyError:
role = force_text(role_id, charset)
sub = ET.SubElement(roles, 'role')
if include_id:
sub.attrib['role_id'] = role_id
sub.text = role
# workflow roles, same naming rules as above, with the role key as attribute
if self.workflow_roles:
roles = ET.SubElement(root, 'roles')
for role_key, role_id in self.workflow_roles.items():
if role_id is None:
continue
role_id = str(role_id)
if role_id.startswith('_') or role_id == 'logged-users':
role = force_text(role_id, charset)
else:
try:
role = force_text(Role.get(role_id).name, charset)
except KeyError:
role = force_text(role_id, charset)
sub = ET.SubElement(roles, 'role')
sub.attrib['role_key'] = role_key
if include_id:
sub.attrib['role_id'] = role_id
sub.text = role
# workflow options, sorted by name; strings, file-like values (objects
# with a base_filename attribute) and dates are supported, values of other
# types are exported as empty <option> nodes
options = ET.SubElement(root, 'options')
for option in sorted(self.workflow_options or []):
element = ET.SubElement(options, 'option')
element.attrib['varname'] = option
option_value = self.workflow_options.get(option)
if isinstance(option_value, six.string_types):
element.text = force_text(self.workflow_options.get(option, ''), charset)
elif hasattr(option_value, 'base_filename'):
ET.SubElement(element, 'filename').text = option_value.base_filename
ET.SubElement(element, 'content_type').text = (
option_value.content_type or 'application/octet-stream')
ET.SubElement(element, 'content').text = force_text(base64.b64encode(option_value.get_content()))
elif isinstance(option_value, time.struct_time):
element.text = time.strftime('%Y-%m-%d', option_value)
element.attrib['type'] = 'date'
else:
pass # TODO: extend support to other types
geolocations = ET.SubElement(root, 'geolocations')
for geoloc_key, geoloc_label in (self.geolocations or {}).items():
element = ET.SubElement(geolocations, 'geolocation')
element.attrib['key'] = geoloc_key
element.text = force_text(geoloc_label, charset)
if self.required_authentication_contexts:
element = ET.SubElement(root, 'required_authentication_contexts')
for auth_context in self.required_authentication_contexts:
ET.SubElement(element, 'method').text = force_text(auth_context)
return root
@classmethod
def import_from_xml(cls, fd, charset=None, include_id=False,
                    fix_on_error=False, check_datasources=True):
    """Create a form definition from the XML document read from ``fd``.

    ``charset``, ``include_id`` and ``fix_on_error`` are passed to
    import_from_xml_tree().  The url name is changed if it is already
    used and max_field_id is raised if it is lower than an existing field
    id.  With ``check_datasources`` the data sources referenced by fields
    must exist.  Returns the new, unstored, object.

    Raises ValueError if the document cannot be parsed, FormdefImportError
    for unknown data sources (and for errors found in the tree) and
    FormdefImportRecoverableError for duplicated field identifiers.
    """
    from wcs.carddef import CardDef
    try:
        tree = ET.parse(fd)
    except Exception as exc:
        # a bare "except:" would also turn KeyboardInterrupt/SystemExit
        # into ValueError
        raise ValueError() from exc
    formdef = cls.import_from_xml_tree(tree, charset=charset,
                                       include_id=include_id,
                                       fix_on_error=fix_on_error)

    if formdef.url_name:
        try:
            cls.get_on_index(formdef.url_name, 'url_name', ignore_migration=True)
        except KeyError:
            pass
        else:
            # url name already in use, get a new one
            formdef.url_name = formdef.get_new_url_name()

    # fix max_field_id if necessary
    if formdef.max_field_id is not None and formdef.fields:
        max_field_id = max([lax_int(x.id) for x in formdef.fields])
        if formdef.max_field_id < max_field_id:
            formdef.max_field_id = max_field_id

    if check_datasources:
        # check if datasources are defined
        unknown_datasources = set()
        for field in formdef.fields:
            data_source = getattr(field, 'data_source', None)
            if not data_source:
                continue
            data_source_id = data_source.get('type')
            if isinstance(data_sources.get_object(data_source),
                          data_sources.StubNamedDataSource):
                unknown_datasources.add(data_source_id)
            elif data_source_id and data_source_id.startswith('carddef:'):
                # carddef:<url name>[:<custom view slug>]
                parts = data_source_id.split(':')
                # check if carddef exists
                url_name = parts[1]
                if formdef.xml_root_node == 'carddef' and formdef.url_name == url_name:
                    # reference to itself, it's ok
                    continue
                try:
                    CardDef.get_by_urlname(url_name)
                except KeyError:
                    unknown_datasources.add(data_source_id)
                    continue
                if len(parts) == 2:
                    continue
                # check the custom view exists
                lookup_criterias = [
                    Equal('formdef_type', 'carddef'),
                    Equal('visibility', 'datasource'),
                    Equal('slug', parts[2]),
                ]
                try:
                    get_publisher().custom_view_class.select(lookup_criterias)[0]
                except IndexError:
                    unknown_datasources.add(data_source_id)
        if unknown_datasources:
            raise FormdefImportError(N_('Unknown datasources'),
                                     details=', '.join(sorted(unknown_datasources)))

    # check if all field id are unique
    known_field_ids = set()
    for field in formdef.fields:
        if field.id in known_field_ids:
            raise FormdefImportRecoverableError(N_('Duplicated field identifiers'))
        known_field_ids.add(field.id)

    return formdef
@classmethod
def import_from_xml_tree(cls, tree, include_id=False, charset=None,
        fix_on_error=False, snapshot=False):
    # Build a new instance of cls from an already parsed XML tree.
    #
    # tree -- an ElementTree or an Element; the root element is used.
    # include_id -- when true, identifiers found in the XML are reused: the
    #   object id, the last modification user id, and the category, workflow
    #   and role ids (those three only if they exist locally).
    # charset -- defaults to the publisher site charset; it has to be
    #   'utf-8' (enforced by an assert).
    # fix_on_error -- when true, all fields get renumbered sequentially.
    # snapshot -- not used in this method body.
    #
    # Raises FormdefImportError on missing name, unexpected root node or
    # unknown field type.
    if charset is None:
        charset = get_publisher().site_charset
    assert charset == 'utf-8'
    formdef = cls()
    if tree.find('name') is None or not tree.find('name').text:
        raise FormdefImportError(N_('Missing name'))

    # if the tree we get is actually a ElementTree for real, we get its
    # root element and go on happily.
    if not ET.iselement(tree):
        tree = tree.getroot()

    if tree.tag != cls.xml_root_node:
        raise FormdefImportError(N_('Unexpected root node'))

    if include_id and tree.attrib.get('id'):
        formdef.id = tree.attrib.get('id')

    # simple text attributes; missing or empty nodes leave the default value
    for text_attribute in list(cls.TEXT_ATTRIBUTES):
        value = tree.find(text_attribute)
        if value is None or value.text is None:
            continue
        setattr(formdef, text_attribute, xml_node_text(value))

    # boolean attributes are true only for the literal text 'true'
    for boolean_attribute in cls.BOOLEAN_ATTRIBUTES:
        value = tree.find(boolean_attribute)
        if value is None:
            continue
        setattr(formdef, boolean_attribute, value.text == 'true')

    # fields; note a <fields> node is expected to be present as the result
    # of tree.find() is iterated directly.
    formdef.fields = []
    for i, field in enumerate(tree.find('fields')):
        try:
            field_o = fields.get_field_class_by_type(field.findtext('type'))()
        except KeyError:
            raise FormdefImportError(N_('Unknown field type'),
                    details=field.findtext('type'))
        # field identifiers are always read, whatever include_id is
        field_o.init_with_xml(field, charset, include_id=True)
        if fix_on_error or not field_o.id:
            # this assumes all fields will have id, or none of them
            field_o.id = str(i+1)
        formdef.fields.append(field_o)

    if formdef.fields:
        # use the exported max_field_id if there is one, otherwise compute
        # it from the field identifiers.
        value = tree.find('max_field_id')
        if value is not None:
            formdef.max_field_id = int(value.text)
        else:
            formdef.max_field_id = max([lax_int(x.id) for x in formdef.fields])

    # workflow options: dates, plain text values or embedded files
    formdef.workflow_options = {}
    for option in tree.findall('options/option'):
        option_value = None
        if option.attrib.get('type') == 'date':
            option_value = time.strptime(option.text, '%Y-%m-%d')
        elif option.text:
            option_value = xml_node_text(option)
        elif option.findall('filename'):
            # file option, its content is stored base64-encoded in <content>
            filename = xml_node_text(option.find('filename'))
            upload = Upload(filename, content_type=xml_node_text(option.find('content_type')))
            option_value = UploadedFile(get_publisher().app_dir, filename, upload)
            option_value.set_content(base64.decodebytes(force_bytes(option.find('content').text)))
        formdef.workflow_options[option.attrib.get('varname')] = option_value

    if tree.find('last_modification') is not None:
        node = tree.find('last_modification')
        formdef.last_modification_time = time.strptime(node.text, '%Y-%m-%d %H:%M:%S')
        if include_id and node.attrib.get('user_id'):
            formdef.last_modification_user_id = node.attrib.get('user_id')

    if tree.find('category') is not None:
        category_node = tree.find('category')
        if include_id and category_node.attrib.get('category_id'):
            # reuse the exported id, but only if such a category exists
            category_id = str(category_node.attrib.get('category_id'))
            if Category.has_key(category_id):
                formdef.category_id = category_id
        else:
            # otherwise look for a category with the same name
            category = xml_node_text(category_node)
            for c in Category.select():
                if c.name == category:
                    formdef.category_id = c.id
                    break

    if tree.find('workflow') is not None:
        from wcs.workflows import Workflow
        workflow_node = tree.find('workflow')
        if include_id and workflow_node.attrib.get('workflow_id'):
            # reuse the exported id, but only if such a workflow exists
            workflow_id = workflow_node.attrib.get('workflow_id')
            if Workflow.has_key(workflow_id):
                formdef.workflow_id = workflow_id
        else:
            # otherwise look for a workflow with the same name
            workflow = xml_node_text(workflow_node)
            for w in Workflow.select():
                if w.name == workflow:
                    formdef.workflow_id = w.id
                    break

    def get_role_by_node(role_node):
        # Return the role id matching an XML role node, or None.
        # Values starting with '_' and 'logged-users' are used as is; with
        # include_id the role_id attribute is used if that role exists; in
        # all other cases the role is looked up by name.
        role_id = None
        value = xml_node_text(role_node)
        if value.startswith('_') or value == 'logged-users':
            role_id = value
        elif include_id:
            role_id = role_node.attrib.get('role_id')
            if role_id and not Role.has_key(role_id):
                role_id = None

        if not role_id:
            for role in Role.select(ignore_errors=True):
                if role.name == value:
                    role_id = role.id
                    break
        return role_id

    # (attribute name on the object, name of the XML node)
    roles_elements = [
        ('roles', 'user-roles'),
        ('backoffice_submission_roles', 'backoffice-submission-roles')
    ]
    for attr_name, node_name in roles_elements:
        if tree.find(node_name) is None:
            continue
        roles_node = tree.find(node_name)
        roles = []
        setattr(formdef, attr_name, roles)
        for child in roles_node:
            role_id = get_role_by_node(child)
            # roles that cannot be resolved are silently dropped
            if role_id:
                roles.append(role_id)

    # the <roles> node holds the workflow roles, indexed by role_key
    if tree.find('roles') is not None:
        roles_node = tree.find('roles')
        formdef.workflow_roles = {}
        for child in roles_node:
            role_key = child.attrib['role_key']
            role_id = get_role_by_node(child)
            # unresolved roles are kept here, with a None value
            formdef.workflow_roles[role_key] = role_id

    if tree.find('geolocations') is not None:
        geolocations_node = tree.find('geolocations')
        formdef.geolocations = {}
        for child in geolocations_node:
            geoloc_key = child.attrib['key']
            geoloc_value = xml_node_text(child)
            formdef.geolocations[geoloc_key] = geoloc_value

    if tree.find('required_authentication_contexts') is not None:
        node = tree.find('required_authentication_contexts')
        formdef.required_authentication_contexts = []
        for child in node:
            formdef.required_authentication_contexts.append(str(child.text))

    return formdef
def get_detailed_email_form(self, formdata, url):
    """Return a plain text summary of the submitted data, for emails.

    The summary starts with the user name (when the form has a user), then
    lists every field that has a value, as a label line followed by the
    value.  Titles, subtitles, comments and pages are skipped.  The url
    parameter is not used.
    """
    lines = []

    if formdata.user_id and formdata.user:
        lines.extend([_('User name:'), ' %s' % formdata.user.name, ''])

    values = formdata.data
    for field in self.fields:
        if isinstance(field, (fields.SubtitleField, fields.TitleField, fields.CommentField,
                              fields.PageField)):
            continue
        if values is None or values.get(field.id) is None:
            continue
        # prefer the display value when there is one
        shown = values.get(field.id + '_display') or values.get(field.id)
        lines.append(_('%s:') % field.label)
        if field.type in ('text', 'file'):
            # XXX: howto support preformatted text in a dl in docutils ?
            rendered = (' %s' % shown).replace('\n', '\n ')
        else:
            rendered = '%s' % field.get_rst_view_value(shown, indent=' ')
        lines.append(rendered)
        lines.append('')

    return '\n'.join(lines)
def get_submitter_email(self, formdata):
    """Return the email address of the submitter, or None.

    The submitted data is looked at first, for a field prefilled with the
    user email; the email of the (active) user is used as a fallback.
    """
    email_field_id = get_cfg('users', {}).get('field_email') or 'email'

    # look up in submitted form for one that would hold the user
    # email (the one set to be prefilled by user email)
    if formdata.data:
        for candidate in formdata.formdef.fields:
            prefill = getattr(candidate, 'prefill', None)
            if not prefill or prefill.get('type') != 'user':
                continue
            if prefill.get('value') != email_field_id:
                continue
            submitted = formdata.data.get(candidate.id)
            if submitted:
                return submitted

    # if nothing was found, get email from user profile
    submitter = formdata.user
    if submitter and submitter.email and submitter.is_active:
        return submitter.email

    return None
def get_static_substitution_variables(self, minimal=False):
    """Return a dictionary of form_* substitution variables.

    With minimal set, the lazy 'form_objects' entry is left out and the flag
    is passed on to the category.
    """
    variables = {}
    variables['form_name'] = self.name
    variables['form_slug'] = self.url_name
    # reserved for logged errors
    variables['form_class_name'] = self.__class__.__name__

    if not minimal:
        from wcs.variables import LazyFormDef
        variables['form_objects'] = LazyFormDef(self).objects

    category = self.category
    if category:
        variables.update(category.get_substitution_variables(minimal=minimal))

    variables.update(self.get_variable_options())
    return variables
def get_substitution_variables(self, minimal=False):
    """Return the substitution variables, as a single lazy 'form' entry.

    The minimal parameter is accepted but not used here.
    """
    from .qommon.substitution import CompatibilityNamesDict
    from wcs.variables import LazyFormDef

    lazy_form = LazyFormDef(self)
    return CompatibilityNamesDict({'form': lazy_form})
def get_detailed_evolution(self, formdata):
    """Return a text description of the latest evolution of formdata.

    The text is prefixed by a separator line and contains the name of the
    user behind the evolution, the status label and the comment, when they
    are known.  None is returned if there is no evolution at all.
    """
    if not formdata.evolution:
        return None

    last = formdata.evolution[-1]
    lines = []

    if last.who:
        # '_submitter' stands for the user who submitted the form
        actor_id = formdata.user_id if last.who == '_submitter' else last.who
        if actor_id:
            actor = get_publisher().user_class.get(actor_id, ignore_errors=True)
            if actor:
                lines.append(_('User name'))
                lines.append(' %s' % actor.name)

    if last.status:
        lines.append(_('Status'))
        lines.append(' %s' % formdata.get_status_label())

    if last.comment:
        lines.append('\n%s\n' % last.comment)

    return '\n\n----\n\n' + '\n'.join(lines)
def is_of_concern_for_role_id(self, role_id):
    """Tell whether role_id is attributed to one of the workflow functions."""
    return bool(self.workflow_roles) and role_id in self.workflow_roles.values()
def is_of_concern_for_user(self, user, formdata=None):
    """Tell whether user holds a role that matters for this form.

    This is true when one of the user roles is attributed to a workflow
    function, on the form definition or on the given formdata.  When no
    formdata is given, existing data is also searched for workflow roles
    matching the user roles.

    As a side effect an unset workflow_roles attribute is replaced by an
    empty dictionary.
    """
    if not self.workflow_roles:
        self.workflow_roles = {}

    # the roles of the user do not change during the call, get them once
    # instead of once per examined role.
    user_roles = user.get_roles()

    # if the formdef itself has some function attributed to the user, grant
    # access.
    if any(role_id in user_roles for role_id in self.workflow_roles.values()):
        return True

    # if there was some redispatching of function, values will be different
    # in formdata, check them.
    if formdata and formdata.workflow_roles:
        if any(role_id in user_roles for role_id in formdata.workflow_roles.values()):
            return True

    # if no formdata was given, lookup if there are some existing formdata
    # where the user has access.
    if not formdata:
        data_class = self.data_class()
        for role_id in user_roles:
            if data_class.get_ids_with_indexed_value('workflow_roles', role_id):
                return True

    return False
def is_user_allowed_read(self, user, formdata=None):
    """Tell whether user is allowed to read the form (or given formdata).

    Without a user, access is only granted to the anonymous submitter of
    formdata, as known by the session.  Administrators and submitters are
    always allowed; other users are allowed according to their roles.
    """
    if not user:
        if not formdata:
            return False
        session = get_session()
        return bool(session and session.is_anonymous_submitter(formdata))

    if user.is_admin:
        return True

    def as_string_set(roles):
        # makes sure all roles are defined as strings, as different origins
        # (formdef, user, workflow status...) may define them differently.
        return set(str(role) for role in roles if role)

    own_roles = as_string_set(set(user.get_roles()) | set([logged_users_role().id]))

    if formdata and formdata.is_submitter(user):
        return True

    concerned = self.is_of_concern_for_user(user)
    if not formdata:
        return bool(concerned)

    # current status
    status_roles = as_string_set(formdata.get_concerned_roles())
    if '_submitter' in status_roles and formdata.is_submitter(user):
        return True
    return bool(own_roles.intersection(status_roles))
def is_user_allowed_read_status_and_history(self, user, formdata=None):
    """Tell whether user is allowed to see the status and history.

    Administrators are always allowed; for everybody else the answer is the
    one given by is_user_allowed_read().

    As a side effect an unset workflow_roles attribute is replaced by an
    empty dictionary.
    """
    if user and user.is_admin:
        return True

    # sets of user and form roles used to be computed here but were never
    # used; only the normalisation of workflow_roles has been kept as other
    # code may rely on it.
    if not self.workflow_roles:
        self.workflow_roles = {}

    return self.is_user_allowed_read(user, formdata=formdata)
def is_disabled(self):
    """Tell whether the form is currently disabled.

    A form is disabled when it is flagged as such, when its publication
    date is in the future or when its expiration date is in the past.  A
    date that cannot be parsed ends the checks, with the form considered as
    not disabled.
    """
    if self.disabled:
        return True

    if self.publication_date:
        try:
            opening = get_as_datetime(self.publication_date)
        except ValueError:
            return False
        if datetime.datetime.now() < opening:
            return True

    if self.expiration_date:
        try:
            closing = get_as_datetime(self.expiration_date)
        except ValueError:
            return False
        if datetime.datetime.now() > closing:
            return True

    return False
@classmethod
def update_filetype(cls, filetype_id, previous_filetype, new_filetype):
    """Propagate a change of file type definition to existing fields.

    All objects returned by cls.select() are examined; fields with a
    document type whose id is filetype_id are given a copy of new_filetype
    (with the id added) and modified objects are stored.  Nothing is done
    when previous_filetype and new_filetype are equal.
    """
    if previous_filetype == new_filetype:
        return

    for formdef in cls.select():
        needs_store = False
        for field in formdef.fields:
            document_type = getattr(field, 'document_type', None)
            if not document_type or document_type['id'] != filetype_id:
                continue
            # compare the definition without its id, to skip fields that
            # are already up-to-date.
            current = document_type.copy()
            del current['id']
            if current == new_filetype:
                continue
            replacement = new_filetype.copy()
            replacement['id'] = filetype_id
            field.document_type = replacement
            needs_store = True
        if needs_store:
            formdef.store()
class _EmptyClass(object):
    """Helper for instance creation without calling __init__."""
def __copy__(self, memo=None, deepcopy=False):
    """Return a copy of the object, made without calling __init__.

    The attributes dictionary is copied, with copy.deepcopy() (and the
    given memo) when deepcopy is set, with copy.copy() otherwise.
    """
    # start from an empty shell then turn it into an object of our class
    clone = self._EmptyClass()
    clone.__class__ = self.__class__
    if deepcopy:
        attributes = copy.deepcopy(self.__dict__, memo=memo)
    else:
        attributes = copy.copy(self.__dict__)
    clone.__dict__ = attributes
    return clone
def __deepcopy__(self, memo=None):
    """Return a deep copy of the object, delegating to __copy__()."""
    duplicate = self.__copy__(memo=memo, deepcopy=True)
    return duplicate
# don't pickle computed attributes
def __getstate__(self):
    """Return the attributes to pickle.

    The '_workflow' and '_start_page' entries are left out, as well as
    'fields' for lightweight classes.
    """
    state = dict(self.__dict__)
    for computed in ('_workflow', '_start_page'):
        state.pop(computed, None)
    if self.lightweight:
        # will be stored independently
        state.pop('fields', None)
    return state
def __setstate__(self, dict):
    """Restore attributes from a pickled state and reset the attributes
    that are not part of it (_workflow and _start_page)."""
    self.__dict__ = dict
    self._workflow = self._start_page = None
@classmethod
def storage_load(cls, fd, **kwargs):
    """Load an object from fd.

    The object itself is loaded by the parent class.  With a true
    'lightweight' keyword argument the fields are not loaded, they are
    replaced by Ellipsis; otherwise, for lightweight classes, they are read
    from a second pickle that follows the object.
    """
    loaded = super(FormDef, cls).storage_load(fd)
    if kwargs.get('lightweight'):
        loaded.fields = Ellipsis
    elif cls.lightweight:
        try:
            loaded.fields = pickle.load(fd, **PICKLE_KWARGS)
        except EOFError:
            # old format
            pass
    return loaded
@classmethod
def storage_dumps(cls, object):
    """Serialize object, as two consecutive pickles.

    Raises RuntimeError for objects that were loaded without their fields
    (fields being Ellipsis).
    """
    if getattr(object, 'fields', None) is Ellipsis:
        raise RuntimeError('storing a lightweight object is not allowed')
    # use two separate pickle chunks to store the formdef, the first field
    # is everything but fields (excluded via __getstate__) while the second
    # chunk contains the fields.
    main_chunk = pickle.dumps(object, protocol=2)
    fields_chunk = pickle.dumps(object.fields, protocol=2)
    return main_chunk + fields_chunk
from .qommon.admin.emails import EmailsDirectory
# Registration of the emails sent on form submission and on status changes,
# with their default subjects and bodies (templates with {{ ... }} variables
# and {% if %} blocks).

# email to the user, on creation; registered with enabled=False.
EmailsDirectory.register('new_user', N_('Notification of creation to user'),
        enabled=False,
        category=N_('Workflow'),
        default_subject=N_('New form ({{ form_name }})'),
        default_body=N_('''\
Hello,
This mail is a reminder about the form you just submitted.
{% if form_user %}
You can consult it with this link: {{ form_url }}
{% endif %}
{% if form_details %}
For reference, here are the details:
{{ form_details }}
{% endif %}
'''))

# email to the user, on status change.
EmailsDirectory.register('change_user', N_('Notification of change to user'),
        category=N_('Workflow'),
        default_subject=N_('Form status change ({{ form_name }})'),
        default_body=N_('''\
Hello,
{% if form_status_changed %}
Status of the form you submitted just changed (from "{{ form_previous_status }}" to "{{ form_status }}").
{% endif %}
{% if form_user %}
You can consult it with this link: {{ form_url }}
{% endif %}
{% if form_comment %}New comment: {{ form_comment }}{% endif %}
{% if form_evolution %}
{{ form_evolution }}
{% endif %}
'''))

# email to the receiver, on creation; registered with enabled=False, the
# link goes to the backoffice.
EmailsDirectory.register('new_receiver', N_('Notification of creation to receiver'),
        enabled=False,
        category=N_('Workflow'),
        default_subject=N_('New form ({{ form_name }})'),
        default_body=N_('''\
Hello,
A new form has been submitted, you can see it with this link:
{{ form_url_backoffice }}
{% if form_details %}
For reference, here are the details:
{{ form_details }}
{% endif %}
'''))

# email to the receiver, on status change; the link goes to the backoffice.
EmailsDirectory.register('change_receiver', N_('Notification of change to receiver'),
        category=N_('Workflow'),
        default_subject=N_('Form status change ({{ form_name }})'),
        default_body=N_('''\
Hello,
A form just changed, you can consult it with this link:
{{ form_url_backoffice }}
{% if form_status_changed %}
Status of the form just changed (from "{{ form_previous_status }}" to "{{ form_status }}").
{% endif %}
{% if form_comment %}New comment: {{ form_comment }}{% endif %}
{% if form_evolution %}
{{ form_evolution }}
{% endif %}
'''))

# declare the form_name variable in the 'Form' category of substitutions
Substitutions.register('form_name', category=N_('Form'), comment=N_('Form Name'))
def clean_drafts(publisher):
    """Remove the drafts received more than 100 days ago, for all forms and
    cards.  The publisher parameter is not used."""
    import wcs.qommon.storage as st
    from wcs.carddef import CardDef

    cutoff = (datetime.date.today() - datetime.timedelta(days=100)).timetuple()
    for formdef in FormDef.select() + CardDef.select():
        old_drafts = formdef.data_class().select([
            st.Equal('status', 'draft'),
            st.Less('receipt_time', cutoff),
        ])
        for draft in old_drafts:
            draft.remove_self()
def clean_unused_files(publisher):
    """Remove, or move aside, the files that are not referenced anymore.

    The files found in the 'uploads' and 'attachments' directories of the
    publisher are compared to the files referenced by workflow options,
    form and card data, evolution parts and user data.

    The 'unused-files-behaviour' site option selects what happens to the
    unreferenced files: 'remove' deletes them and 'move' moves them to an
    'unused-files' directory (or deletes them if they are already there);
    nothing is done for any other value.  Errors on files are ignored.
    """
    # made explicit as os is not imported by name at the top of the module
    import os

    unused_files_behaviour = publisher.get_site_option('unused-files-behaviour')
    if unused_files_behaviour not in ('move', 'remove'):
        return

    # kept from the original code; the name itself is not used as
    # attachments are recognized by their class name (see is_attachment).
    from wcs.wf.attachment import AttachmentEvolutionPart  # noqa

    known_filenames = set()
    known_filenames.update(glob.glob(os.path.join(publisher.app_dir, 'uploads/*')))
    known_filenames.update(glob.glob(os.path.join(publisher.app_dir, 'attachments/*/*')))

    def is_upload(obj):
        # we can't use isinstance() because obj can be a
        # wcs.qommon.form.PicklableUpload or a qommon.form.PicklableUpload
        return obj.__class__.__name__ == 'PicklableUpload'

    def is_attachment(obj):
        return obj.__class__.__name__ == 'AttachmentEvolutionPart'

    def accumulate_filenames():
        # yield the names of all the files that are still referenced
        from wcs.carddef import CardDef
        for formdef in FormDef.select(ignore_migration=True) + CardDef.select(ignore_migration=True):
            for option_data in (formdef.workflow_options or {}).values():
                if is_upload(option_data):
                    yield option_data.get_fs_filename()
            for formdata in formdef.data_class().select(ignore_errors=True):
                for field_data in itertools.chain(
                        (formdata.data or {}).values(),
                        (formdata.workflow_data or {}).values()):
                    if is_upload(field_data):
                        yield field_data.get_fs_filename()
                    elif isinstance(field_data, dict) and isinstance(field_data.get('data'), list):
                        # dictionary with a list of rows, look into each row
                        for subfield_rowdata in field_data.get('data'):
                            if isinstance(subfield_rowdata, dict):
                                for block_field_data in subfield_rowdata.values():
                                    if is_upload(block_field_data):
                                        yield block_field_data.get_fs_filename()
                for part in formdata.iter_evolution_parts():
                    if is_attachment(part):
                        yield part.filename
        for user in publisher.user_class.select():
            for field_data in (user.form_data or {}).values():
                if is_upload(field_data):
                    yield field_data.get_fs_filename()

    used_filenames = set()
    for filename in accumulate_filenames():
        if not filename:  # alternative storage
            continue
        if not os.path.isabs(filename):
            filename = os.path.join(publisher.app_dir, filename)
        used_filenames.add(filename)

    unused_filenames = known_filenames - used_filenames
    for filename in unused_filenames:
        try:
            if unused_files_behaviour == 'move':
                # path relative to the application directory; relpath is
                # used as it doesn't depend on app_dir having (or not) a
                # trailing separator.
                relative_filename = os.path.relpath(filename, publisher.app_dir)
                new_filename = os.path.join(publisher.app_dir, 'unused-files', relative_filename)
                if os.path.exists(new_filename):
                    os.unlink(filename)
                else:
                    new_dirname = os.path.dirname(new_filename)
                    if not os.path.exists(new_dirname):
                        os.makedirs(new_dirname)
                    os.rename(filename, new_filename)
            else:
                os.unlink(filename)
        except OSError:
            # best effort, the file will be handled by a later run
            pass
def get_formdefs_of_all_kinds():
    """Return a list with all the objects behaving as form definitions.

    The list has the user fields definition, the forms, blocks and cards,
    and the definitions attached to workflows (forms of workflow status
    items, workflow variables and backoffice fields).
    """
    from wcs.blocks import BlockDef
    from wcs.carddef import CardDef
    from wcs.wf.form import FormWorkflowStatusItem
    from wcs.admin.settings import UserFieldsFormDef
    from wcs.workflows import Workflow

    select_options = {'ignore_errors': True, 'ignore_migration': True}

    collected = [UserFieldsFormDef()]
    for kind in (FormDef, BlockDef, CardDef):
        collected.extend(kind.select(**select_options))

    for workflow in Workflow.select(**select_options):
        collected.extend(
            item.formdef
            for status in workflow.possible_status
            for item in status.items
            if isinstance(item, FormWorkflowStatusItem) and item.formdef)
        for extra in (workflow.variables_formdef, workflow.backoffice_fields_formdef):
            if extra:
                collected.append(extra)

    return collected
# register the maintenance jobs, only if a publisher class is available.
if get_publisher_class():
    # once a month, look for drafts to remove
    # (days=[2], hours=[0], minutes=[0]; presumably the second day of the
    # month at midnight -- to be confirmed against CronJob)
    get_publisher_class().register_cronjob(CronJob(clean_drafts,
        name='clean_drafts',
        days=[2], hours=[0], minutes=[0]))
    # once a day, look for unused files
    # (hours=[2], minutes=[0]; presumably at 2:00 -- to be confirmed
    # against CronJob)
    get_publisher_class().register_cronjob(CronJob(clean_unused_files,
        name='clean_unused_files',
        hours=[2], minutes=[0]))