1667 lines
63 KiB
Python
1667 lines
63 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2010 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import base64
|
|
import copy
|
|
import glob
|
|
import itertools
|
|
import pickle
|
|
import sys
|
|
import types
|
|
import json
|
|
import xml.etree.ElementTree as ET
|
|
import datetime
|
|
|
|
from django.utils import six
|
|
from django.utils.encoding import force_bytes, force_text
|
|
|
|
from quixote import get_request, get_publisher
|
|
from quixote.http_request import Upload
|
|
|
|
from .qommon import _, force_str, PICKLE_KWARGS
|
|
from .qommon.storage import StorableObject, fix_key
|
|
from .qommon.cron import CronJob
|
|
from .qommon.form import *
|
|
from .qommon.misc import simplify, get_as_datetime
|
|
from .qommon import get_cfg
|
|
from .qommon.substitution import Substitutions
|
|
from .qommon.publisher import get_publisher_class
|
|
|
|
from .formdata import FormData
|
|
from .roles import Role, logged_users_role
|
|
from .categories import Category
|
|
from . import fields
|
|
from . import data_sources
|
|
|
|
# Python 3 removed types.ClassType (old-style classes); alias it to type so
# the dynamic class creation in FormDef.data_class() works on both versions.
if not hasattr(types, 'ClassType'):
    types.ClassType = type
|
|
|
|
|
|
class FormdefImportError(Exception):
    """Error raised when a formdef import (JSON or XML) cannot complete.

    msg is a translatable summary; details optionally carries extra
    context (e.g. the list of unknown datasources).
    """

    def __init__(self, msg, details=None):
        # pass msg to Exception so str(exc) is informative in logs and
        # tracebacks (it used to be empty).
        super(FormdefImportError, self).__init__(msg)
        self.msg = msg
        self.details = details
|
|
|
|
|
|
class FormdefImportRecoverableError(FormdefImportError):
    # import error the caller may work around, e.g. duplicated field
    # identifiers (see import_from_xml, which uses fix_on_error)
    pass
|
|
|
|
|
|
class FormField(object):
    """Legacy unpickling shim, only used for form fields pickled by
    versions older than 200603; the unpickled instance exposes the
    equivalent modern field object as .real_field, which
    FormDef.migrate() then unwraps."""

    def __setstate__(self, state):
        field_class = fields.get_field_class_by_type(state['type'])
        self.real_field = field_class(**state)
|
|
|
|
|
|
def lax_int(s):
    """Convert s to an int, returning -1 when conversion is impossible."""
    try:
        return int(s)
    except (TypeError, ValueError):
        return -1
|
|
|
|
|
|
class FormDef(StorableObject):
    """Definition of a form: its fields, workflow and presentation options.

    Submitted instances of the form are FormData objects, accessed
    through data_class().
    """

    _names = 'formdefs'
    _indexes = ['url_name']
    _hashed_indexes = ['backoffice_submission_roles']
    data_sql_prefix = 'formdata'
    pickle_module_name = 'formdef'
    xml_root_node = 'formdef'

    name = None
    description = None
    keywords = None  # comma-separated string (see keywords_list)
    url_name = None  # slug, used in URLs and as 'url_name' index
    internal_identifier = None # mostly for pickle
    table_name = None # for SQL only
    fields = None
    category_id = None
    workflow_id = None
    workflow_options = None  # {option key: value}
    workflow_roles = None  # {function key: role id}
    roles = None  # roles allowed to submit the form
    required_authentication_contexts = None
    backoffice_submission_roles = None
    discussion = False
    confirmation = True
    detailed_emails = True
    disabled = False
    only_allow_one = False
    enable_tracking_codes = False
    disabled_redirection = None
    always_advertise = False
    publication_date = None
    expiration_date = None
    has_captcha = False
    skip_from_360_view = False
    appearance_keywords = None
    digest_template = None

    geolocations = None  # {key: label}

    last_modification_time = None
    last_modification_user_id = None

    # highest numeric field id ever allocated (see get_new_field_id)
    max_field_id = None

    # store fields in a separate pickle chunk
    lightweight = True

    # prefix for formdata variables
    var_prefix = 'form'

    # declarations for serialization
    TEXT_ATTRIBUTES = ['name', 'url_name', 'description', 'keywords',
            'publication_date', 'expiration_date', 'internal_identifier',
            'disabled_redirection', 'appearance_keywords',
            'digest_template']
    BOOLEAN_ATTRIBUTES = ['discussion', 'detailed_emails', 'disabled',
            'only_allow_one', 'enable_tracking_codes', 'confirmation',
            'always_advertise',
            'has_captcha', 'skip_from_360_view']
|
|
|
|
    def __init__(self, *args, **kwargs):
        super(FormDef, self).__init__(*args, **kwargs)
        # give each instance its own field list (class attribute is None)
        self.fields = []
|
|
|
|
    def migrate(self):
        """Migrate the pickled formdef from older on-disk formats.

        Each stanza below handles one historical format change; the
        object is stored again only if something was actually modified.
        """
        changed = False

        if self.__dict__.get('fields') is Ellipsis:
            # don't run migration on lightweight objects
            return

        if 'receiver' in self.__dict__:
            # rename of the 'receiver' attribute to 'receiver_id'
            self.receiver_id = self.__dict__['receiver']
            del self.__dict__['receiver']
            changed = True

        if 'category' in self.__dict__:
            # rename of the 'category' attribute to 'category_id'
            self.category_id = self.__dict__['category']
            del self.__dict__['category']
            changed = True

        if not self.url_name:
            # objects predating url_name: a non-numeric id was used as the
            # slug directly, otherwise compute a fresh unique one.
            try:
                int(self.id)
            except ValueError:
                self.url_name = self.id
            else:
                self.url_name = self.get_new_url_name()
            changed = True

        if self.fields and type(self.fields[0]) is dict:
            # fields stored as plain dictionaries: turn them into FormField
            # objects with sequential ids, and rekey the stored formdata
            # (previously keyed on field labels) on those ids.
            for f in self.fields:
                if 'name' in f:
                    f['label'] = f['name']
                    del f['name']
            self.fields = [FormField(**x) for x in self.fields]
            for i, f in enumerate(self.fields):
                f.id = str(i)
            for formdata in self.data_class().select():
                for f in self.fields:
                    if not f.label in formdata.data:
                        continue
                    formdata.data[f.id] = formdata.data[f.label]
                    del formdata.data[f.label]
                formdata.store()
            changed = True

        if self.fields and isinstance(self.fields[0], FormField):
            # migration from generic FormField to specific Field classes
            # (200603)
            self.fields = [x.real_field for x in self.fields]

        if 'public' in self.__dict__:
            # removal of the obsolete 'public' attribute
            del self.__dict__['public']
            changed = True

        if 'receiver_id' in self.__dict__:
            # migration from a simple receiver role to workflow roles
            if not self.workflow_roles:
                self.workflow_roles = {}
            self.workflow_roles['_receiver'] = self.__dict__['receiver_id']
            del self.__dict__['receiver_id']
            changed = True

        if not self.table_name and get_publisher().has_site_option('postgresql'):
            # assign a SQL table name on first migration under postgresql
            from . import sql
            self.table_name = sql.get_formdef_table_name(self)
            changed = True

        if self.max_field_id is None and self.fields:
            # initialise max_field_id from existing (numeric) field ids
            self.max_field_id = max([lax_int(x.id) for x in self.fields])
            changed = True

        # the stanzas below normalise identifiers that used to be stored
        # as integers into strings
        if type(self.category_id) is int:
            self.category_id = str(self.category_id)
            changed = True

        if type(self.workflow_id) is int:
            self.workflow_id = str(self.workflow_id)
            changed = True

        if self.roles:
            for role in self.roles:
                if type(role) is int:
                    self.roles = [str(x) for x in self.roles]
                    changed = True
                    break

        if type(self.last_modification_user_id) is int:
            self.last_modification_user_id = str(self.last_modification_user_id)
            changed = True

        if self.workflow_roles:
            workflow_roles_list = self.workflow_roles.items()
            for role_key, role_id in self.workflow_roles.items():
                if type(role_id) is int:
                    self.workflow_roles = dict([(x, str(y)) for x, y in workflow_roles_list])
                    changed = True
                    break

        if not self.internal_identifier:
            # internal_identifier was introduced later; seed it from the slug
            self.internal_identifier = self.url_name
            changed = True

        # let each field run its own migrations
        for f in self.fields or []:
            changed |= f.migrate()

        if changed:
            self.store()
|
|
|
|
@classmethod
|
|
def remove_object(cls, id):
|
|
super(FormDef, cls).remove_object(id)
|
|
if get_publisher().is_using_postgresql():
|
|
# recreate global views so they don't reference formdata from
|
|
# deleted formefs
|
|
from . import sql
|
|
conn, cur = sql.get_connection_and_cursor()
|
|
sql.do_global_views(conn, cur)
|
|
conn.commit()
|
|
cur.close()
|
|
|
|
    def data_class(self, mode=None):
        """Return the FormData class used for submissions of this formdef.

        The class is created dynamically, named after the slug, and cached
        in the 'formdef' module namespace; it is backed either by SQL
        tables (postgresql site option, or mode='sql') or by the pickle
        storage (mode='files').
        """
        if not 'formdef' in sys.modules:
            # alias this module under its historical top-level name, used
            # as the cache namespace for generated classes
            sys.modules['formdef'] = sys.modules[__name__]
        if hasattr(sys.modules['formdef'], self.url_name.title()):
            data_class = getattr(sys.modules['formdef'], self.url_name.title())
            # only use existing data class if it has a reference to this actual
            # formdef
            if data_class._formdef is self:
                return data_class
        if (get_publisher().is_using_postgresql() and not mode == 'files') or mode == 'sql':
            from . import sql
            table_name = sql.get_formdef_table_name(self)
            cls = types.ClassType(self.url_name.title(), (sql.SqlFormData,),
                    {'_formdef': self,
                     '_table_name': table_name})
            # do_formdef_tables() may return names of class methods to be
            # called once the class has been registered (see below)
            actions = sql.do_formdef_tables(self)
        else:
            cls = types.ClassType(self.url_name.title(), (FormData,),
                    {'_names': 'form-%s' % self.internal_identifier,
                     '_formdef': self})
            actions = []
        # register under both the legacy and the package module names
        setattr(sys.modules['formdef'], self.url_name.title(), cls)
        setattr(sys.modules['wcs.formdef'], self.url_name.title(), cls)

        if actions:
            for action in actions:
                getattr(cls, action)()

        return cls
|
|
|
|
def get_new_field_id(self):
|
|
if self.max_field_id is None:
|
|
field_id = 1
|
|
else:
|
|
field_id = self.max_field_id + 1
|
|
self.max_field_id = field_id
|
|
return str(field_id)
|
|
|
|
def get_new_url_name(self):
|
|
new_url_name = simplify(self.name)
|
|
base_new_url_name = new_url_name
|
|
suffix_no = 0
|
|
while True:
|
|
try:
|
|
obj = self.get_on_index(new_url_name, 'url_name', ignore_migration=True)
|
|
except KeyError:
|
|
break
|
|
if obj.id == self.id:
|
|
break
|
|
suffix_no += 1
|
|
new_url_name = '%s-%s' % (base_new_url_name, suffix_no)
|
|
return new_url_name
|
|
|
|
def get_new_internal_identifier(self):
|
|
new_internal_identifier = simplify(self.name)
|
|
base_new_internal_identifier = new_internal_identifier
|
|
suffix_no = 0
|
|
while True:
|
|
try:
|
|
formdef = self.get_by_urlname(new_internal_identifier, ignore_migration=True)
|
|
except KeyError:
|
|
break
|
|
if formdef.id == self.id:
|
|
break
|
|
suffix_no += 1
|
|
new_internal_identifier = '%s-%s' % (base_new_internal_identifier, suffix_no)
|
|
return new_internal_identifier
|
|
|
|
@classmethod
|
|
def get_new_id(cls, create=False):
|
|
keys = cls.keys()
|
|
if not keys:
|
|
id = 1
|
|
else:
|
|
id = max([lax_int(x) for x in keys]) + 1
|
|
if id == 0:
|
|
id = len(keys)+1
|
|
if get_publisher().is_using_postgresql():
|
|
id = cls.get_sql_new_id(id_start=id)
|
|
if create:
|
|
objects_dir = cls.get_objects_dir()
|
|
object_filename = os.path.join(objects_dir, fix_key(id))
|
|
try:
|
|
fd = os.open(object_filename, os.O_CREAT | os.O_EXCL)
|
|
except OSError:
|
|
return cls.get_new_id(create=True)
|
|
os.close(fd)
|
|
return str(id)
|
|
|
|
    @classmethod
    def get_sql_new_id(cls, id_start):
        """Ask the SQL layer for a new formdef id, no smaller than id_start."""
        from . import sql
        return sql.get_formdef_new_id(id_start=id_start)
|
|
|
|
@classmethod
|
|
def wipe(cls):
|
|
super(FormDef, cls).wipe()
|
|
if get_publisher().is_using_postgresql():
|
|
from . import sql
|
|
sql.formdef_wipe()
|
|
|
|
    def store(self):
        """Store the formdef, maintaining identifiers and SQL tables.

        Also refreshes last modification metadata and, on postgresql
        sites, (re)builds the formdata tables and views.
        """
        if self.url_name is None:
            # set url name if it's not yet there
            self.url_name = self.get_new_url_name()
        new_internal_identifier = self.get_new_internal_identifier()
        if not self.internal_identifier:
            self.internal_identifier = new_internal_identifier
        if new_internal_identifier != self.internal_identifier:
            # title changed, internal identifier will be changed only if
            # the formdef is currently being imported (self.id is None)
            # or if there are not yet any submitted forms
            if self.id is None or self.data_class().count() == 0:
                self.internal_identifier = new_internal_identifier
        self.last_modification_time = time.localtime()
        if get_request() and get_request().user:
            self.last_modification_user_id = str(get_request().user.id)
        else:
            # not in a request context (e.g. cron or command line)
            self.last_modification_user_id = None
        t = StorableObject.store(self)
        if get_publisher().is_using_postgresql():
            from . import sql
            sql.do_formdef_tables(self, rebuild_views=True,
                    rebuild_global_views=True)
        return t
|
|
|
|
def get_all_fields(self):
|
|
return (self.fields or []) + self.workflow.get_backoffice_fields()
|
|
|
|
def rebuild(self):
|
|
if get_publisher().is_using_postgresql():
|
|
from . import sql
|
|
sql.do_formdef_tables(self, rebuild_views=True,
|
|
rebuild_global_views=True)
|
|
|
|
def get_category(self):
|
|
if self.category_id:
|
|
try:
|
|
return Category.get(self.category_id)
|
|
except KeyError:
|
|
return None
|
|
else:
|
|
return None
|
|
|
|
def set_category(self, category):
|
|
if category:
|
|
self.category_id = category.id
|
|
elif self.category_id:
|
|
self.category_id = None
|
|
category = property(get_category, set_category)
|
|
|
|
|
|
    # per-instance cache for the workflow with options applied
    _workflow = None

    def get_workflow(self):
        """Return this formdef's workflow, with workflow options applied.

        Falls back to the unknown-workflow placeholder when workflow_id
        points to a deleted workflow, and to the default workflow when no
        workflow is set (neither fallback is cached).
        """
        if self._workflow:
            return self._workflow
        from wcs.workflows import Workflow
        if self.workflow_id:
            try:
                workflow = Workflow.get(self.workflow_id)
            except KeyError:
                # stale reference to a deleted workflow
                return Workflow.get_unknown_workflow()
            self._workflow = self.get_workflow_with_options(workflow)
            return self._workflow
        else:
            return self.get_default_workflow()
|
|
|
|
    @classmethod
    def get_default_workflow(cls):
        """Return the built-in default workflow."""
        from wcs.workflows import Workflow
        return Workflow.get_default_workflow()
|
|
|
|
def get_workflow_with_options(self, workflow):
|
|
# this needs to be kept in sync with admin/forms.ptl,
|
|
# FormDefPage::workflow
|
|
if not self.workflow_options:
|
|
return workflow
|
|
for status in workflow.possible_status:
|
|
for item in status.items:
|
|
prefix = '%s*%s*' % (status.id, item.id)
|
|
for parameter in item.get_parameters():
|
|
value = self.workflow_options.get(prefix + parameter)
|
|
if value:
|
|
setattr(item, parameter, value)
|
|
return workflow
|
|
|
|
def set_workflow(self, workflow):
|
|
if workflow:
|
|
self.workflow_id = workflow.id
|
|
self._workflow = workflow
|
|
elif self.workflow_id:
|
|
self.workflow_id = None
|
|
workflow = property(get_workflow, set_workflow)
|
|
|
|
@property
|
|
def keywords_list(self):
|
|
if not self.keywords:
|
|
return []
|
|
return [x.strip() for x in self.keywords.split(',')]
|
|
|
|
@property
|
|
def appearance_keywords_list(self):
|
|
if not get_publisher().has_site_option('formdef-appearance-keywords'):
|
|
return []
|
|
if not self.appearance_keywords:
|
|
return []
|
|
return [x.strip() for x in self.appearance_keywords.split()]
|
|
|
|
    def get_variable_options(self):
        """Return workflow variable values keyed as substitution variables.

        Each workflow variable field appears as 'form_option_<varname>';
        when a display value is stored the plain name carries the display
        value and a '_raw' variant carries the raw one; structured values
        get a '_structured' variant.
        """
        variables = {}
        if not self.workflow.variables_formdef:
            return variables
        if not self.workflow_options:
            return variables
        for field in self.workflow.variables_formdef.fields:
            if not field.varname:
                continue
            option_name = 'form_option_' + field.varname
            variables[option_name] = self.workflow_options.get(field.varname)
            if field.store_display_value:
                if '%s_display' % field.varname in self.workflow_options:
                    # move the raw value under _raw, expose the display
                    # value under the plain name
                    variables[option_name + '_raw'] = variables[option_name]
                    variables[option_name] = self.workflow_options.get(
                            '%s_display' % field.varname)
            if field.store_structured_value:
                if '%s_structured' % field.varname in self.workflow_options:
                    variables[option_name + '_structured'] = self.workflow_options.get(
                            '%s_structured' % field.varname)
        return variables
|
|
|
|
def get_variable_options_for_form(self):
|
|
variables = {}
|
|
if not self.workflow.variables_formdef:
|
|
return variables
|
|
if not self.workflow_options:
|
|
return {}
|
|
for field in self.workflow.variables_formdef.fields:
|
|
if not field.varname:
|
|
continue
|
|
variables[str(field.id)] = self.workflow_options.get(field.varname)
|
|
return variables
|
|
|
|
def set_variable_options(self, form):
|
|
data = self.workflow.variables_formdef.get_data(form)
|
|
variables = {}
|
|
for field in self.workflow.variables_formdef.fields:
|
|
if not field.varname:
|
|
continue
|
|
variables[field.varname] = data.get(field.id)
|
|
if field.store_display_value:
|
|
variables[field.varname + '_display'] = data.get(field.id + '_display')
|
|
if field.store_structured_value:
|
|
variables[field.varname + '_structured'] = data.get(field.id + '_structured')
|
|
if not self.workflow_options:
|
|
self.workflow_options = {}
|
|
self.workflow_options.update(variables)
|
|
|
|
    @classmethod
    def get_by_urlname(cls, url_name, ignore_migration=False):
        """Return the formdef with the given slug; raises KeyError if absent."""
        return cls.get_on_index(url_name, 'url_name', ignore_migration=ignore_migration)
|
|
|
|
def get_url(self, backoffice=False, preview=False):
|
|
if backoffice:
|
|
base_url = get_publisher().get_backoffice_url() + '/management'
|
|
elif preview:
|
|
base_url = get_publisher().get_frontoffice_url() + '/preview'
|
|
else:
|
|
base_url = get_publisher().get_frontoffice_url()
|
|
return '%s/%s/' % (base_url, self.url_name)
|
|
|
|
    def get_api_url(self):
        """Return the frontoffice API endpoint URL for this formdef."""
        base_url = get_publisher().get_frontoffice_url()
        return '%s/api/forms/%s/' % (base_url, self.url_name)
|
|
|
|
    def get_admin_url(self):
        """Return the backoffice administration URL for this formdef."""
        base_url = get_publisher().get_backoffice_url()
        return '%s/forms/%s/' % (base_url, self.id)
|
|
|
|
    def get_backoffice_submission_url(self):
        """Return the backoffice URL used to submit this form on behalf of users."""
        base_url = get_publisher().get_backoffice_url() + '/submission'
        return '%s/%s/' % (base_url, self.url_name)
|
|
|
|
    def get_display_id_format(self):
        """Return the template used to build formdata display identifiers."""
        return '[formdef_id]-[form_number_raw]'
|
|
|
|
    def create_form(self, page=None, displayed_fields=None, transient_formdata=None):
        """Build the quixote Form used to fill this formdef.

        page: page field to restrict fields to (None starts from the top)
        displayed_fields: optional list, filled with the fields added
        transient_formdata: in-progress FormData, updated with live values
        """
        form = Form(enctype="multipart/form-data", use_tokens=False)
        if self.appearance_keywords:
            form.attrs['class'] = 'quixote %s' % self.appearance_keywords
        if self.keywords:
            form.attrs['data-keywords'] = ' '.join(self.keywords_list)
        form.ERROR_NOTICE = _('There were errors processing the form and '
                              'you cannot go to the next page. Do '
                              'check below that you filled all fields correctly.')
        self.add_fields_to_form(form,
                page=page,
                displayed_fields=displayed_fields,
                transient_formdata=transient_formdata)
        return form
|
|
|
|
    def add_fields_to_form(self,
                           form,
                           page=None,
                           displayed_fields=None,
                           form_data=None, # a dictionary, to fill fields
                           transient_formdata=None): # a FormData
        """Add this formdef's fields (for the requested page) to form.

        Fields before the first page marker are used when page is None;
        otherwise only fields between the given page marker and the next
        one. Invisible fields are still added (hidden) when they take
        part in live conditions.
        """
        current_page = 0
        on_page = (page is None)
        for field in self.fields:
            field.formdef = self
            if field.type == 'page':
                if on_page:
                    # reached the next page marker: requested page is done
                    break
                if page.id == field.id:
                    on_page = True
                continue
            if not on_page:
                continue
            visible = field.is_visible(form_data, self)
            if not visible:
                if not field.has_live_conditions(self):
                    # no live conditions so field can be skipped
                    continue
            if type(displayed_fields) is list:
                displayed_fields.append(field)
            value = None
            if form_data:
                value = form_data.get(field.id)
            widget = field.add_to_form(form, value)
            widget.is_hidden = not(visible)
            widget.field = field
            if transient_formdata and not widget.is_hidden:
                transient_formdata.data.update(self.get_field_data(field, widget))
                # invalidate cache as comment fields (and other things?) may
                # have accessed variables in non-lazy mode and caused a cache
                # with now-obsolete values.
                get_publisher().substitutions.invalidate_cache()
                widget._parsed = False
                widget.error = None
|
|
|
|
def get_page(self, page_no):
|
|
return [x for x in self.fields if x.type == 'page'][page_no]
|
|
|
|
def create_view_form(self, dict={}, use_tokens=True, visible=True):
|
|
form = Form(enctype='multipart/form-data', use_tokens=use_tokens)
|
|
if not visible:
|
|
form.attrs['style'] = 'display: none;'
|
|
if self.keywords:
|
|
form.attrs['data-keywords'] = ' '.join(self.keywords_list)
|
|
current_page_fields = []
|
|
on_disabled_page = False
|
|
on_page = False
|
|
for i, field in enumerate(self.fields):
|
|
if field.type == 'page':
|
|
on_disabled_page = False
|
|
if not field.is_visible(dict, self):
|
|
on_disabled_page = True
|
|
form_field = False
|
|
for f in self.fields[self.fields.index(field)+1:]:
|
|
if f.key == 'page':
|
|
break
|
|
if isinstance(f, fields.WidgetField):
|
|
form_field = True
|
|
break
|
|
if form_field is False:
|
|
on_disabled_page = True
|
|
|
|
if on_disabled_page:
|
|
continue
|
|
|
|
if field.type == 'page':
|
|
if on_page:
|
|
form.widgets.append(HtmlWidget(htmltext('</div></div>')))
|
|
form.widgets.append(HtmlWidget(
|
|
htmltext('<div class="page"><h3>%s</h3><div>' % field.label)))
|
|
on_page = field
|
|
current_page_fields = []
|
|
continue
|
|
|
|
if field.type == 'title' and on_page and (
|
|
not current_page_fields and
|
|
on_page.label == field.label):
|
|
# don't include first title of a page if that title has the
|
|
# same text as the page.
|
|
continue
|
|
|
|
if field.type == 'comment' and not field.include_in_validation_page:
|
|
# don't render field that wouldn't be displayed.
|
|
continue
|
|
|
|
if not field.is_visible(dict, self):
|
|
continue
|
|
|
|
current_page_fields.append(field)
|
|
value = dict.get(field.id)
|
|
|
|
if not field.include_in_validation_page:
|
|
form.widgets.append(HtmlWidget(htmltext('<div style="display: none;">')))
|
|
field.add_to_view_form(form, value)
|
|
form.widgets.append(HtmlWidget(htmltext('</div>')))
|
|
else:
|
|
field.add_to_view_form(form, value)
|
|
|
|
if on_page:
|
|
form.widgets.append(HtmlWidget(htmltext('</div></div>')))
|
|
|
|
return form
|
|
|
|
def set_live_condition_sources(self, form, fields):
|
|
live_condition_fields = {}
|
|
for field in fields:
|
|
if field.condition:
|
|
field.varnames = field.get_condition_varnames(formdef=self)
|
|
for varname in field.varnames:
|
|
if not varname in live_condition_fields:
|
|
live_condition_fields[varname] = []
|
|
live_condition_fields[varname].append(field)
|
|
if field.key == 'item' and field.data_source:
|
|
real_data_source = data_sources.get_real(field.data_source)
|
|
if real_data_source.get('type') != 'json':
|
|
continue
|
|
varnames = field.get_referenced_varnames(
|
|
formdef=self,
|
|
value=real_data_source.get('value'))
|
|
for varname in varnames:
|
|
if not varname in live_condition_fields:
|
|
live_condition_fields[varname] = []
|
|
live_condition_fields[varname].append(field)
|
|
if field.key == 'comment':
|
|
for varname in field.get_referenced_varnames(formdef=self, value=field.label):
|
|
if not varname in live_condition_fields:
|
|
live_condition_fields[varname] = []
|
|
live_condition_fields[varname].append(field)
|
|
|
|
for field in fields:
|
|
if field.varname in live_condition_fields:
|
|
widget = form.get_widget('f%s' % field.id)
|
|
if widget:
|
|
widget.live_condition_source = True
|
|
|
|
    def get_field_data(self, field, widget):
        """Parse widget and return its value(s) as a formdata data dict.

        The dict is keyed on the field id, with optional '<id>_display'
        and '<id>_structured' companion entries when the field stores
        display/structured values.
        """
        d = {}
        d[field.id] = widget.parse()
        if d.get(field.id) is not None and field.convert_value_from_str:
            d[field.id] = field.convert_value_from_str(d[field.id])
        if d.get(field.id) is not None and field.store_display_value:
            display_value = field.store_display_value(d, field.id)
            if display_value is not None:
                d['%s_display' % field.id] = display_value
            elif '%s_display' % field.id in d:
                # store_display_value() may have set the key itself;
                # remove it when no display value applies
                del d['%s_display' % field.id]
        if d.get(field.id) is not None and field.store_structured_value:
            structured_value = field.store_structured_value(d, field.id)
            if structured_value is not None:
                d['%s_structured' % field.id] = structured_value
            elif '%s_structured' % field.id in d:
                del d['%s_structured' % field.id]
        if getattr(widget, 'cleanup', None):
            widget.cleanup()
        return d
|
|
|
|
def get_data(self, form):
|
|
d = {}
|
|
for field in self.fields:
|
|
widget = form.get_widget('f%s' % field.id)
|
|
if widget:
|
|
d.update(self.get_field_data(field, widget))
|
|
return d
|
|
|
|
    def export_to_json(self, include_id=False, indent=None, anonymise=True):
        """Serialize the formdef as a JSON string.

        include_id: also emit ids (formdef, category, modification user)
        indent: forwarded to json.dumps
        anonymise: forwarded to each field's export_to_json
        """
        charset = get_publisher().site_charset
        root = {}
        root['name'] = force_text(self.name, charset)
        if include_id and self.id:
            root['id'] = str(self.id)
        if self.category:
            root['category'] = force_text(self.category.name, charset)
            root['category_id'] = str(self.category.id)
        if self.workflow:
            root['workflow'] = self.workflow.get_json_export_dict(include_id=include_id)

        if self.max_field_id is None and self.fields:
            # make sure max_field_id is set before exporting it
            self.max_field_id = max([lax_int(x.id) for x in self.fields])

        more_attributes = []
        if self.max_field_id:
            more_attributes.append('max_field_id')
        if self.last_modification_time:
            more_attributes.append('last_modification_time')
        if include_id:
            more_attributes.append('last_modification_user_id')

        for attribute in self.TEXT_ATTRIBUTES + self.BOOLEAN_ATTRIBUTES + more_attributes:
            if not hasattr(self, attribute):
                continue
            root[attribute] = getattr(self, attribute)
            if type(root[attribute]) is time.struct_time:
                # serialize times as ISO-like strings
                root[attribute] = time.strftime('%Y-%m-%dT%H:%M:%S',
                        root[attribute])

        root['fields'] = []
        if self.fields:
            for field in self.fields:
                root['fields'].append(field.export_to_json(include_id=include_id, anonymise=anonymise))

        if self.geolocations:
            root['geolocations'] = self.geolocations.copy()

        if self.workflow_options:
            root['options'] = self.workflow_options.copy()
            for k, v in list(root['options'].items()):
                # convert time.struct_time to strings as python3 would
                # serialize it as tuple.
                if isinstance(v, time.struct_time):
                    root['options'][k] = time.strftime('%Y-%m-%dT%H:%M:%S', v)

        if self.required_authentication_contexts:
            root['required_authentication_contexts'] = self.required_authentication_contexts[:]

        return json.dumps(root, indent=indent, cls=misc.JSONEncoder)
|
|
|
|
@classmethod
|
|
def import_from_json(cls, fd, charset=None, include_id=False):
|
|
if charset is None:
|
|
charset = get_publisher().site_charset
|
|
formdef = cls()
|
|
|
|
def unicode2str(v):
|
|
if isinstance(v, dict):
|
|
return dict([(unicode2str(k), unicode2str(v)) for k, v in v.items()])
|
|
elif isinstance(v, list):
|
|
return [unicode2str(x) for x in v]
|
|
elif isinstance(v, six.string_types):
|
|
return force_str(v)
|
|
else:
|
|
return v
|
|
|
|
# we have to make sure all strings are str object, not unicode.
|
|
value = unicode2str(json.load(fd))
|
|
|
|
if include_id and 'id' in value:
|
|
formdef.id = value.get('id')
|
|
|
|
if include_id and 'category_id' in value:
|
|
formdef.category_id = value.get('category_id')
|
|
elif 'category' in value:
|
|
category = value.get('category')
|
|
for c in Category.select():
|
|
if c.name == category:
|
|
formdef.category_id = c.id
|
|
break
|
|
|
|
if include_id and 'workflow_id' in value:
|
|
formdef.workflow_id = value.get('workflow_id')
|
|
elif (include_id
|
|
and 'workflow' in value
|
|
and isinstance(value['workflow'], dict)
|
|
and 'id' in value['workflow']):
|
|
formdef.workflow_id = value['workflow'].get('id')
|
|
elif 'workflow' in value:
|
|
if isinstance(value['workflow'], six.string_types):
|
|
workflow = value.get('workflow')
|
|
else:
|
|
workflow = value['workflow'].get('name')
|
|
from wcs.workflows import Workflow
|
|
for w in Workflow.select():
|
|
if w.name == workflow:
|
|
formdef.workflow_id = w.id
|
|
break
|
|
|
|
more_attributes = ['max_field_id', 'last_modification_time',
|
|
'last_modification_user_id']
|
|
for attribute in cls.TEXT_ATTRIBUTES + cls.BOOLEAN_ATTRIBUTES + more_attributes:
|
|
if attribute in value:
|
|
setattr(formdef, attribute, value.get(attribute))
|
|
|
|
# fixup last_modification_time to the proper type
|
|
if formdef.last_modification_time:
|
|
formdef.last_modification_time = time.strptime(
|
|
formdef.last_modification_time, '%Y-%m-%dT%H:%M:%S')
|
|
|
|
formdef.fields = []
|
|
for i, field in enumerate(value.get('fields', [])):
|
|
try:
|
|
field_o = fields.get_field_class_by_type(field.get('type'))()
|
|
except KeyError:
|
|
raise FormdefImportError(N_('Unknown field type'),
|
|
details=field.findtext('type'))
|
|
field_o.init_with_json(field, include_id=True)
|
|
if not field_o.id:
|
|
# this assumes all fields will have id, or none of them
|
|
field_o.id = str(i)
|
|
formdef.fields.append(field_o)
|
|
|
|
if formdef.fields and not formdef.max_field_id:
|
|
formdef.max_field_id = max([lax_int(x.id) for x in formdef.fields])
|
|
|
|
if value.get('options'):
|
|
formdef.workflow_options = value.get('options')
|
|
for option_key, option_value in formdef.workflow_options.items():
|
|
if isinstance(option_value, dict) and 'filename' in option_value:
|
|
filename = option_value['filename']
|
|
upload = Upload(filename, content_type=option_value['content_type'])
|
|
new_value = UploadedFile(get_publisher().app_dir, filename, upload)
|
|
new_value.set_content(base64.decodestring(force_bytes(option_value['content'])))
|
|
formdef.workflow_options[option_key] = new_value
|
|
|
|
if value.get('geolocations'):
|
|
formdef.geolocations = value.get('geolocations')
|
|
|
|
if value.get('required_authentication_contexts'):
|
|
formdef.required_authentication_contexts = [str(x) for x in
|
|
value.get('required_authentication_contexts')]
|
|
|
|
return formdef
|
|
|
|
    def export_to_xml(self, include_id=False):
        """Serialize the formdef as an ElementTree <formdef> element.

        include_id: also emit database ids (formdef, category, workflow,
        roles, last modification user) as attributes.
        """
        charset = get_publisher().site_charset
        root = ET.Element(self.xml_root_node)
        if include_id and self.id:
            root.attrib['id'] = str(self.id)
        # simple attributes: one subelement per non-empty value
        for text_attribute in list(self.TEXT_ATTRIBUTES):
            if not hasattr(self, text_attribute) or not getattr(self, text_attribute):
                continue
            ET.SubElement(root, text_attribute).text = force_text(
                    getattr(self, text_attribute), charset)
        for boolean_attribute in self.BOOLEAN_ATTRIBUTES:
            if not hasattr(self, boolean_attribute):
                continue
            value = getattr(self, boolean_attribute)
            if value:
                value = 'true'
            else:
                value = 'false'
            ET.SubElement(root, boolean_attribute).text = value

        if self.category:
            elem = ET.SubElement(root, 'category')
            elem.text = force_text(self.category.name, charset)
            if include_id:
                elem.attrib['category_id'] = str(self.category.id)

        if self.workflow:
            elem = ET.SubElement(root, 'workflow')
            elem.text = force_text(self.workflow.name, charset)
            if include_id:
                elem.attrib['workflow_id'] = str(self.workflow.id)

        if self.max_field_id is None and self.fields:
            # make sure max_field_id is set before exporting it
            self.max_field_id = max([lax_int(x.id) for x in self.fields])

        if self.max_field_id:
            ET.SubElement(root, 'max_field_id').text = str(self.max_field_id)

        if self.last_modification_time:
            elem = ET.SubElement(root, 'last_modification')
            elem.text = time.strftime('%Y-%m-%d %H:%M:%S', self.last_modification_time)
            if include_id:
                elem.attrib['user_id'] = str(self.last_modification_user_id)

        # NOTE: this local shadows the module-level 'fields' import for
        # the rest of the function.
        fields = ET.SubElement(root, 'fields')
        for field in self.fields or []:
            fields.append(field.export_to_xml(charset=charset, include_id=include_id))

        roles_elements = [
            ('roles', 'user-roles'),
            ('backoffice_submission_roles', 'backoffice-submission-roles')
        ]
        for attr_name, node_name in roles_elements:
            if not getattr(self, attr_name, None):
                continue
            roles = ET.SubElement(root, node_name)
            for role_id in getattr(self, attr_name):
                if role_id is None:
                    continue
                role_id = str(role_id)
                if role_id.startswith('_') or role_id == 'logged-users':
                    # pseudo-role identifiers are exported as-is
                    role = force_text(role_id, charset)
                else:
                    try:
                        role = force_text(Role.get(role_id).name, charset)
                    except KeyError:
                        # stale role reference, keep the raw id
                        role = force_text(role_id, charset)
                sub = ET.SubElement(roles, 'role')
                if include_id:
                    sub.attrib['role_id'] = role_id
                sub.text = role

        if self.workflow_roles:
            # workflow function roles, keyed on their function (role_key)
            roles = ET.SubElement(root, 'roles')
            for role_key, role_id in self.workflow_roles.items():
                if role_id is None:
                    continue
                role_id = str(role_id)
                if role_id.startswith('_') or role_id == 'logged-users':
                    role = force_text(role_id, charset)
                else:
                    try:
                        role = force_text(Role.get(role_id).name, charset)
                    except KeyError:
                        role = force_text(role_id, charset)
                sub = ET.SubElement(roles, 'role')
                sub.attrib['role_key'] = role_key
                if include_id:
                    sub.attrib['role_id'] = role_id
                sub.text = role

        options = ET.SubElement(root, 'options')
        for option in sorted(self.workflow_options or []):
            element = ET.SubElement(options, 'option')
            element.attrib['varname'] = option
            option_value = self.workflow_options.get(option)
            if isinstance(option_value, six.string_types):
                element.text = force_text(self.workflow_options.get(option, ''), charset)
            elif hasattr(option_value, 'base_filename'):
                # uploaded file: inline its content as base64
                ET.SubElement(element, 'filename').text = option_value.base_filename
                ET.SubElement(element, 'content_type').text = (
                        option_value.content_type or 'application/octet-stream')
                ET.SubElement(element, 'content').text = force_text(base64.b64encode(option_value.get_content()))
            elif isinstance(option_value, time.struct_time):
                element.text = time.strftime('%Y-%m-%d', option_value)
                element.attrib['type'] = 'date'
            else:
                pass # TODO: extend support to other types

        geolocations = ET.SubElement(root, 'geolocations')
        for geoloc_key, geoloc_label in (self.geolocations or {}).items():
            element = ET.SubElement(geolocations, 'geolocation')
            element.attrib['key'] = geoloc_key
            element.text = force_text(geoloc_label, charset)

        if self.required_authentication_contexts:
            element = ET.SubElement(root, 'required_authentication_contexts')
            for auth_context in self.required_authentication_contexts:
                ET.SubElement(element, 'method').text = force_text(auth_context)

        return root
|
|
|
|
@classmethod
|
|
def import_from_xml(cls, fd, charset=None, include_id=False, fix_on_error=False):
|
|
try:
|
|
tree = ET.parse(fd)
|
|
except:
|
|
raise ValueError()
|
|
formdef = cls.import_from_xml_tree(tree, charset=charset,
|
|
include_id=include_id, fix_on_error=fix_on_error)
|
|
|
|
if formdef.url_name:
|
|
try:
|
|
obj = cls.get_on_index(formdef.url_name, 'url_name', ignore_migration=True)
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
formdef.url_name = formdef.get_new_url_name()
|
|
|
|
# fix max_field_id if necessary
|
|
if formdef.max_field_id is not None:
|
|
max_field_id = max([lax_int(x.id) for x in formdef.fields])
|
|
if formdef.max_field_id < max_field_id:
|
|
formdef.max_field_id = max_field_id
|
|
|
|
# check if datasources are defined
|
|
unknown_datasources = set()
|
|
for field in formdef.fields:
|
|
data_source = getattr(field, 'data_source', None)
|
|
if data_source:
|
|
if isinstance(data_sources.get_object(data_source),
|
|
data_sources.StubNamedDataSource):
|
|
unknown_datasources.add(data_source.get('type'))
|
|
if unknown_datasources:
|
|
raise FormdefImportError(N_('Unknown datasources'),
|
|
details=', '.join(sorted(unknown_datasources)))
|
|
|
|
# check if all field id are unique
|
|
known_field_ids = set()
|
|
for field in formdef.fields:
|
|
if field.id in known_field_ids:
|
|
raise FormdefImportRecoverableError(N_('Duplicated field identifiers'))
|
|
known_field_ids.add(field.id)
|
|
|
|
return formdef
|
|
|
|
    @classmethod
    def import_from_xml_tree(cls, tree, include_id=False, charset=None, fix_on_error=False):
        """Build a formdef object from an (Element)Tree of its XML export.

        When include_id is True, identifiers found in the XML (formdef id,
        category/workflow/role ids...) are used as-is; otherwise related
        objects are matched by name.  fix_on_error forces sequential field
        identifiers.  Raises FormdefImportError on inconsistent input.
        """
        if charset is None:
            charset = get_publisher().site_charset
        # only utf-8 sites are supported by this import code path
        assert charset == 'utf-8'
        formdef = cls()
        # note: ElementTree.find() works on both Element and ElementTree,
        # so this check is valid even before the getroot() call below
        if tree.find('name') is None or not tree.find('name').text:
            raise FormdefImportError(N_('Missing name'))

        # if the tree we get is actually a ElementTree for real, we get its
        # root element and go on happily.
        if not ET.iselement(tree):
            tree = tree.getroot()

        if tree.tag != cls.xml_root_node:
            raise FormdefImportError(N_('Unexpected root node'))

        if include_id and tree.attrib.get('id'):
            formdef.id = tree.attrib.get('id')

        # plain text attributes, copied verbatim
        for text_attribute in list(cls.TEXT_ATTRIBUTES):
            value = tree.find(text_attribute)
            if value is None:
                continue
            setattr(formdef, text_attribute, force_str(value.text))

        # boolean attributes, serialized as 'true'/'false' text
        for boolean_attribute in cls.BOOLEAN_ATTRIBUTES:
            value = tree.find(boolean_attribute)
            if value is None:
                continue
            setattr(formdef, boolean_attribute, value.text == 'true')

        # fields: instantiated from their <type> element
        formdef.fields = []
        for i, field in enumerate(tree.find('fields')):
            try:
                field_o = fields.get_field_class_by_type(field.findtext('type'))()
            except KeyError:
                raise FormdefImportError(N_('Unknown field type'),
                        details=field.findtext('type'))
            field_o.init_with_xml(field, charset, include_id=True)
            if fix_on_error or not field_o.id:
                # this assumes all fields will have id, or none of them
                field_o.id = str(i+1)
            formdef.fields.append(field_o)

        if formdef.fields:
            value = tree.find('max_field_id')
            if value is not None:
                formdef.max_field_id = int(value.text)
            else:
                # older exports have no <max_field_id>; recompute it
                formdef.max_field_id = max([lax_int(x.id) for x in formdef.fields])

        # workflow options: dates, plain strings, or uploaded files
        formdef.workflow_options = {}
        for option in tree.findall('options/option'):
            option_value = None
            if option.attrib.get('type') == 'date':
                option_value = time.strptime(option.text, '%Y-%m-%d')
            elif option.text:
                option_value = force_str(option.text)
            elif option.findall('filename'):
                # file option: re-create an UploadedFile from the
                # base64-encoded content
                filename = option.find('filename').text
                upload = Upload(filename, content_type=option.find('content_type').text)
                option_value = UploadedFile(get_publisher().app_dir, filename, upload)
                # NOTE(review): base64.decodestring is deprecated and
                # removed in Python 3.9; decodebytes is the modern
                # spelling -- confirm target Python versions before changing.
                option_value.set_content(base64.decodestring(force_bytes(option.find('content').text)))
            formdef.workflow_options[option.attrib.get('varname')] = option_value

        if tree.find('last_modification') is not None:
            node = tree.find('last_modification')
            formdef.last_modification_time = time.strptime(node.text, '%Y-%m-%d %H:%M:%S')
            if include_id and node.attrib.get('user_id'):
                formdef.last_modification_user_id = node.attrib.get('user_id')

        # category: matched by id (when include_id) or by name
        if tree.find('category') is not None:
            category_node = tree.find('category')
            if include_id and category_node.attrib.get('category_id'):
                category_id = str(category_node.attrib.get('category_id'))
                if Category.has_key(category_id):
                    formdef.category_id = category_id
            else:
                category = force_str(category_node.text)
                for c in Category.select():
                    if c.name == category:
                        formdef.category_id = c.id
                        break

        # workflow: matched by id (when include_id) or by name
        if tree.find('workflow') is not None:
            from wcs.workflows import Workflow
            workflow_node = tree.find('workflow')
            if include_id and workflow_node.attrib.get('workflow_id'):
                workflow_id = workflow_node.attrib.get('workflow_id')
                if Workflow.has_key(workflow_id):
                    formdef.workflow_id = workflow_id
            else:
                workflow = force_str(workflow_node.text)
                for w in Workflow.select():
                    if w.name == workflow:
                        formdef.workflow_id = w.id
                        break

        def get_role_by_node(role_node):
            # resolve a <role> element to a role id: special slugs
            # ('_xxx', 'logged-users') pass through, otherwise try the
            # serialized id (when include_id) then fall back to a lookup
            # by role name; returns None when nothing matches.
            role_id = None
            value = force_str(role_node.text)
            if value.startswith('_') or value == 'logged-users':
                role_id = value
            elif include_id:
                role_id = role_node.attrib.get('role_id')
                if role_id and not Role.has_key(role_id):
                    role_id = None

            if not role_id:
                for role in Role.select(ignore_errors=True):
                    if role.name == value:
                        role_id = role.id
                        break

            return role_id

        # list-valued role attributes (formdef attribute name, XML node name)
        roles_elements = [
            ('roles', 'user-roles'),
            ('backoffice_submission_roles', 'backoffice-submission-roles')
        ]
        for attr_name, node_name in roles_elements:
            if tree.find(node_name) is None:
                continue
            roles_node = tree.find(node_name)
            roles = []
            setattr(formdef, attr_name, roles)
            for child in roles_node:
                role_id = get_role_by_node(child)
                if role_id:
                    roles.append(role_id)

        # workflow functions mapping (<roles> holds role_key -> role)
        if tree.find('roles') is not None:
            roles_node = tree.find('roles')
            formdef.workflow_roles = {}
            for child in roles_node:
                role_key = child.attrib['role_key']
                role_id = get_role_by_node(child)
                formdef.workflow_roles[role_key] = role_id

        # geolocation keys and their human labels
        if tree.find('geolocations') is not None:
            geolocations_node = tree.find('geolocations')
            formdef.geolocations = {}
            for child in geolocations_node:
                geoloc_key = child.attrib['key']
                geoloc_value = force_str(child.text)
                formdef.geolocations[geoloc_key] = geoloc_value

        if tree.find('required_authentication_contexts') is not None:
            node = tree.find('required_authentication_contexts')
            formdef.required_authentication_contexts = []
            for child in node:
                formdef.required_authentication_contexts.append(str(child.text))

        return formdef
|
|
|
|
def get_detailed_email_form(self, formdata, url):
|
|
details = []
|
|
|
|
if formdata.user_id and formdata.user:
|
|
details.append(_('User name:'))
|
|
details.append(' %s' % formdata.user.name)
|
|
details.append('')
|
|
|
|
data = formdata.data
|
|
for field in self.fields:
|
|
if isinstance(field, (fields.SubtitleField, fields.TitleField, fields.CommentField,
|
|
fields.PageField)):
|
|
continue
|
|
if data is None:
|
|
continue
|
|
if data.get(field.id) is None:
|
|
continue
|
|
if data.get(field.id + '_display'):
|
|
value = data.get(field.id + '_display')
|
|
else:
|
|
value = data.get(field.id)
|
|
details.append(_('%s:') % field.label)
|
|
if field.type in ('text', 'file'):
|
|
# XXX: howto support preformatted text in a dl in docutils ?
|
|
details.append((' %s' % value).replace('\n', '\n '))
|
|
else:
|
|
details.append('%s' % field.get_rst_view_value(value, indent=' '))
|
|
details.append('')
|
|
return '\n'.join(details)
|
|
|
|
def get_submitter_email(self, formdata):
|
|
users_cfg = get_cfg('users', {})
|
|
field_email = users_cfg.get('field_email') or 'email'
|
|
|
|
# look up in submitted form for one that would hold the user
|
|
# email (the one set to be prefilled by user email)
|
|
if formdata.data:
|
|
fields = formdata.formdef.fields
|
|
for field in fields:
|
|
if not hasattr(field, 'prefill'):
|
|
continue
|
|
if field.prefill and field.prefill.get('type') == 'user':
|
|
if field.prefill.get('value') == field_email:
|
|
v = formdata.data.get(field.id)
|
|
if v:
|
|
return v
|
|
|
|
# if nothing was found, get email from user profile
|
|
if formdata.user and formdata.user.email:
|
|
return formdata.user.email
|
|
|
|
return None
|
|
|
|
def get_static_substitution_variables(self, minimal=False):
|
|
d = {
|
|
'form_name': self.name,
|
|
'form_slug': self.url_name,
|
|
}
|
|
if not minimal:
|
|
from wcs.variables import LazyFormDef
|
|
d['form_objects'] = LazyFormDef(self).objects
|
|
if self.category:
|
|
d.update(self.category.get_substitution_variables(minimal=minimal))
|
|
d.update(self.get_variable_options())
|
|
return d
|
|
|
|
    def get_substitution_variables(self, minimal=False):
        """Return the substitution namespace exposing this form as 'form'.

        All values are resolved lazily through LazyFormDef.  The minimal
        parameter is accepted for signature compatibility with
        get_static_substitution_variables() but is not used here.
        """
        from .qommon.substitution import CompatibilityNamesDict
        from wcs.variables import LazyFormDef
        return CompatibilityNamesDict({'form': LazyFormDef(self)})
|
|
|
|
def get_detailed_evolution(self, formdata):
|
|
if not formdata.evolution:
|
|
return None
|
|
|
|
details = []
|
|
evo = formdata.evolution[-1]
|
|
if evo.who:
|
|
evo_who = None
|
|
if evo.who == '_submitter':
|
|
if formdata.user_id:
|
|
evo_who = formdata.user_id
|
|
else:
|
|
evo_who = evo.who
|
|
if evo_who:
|
|
user_who = get_publisher().user_class.get(evo_who, ignore_errors=True)
|
|
if user_who:
|
|
details.append(_('User name'))
|
|
details.append(' %s' % user_who.name)
|
|
if evo.status:
|
|
details.append(_('Status'))
|
|
details.append(' %s' % formdata.get_status_label())
|
|
if evo.comment:
|
|
details.append('\n%s\n' % evo.comment)
|
|
return '\n\n----\n\n' + '\n'.join(details)
|
|
|
|
def is_of_concern_for_role_id(self, role_id):
|
|
if not self.workflow_roles:
|
|
return False
|
|
return (role_id in self.workflow_roles.values())
|
|
|
|
def is_of_concern_for_user(self, user, formdata=None):
|
|
if not self.workflow_roles:
|
|
self.workflow_roles = {}
|
|
|
|
# if the formdef itself has some function attributed to the user, grant
|
|
# access.
|
|
for role_id in self.workflow_roles.values():
|
|
if role_id in user.get_roles():
|
|
return True
|
|
|
|
# if there was some redispatching of function, values will be different
|
|
# in formdata, check them.
|
|
if formdata and formdata.workflow_roles:
|
|
for role_id in formdata.workflow_roles.values():
|
|
if role_id in user.get_roles():
|
|
return True
|
|
|
|
# if no formdata was given, lookup if there are some existing formdata
|
|
# where the user has access.
|
|
if not formdata:
|
|
data_class = self.data_class()
|
|
for role_id in user.get_roles():
|
|
if data_class.get_ids_with_indexed_value('workflow_roles', role_id):
|
|
return True
|
|
|
|
return False
|
|
|
|
def is_user_allowed_read(self, user, formdata=None):
|
|
if not user:
|
|
if formdata and get_session() and \
|
|
get_session().is_anonymous_submitter(formdata):
|
|
return True
|
|
return False
|
|
if user.is_admin:
|
|
return True
|
|
|
|
user_roles = set(user.get_roles())
|
|
user_roles.add(logged_users_role().id)
|
|
|
|
def ensure_role_are_strings(roles):
|
|
# makes sure all roles are defined as strings, as different origins
|
|
# (formdef, user, workflow status...) may define them differently.
|
|
return set([str(x) for x in roles if x])
|
|
|
|
user_roles = ensure_role_are_strings(user_roles)
|
|
|
|
if formdata and formdata.is_submitter(user):
|
|
return True
|
|
if self.is_of_concern_for_user(user):
|
|
if not formdata:
|
|
return True
|
|
|
|
if formdata:
|
|
# current status
|
|
concerned_roles = ensure_role_are_strings(formdata.get_concerned_roles())
|
|
if '_submitter' in concerned_roles and formdata.is_submitter(user):
|
|
return True
|
|
if user_roles.intersection(concerned_roles):
|
|
return True
|
|
|
|
return False
|
|
|
|
def is_user_allowed_read_status_and_history(self, user, formdata=None):
|
|
if user and user.is_admin:
|
|
return True
|
|
|
|
if user:
|
|
user_roles = set(user.get_roles())
|
|
else:
|
|
user_roles = set([])
|
|
|
|
if not self.workflow_roles:
|
|
self.workflow_roles = {}
|
|
form_roles = [x for x in self.workflow_roles.values() if x]
|
|
if formdata and formdata.workflow_roles:
|
|
form_roles.extend([x for x in formdata.workflow_roles.values() if x])
|
|
return self.is_user_allowed_read(user, formdata=formdata)
|
|
|
|
def is_disabled(self):
|
|
if self.disabled:
|
|
return True
|
|
if self.publication_date:
|
|
try:
|
|
publication_datetime = get_as_datetime(self.publication_date)
|
|
except ValueError:
|
|
return False
|
|
if publication_datetime > datetime.datetime.now():
|
|
return True
|
|
if self.expiration_date:
|
|
try:
|
|
expiration_datetime = get_as_datetime(self.expiration_date)
|
|
except ValueError:
|
|
return False
|
|
if expiration_datetime < datetime.datetime.now():
|
|
return True
|
|
return False
|
|
|
|
@classmethod
|
|
def update_filetype(cls, filetype_id, previous_filetype, new_filetype):
|
|
# look for file fields in all formdefs, to update them with the
|
|
# new mimetypes.
|
|
if previous_filetype == new_filetype:
|
|
return
|
|
for formdef in cls.select():
|
|
changed = False
|
|
for field in formdef.fields:
|
|
if not hasattr(field, 'document_type'):
|
|
continue
|
|
if not field.document_type:
|
|
continue
|
|
if field.document_type['id'] == filetype_id:
|
|
previous_filetype = field.document_type.copy()
|
|
del previous_filetype['id']
|
|
if previous_filetype == new_filetype:
|
|
continue
|
|
field.document_type = new_filetype.copy()
|
|
field.document_type['id'] = filetype_id
|
|
changed = True
|
|
if changed:
|
|
formdef.store()
|
|
|
|
    class _EmptyClass(object):  # helper for instance creation without calling __init__
        # used by __copy__()/__deepcopy__(): a bare instance of this class
        # is created, then its __class__ is swapped to the real class.
        pass
|
|
|
|
def __copy__(self, memo=None, deepcopy=False):
|
|
formdef_copy = self._EmptyClass()
|
|
formdef_copy.__class__ = self.__class__
|
|
if deepcopy:
|
|
formdef_copy.__dict__ = copy.deepcopy(self.__dict__, memo=memo)
|
|
else:
|
|
formdef_copy.__dict__ = copy.copy(self.__dict__)
|
|
return formdef_copy
|
|
|
|
    def __deepcopy__(self, memo=None):
        # delegate to __copy__() with deepcopy=True so both copy flavours
        # share the same __init__-bypassing implementation
        return self.__copy__(memo=memo, deepcopy=True)
|
|
|
|
# don't pickle computed attributes
|
|
def __getstate__(self):
|
|
odict = copy.copy(self.__dict__)
|
|
if '_workflow' in odict:
|
|
del odict['_workflow']
|
|
if '_start_page' in odict:
|
|
del odict['_start_page']
|
|
if self.lightweight and 'fields' in odict:
|
|
# will be stored independently
|
|
del odict['fields']
|
|
return odict
|
|
|
|
def __setstate__(self, dict):
|
|
self.__dict__ = dict
|
|
self._workflow = None
|
|
self._start_page = None
|
|
|
|
@classmethod
|
|
def storage_load(cls, fd, **kwargs):
|
|
o = super(FormDef, cls).storage_load(fd)
|
|
if kwargs.get('lightweight'):
|
|
o.fields = Ellipsis
|
|
return o
|
|
if cls.lightweight:
|
|
try:
|
|
o.fields = pickle.load(fd, **PICKLE_KWARGS)
|
|
except EOFError:
|
|
pass # old format
|
|
return o
|
|
|
|
@classmethod
|
|
def storage_dumps(cls, object):
|
|
if getattr(object, 'fields', None) is Ellipsis:
|
|
raise RuntimeError('storing a lightweight object is not allowed')
|
|
# use two separate pickle chunks to store the formdef, the first field
|
|
# is everything but fields (excluded via __getstate__) while the second
|
|
# chunk contains the fields.
|
|
return pickle.dumps(object, protocol=2) + pickle.dumps(object.fields, protocol=2)
|
|
|
|
|
|
from .qommon.admin.emails import EmailsDirectory

# default email templates used by workflow notifications; each register()
# call declares an identifier, a label, and Django-template subject/body.
EmailsDirectory.register('new_user', N_('Notification of creation to user'),
        enabled=False,
        category=N_('Workflow'),
        default_subject=N_('New form ({{ form_name }})'),
        default_body=N_('''\
Hello,

This mail is a reminder about the form you just submitted.
{% if form_user %}
You can consult it with this link: {{ form_url }}
{% endif %}

{% if form_details %}
For reference, here are the details:

{{ form_details }}
{% endif %}
'''))

EmailsDirectory.register('change_user', N_('Notification of change to user'),
        category=N_('Workflow'),
        default_subject=N_('Form status change ({{ form_name }})'),
        default_body=N_('''\
Hello,

{% if form_status_changed %}
Status of the form you submitted just changed (from "{{ form_previous_status }}" to "{{ form_status }}").
{% endif %}

{% if form_user %}
You can consult it with this link: {{ form_url }}
{% endif %}

{% if form_comment %}New comment: {{ form_comment }}{% endif %}

{% if form_evolution %}
{{ form_evolution }}
{% endif %}
'''))


EmailsDirectory.register('new_receiver', N_('Notification of creation to receiver'),
        enabled=False,
        category=N_('Workflow'),
        default_subject=N_('New form ({{ form_name }})'),
        default_body=N_('''\
Hello,

A new form has been submitted, you can see it with this link:
{{ form_url_backoffice }}

{% if form_details %}
For reference, here are the details:

{{ form_details }}
{% endif %}
'''))


EmailsDirectory.register('change_receiver', N_('Notification of change to receiver'),
        category=N_('Workflow'),
        default_subject=N_('Form status change ({{ form_name }})'),
        default_body=N_('''\
Hello,

A form just changed, you can consult it with this link:
{{ form_url_backoffice }}

{% if form_status_changed %}
Status of the form just changed (from "{{ form_previous_status }}" to "{{ form_status }}").
{% endif %}

{% if form_comment %}New comment: {{ form_comment }}{% endif %}

{% if form_evolution %}
{{ form_evolution }}
{% endif %}
'''))

# declare the form_name substitution variable in the admin documentation
Substitutions.register('form_name', category=N_('Form'), comment=N_('Form Name'))
|
|
|
|
|
|
def clean_drafts(publisher):
    """Cron job: delete draft form/card data older than 100 days."""
    import wcs.qommon.storage as st
    from wcs.carddef import CardDef
    cutoff = datetime.date.today() - datetime.timedelta(days=100)
    for formdef in FormDef.select() + CardDef.select():
        stale_drafts = formdef.data_class().select(
                [st.Equal('status', 'draft'),
                 st.Less('receipt_time', cutoff.timetuple())])
        for draft in stale_drafts:
            draft.remove_self()
|
|
|
|
|
|
def clean_unused_files(publisher):
    """Cron job: move or delete files no longer referenced by any data.

    Gathers all filenames found in uploads/ and attachments/, subtracts
    the files still referenced by formdata, workflow options, evolution
    attachments and user profiles, then handles the remainder according
    to the 'unused-files-behaviour' site option ('move' or 'remove').
    """
    # NOTE(review): AttachmentEvolutionPart appears unused here (matching
    # below is done on the class *name*); possibly imported so the class
    # is loadable when unpickling evolutions -- confirm before removing.
    from wcs.wf.attachment import AttachmentEvolutionPart

    unused_files_behaviour = publisher.get_site_option('unused-files-behaviour')
    if unused_files_behaviour not in ('move', 'remove'):
        # feature disabled, do nothing
        return

    # every file currently present on disk
    known_filenames = set()
    known_filenames.update([x for x in glob.glob(os.path.join(publisher.app_dir, 'uploads/*'))])
    known_filenames.update([x for x in glob.glob(os.path.join(publisher.app_dir, 'attachments/*/*'))])

    def is_upload(obj):
        # we can't use isinstance() because obj can be a
        # wcs.qommon.form.PicklableUpload or a qommon.form.PicklableUpload
        return obj.__class__.__name__ == 'PicklableUpload'

    def is_attachment(obj):
        # same name-based check as is_upload(), for evolution attachments
        return obj.__class__.__name__ == 'AttachmentEvolutionPart'

    def accumulate_filenames():
        # yield every filename still referenced somewhere: workflow
        # options, formdata fields and workflow data, evolution
        # attachments, and user profile fields
        from wcs.carddef import CardDef
        for formdef in FormDef.select(ignore_migration=True) + CardDef.select(ignore_migration=True):
            for option_data in (formdef.workflow_options or {}).values():
                if is_upload(option_data):
                    yield option_data.get_filename()
            for formdata in formdef.data_class().select(ignore_errors=True):
                for field_data in itertools.chain(
                        (formdata.data or {}).values(),
                        (formdata.workflow_data or {}).values()):
                    if is_upload(field_data):
                        yield field_data.get_filename()
                for evolution in (formdata.evolution or []):
                    for part in (evolution.parts or []):
                        if is_attachment(part):
                            yield part.filename
        for user in publisher.user_class.select():
            for field_data in (user.form_data or {}).values():
                if is_upload(field_data):
                    yield field_data.get_filename()

    # normalize referenced filenames to absolute paths
    used_filenames = set()
    for filename in accumulate_filenames():
        if not os.path.isabs(filename):
            filename = os.path.join(publisher.app_dir, filename)
        used_filenames.add(filename)

    unused_filenames = known_filenames - used_filenames
    for filename in unused_filenames:
        try:
            if unused_files_behaviour == 'move':
                # relocate under unused-files/, keeping the relative path
                new_filename = os.path.join(publisher.app_dir, 'unused-files', filename[len(publisher.app_dir)+1:])
                if os.path.exists(new_filename):
                    # already archived once, just drop the duplicate
                    os.unlink(filename)
                else:
                    new_dirname = os.path.dirname(new_filename)
                    if not os.path.exists(new_dirname):
                        os.makedirs(new_dirname)
                    os.rename(filename, new_filename)
            else:
                os.unlink(filename)
        except OSError:
            # best-effort cleanup: ignore filesystem errors on single files
            pass
|
|
|
|
|
|
def get_formdefs_of_all_kinds():
    """Return formdef-like objects from every source.

    Includes user profile fields, form and card definitions, and the
    form definitions embedded in workflows (status form actions,
    workflow options, backoffice fields).
    """
    from wcs.carddef import CardDef
    from wcs.wf.form import FormWorkflowStatusItem
    from wcs.admin.settings import UserFieldsFormDef
    from wcs.workflows import Workflow

    select_kwargs = {
        'ignore_errors': True,
        'ignore_migration': True,
    }
    formdefs = [UserFieldsFormDef()]
    formdefs.extend(FormDef.select(**select_kwargs))
    formdefs.extend(CardDef.select(**select_kwargs))
    for workflow in Workflow.select(**select_kwargs):
        # forms attached to workflow status actions
        for status in workflow.possible_status:
            formdefs.extend(
                    item.formdef for item in status.items
                    if isinstance(item, FormWorkflowStatusItem) and item.formdef)
        # workflow options and backoffice fields definitions
        if workflow.variables_formdef:
            formdefs.append(workflow.variables_formdef)
        if workflow.backoffice_fields_formdef:
            formdefs.append(workflow.backoffice_fields_formdef)
    return formdefs
|
|
|
|
|
|
if get_publisher_class():
    # register periodic maintenance jobs (skipped when no publisher class
    # is available, e.g. at documentation build time)
    # on the 2nd of each month, remove obsolete drafts
    get_publisher_class().register_cronjob(CronJob(clean_drafts,
            name='clean_drafts',
            days=[2], hours=[0], minutes=[0]))
    # once a day (at 02:00), handle files that are no longer referenced
    get_publisher_class().register_cronjob(CronJob(clean_unused_files,
            name='clean_unused_files',
            hours=[2], minutes=[0]))