wcs/wcs/formdef.py

1090 lines
39 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import new
import sys
import json
import xml.etree.ElementTree as ET
import datetime
from qommon.storage import StorableObject
from quixote import get_request, get_publisher
from qommon.form import *
from qommon.misc import simplify, get_as_datetime
from qommon import get_cfg
from qommon.substitution import Substitutions
from formdata import FormData
from roles import logged_users_role
from categories import Category
import fields
class FormdefImportError(Exception):
    """Raised when an imported form definition is rejected."""
class FormField(object):
    """Generic field object found in pickles written by old versions."""

    def __setstate__(self, dict):
        # only used to unpickle form fields from older (<200603) versions:
        # the pickled state is passed as keyword arguments to the field
        # class selected by its 'type' entry, and the result is kept in
        # real_field.
        field_class = fields.get_field_class_by_type(dict['type'])
        self.real_field = field_class(**dict)
def lax_int(s):
    """Convert *s* to an integer, giving -1 when it cannot be converted."""
    try:
        number = int(s)
    except (ValueError, TypeError):
        number = -1
    return number
# Definition of a form: its fields, the workflow and category it is attached
# to, access roles and publication settings.  The attributes below are the
# class level defaults.
class FormDef(StorableObject):
# storage settings read by StorableObject (exact meaning is defined there);
# get_by_urlname() looks objects up on the 'url_name' index.
_names = 'formdefs'
_indexes = ['url_name']
_hashed_indexes = ['backoffice_submission_roles']
name = None
description = None
keywords = None
# name used in URLs; derived from the form name by get_new_url_name()
url_name = None
table_name = None # for SQL only
# list of field objects
fields = None
category_id = None
workflow_id = None
# dictionary of workflow option values, see get_workflow_with_options() and
# set_variable_options()
workflow_options = None
# dictionary mapping a role key (for example '_receiver') to a role id
workflow_roles = None
roles = None
backoffice_submission_roles = None
discussion = False
confirmation = True
detailed_emails = True
disabled = False
only_allow_one = False
enable_tracking_codes = False
disabled_redirection = None
always_advertise = False
# both dates are checked by is_disabled()
publication_date = None
expiration_date = None
has_captcha = False
private_status_and_history = False
# set by store(): a time.struct_time and the id of the requesting user
last_modification_time = None
last_modification_user_id = None
# highest field id handed out so far, see get_new_field_id()
max_field_id = None
# declarations for serialization
# (attributes written and read as text, respectively as booleans, by the
# XML and JSON export/import methods)
TEXT_ATTRIBUTES = ['name', 'url_name', 'description', 'keywords',
'publication_date', 'expiration_date',
'disabled_redirection',]
BOOLEAN_ATTRIBUTES = ['discussion', 'detailed_emails', 'disabled',
'only_allow_one', 'enable_tracking_codes',
'always_advertise', 'private_status_and_history',
'has_captcha']
def migrate(self):
    """Upgrade a form definition loaded from an older storage layout.

    Every step converts one legacy layout in place; the object is stored
    again when at least one step flagged a change.
    """
    changed = False
    state = self.__dict__

    # attributes that were renamed
    if 'receiver' in state:
        self.receiver_id = state['receiver']
        del state['receiver']
        changed = True

    if 'category' in state:
        self.category_id = state['category']
        del state['category']
        changed = True

    if not self.url_name:
        # a non numeric id is reused as url name, otherwise a name is
        # computed from the form name
        try:
            int(self.id)
        except ValueError:
            self.url_name = self.id
        else:
            self.url_name = self.get_new_url_name()
        changed = True

    if self.fields and type(self.fields[0]) is dict:
        # fields were plain dictionaries with a 'name' key, and submitted
        # data was keyed by field label
        for field_dict in self.fields:
            if 'name' in field_dict:
                field_dict['label'] = field_dict['name']
                del field_dict['name']
        self.fields = [FormField(**field_dict) for field_dict in self.fields]
        for position, field in enumerate(self.fields):
            field.id = str(position)
        for formdata in self.data_class().select():
            for field in self.fields:
                if field.label in formdata.data:
                    formdata.data[field.id] = formdata.data[field.label]
                    del formdata.data[field.label]
            formdata.store()
        changed = True

    if self.fields and isinstance(self.fields[0], FormField):
        # migration from generic FormField to specific Field classes
        # (200603)
        self.fields = [field.real_field for field in self.fields]

    if 'public' in state:
        del state['public']
        changed = True

    if 'receiver_id' in state:
        # migration from a simple receiver role to workflow roles
        if not self.workflow_roles:
            self.workflow_roles = {}
        self.workflow_roles['_receiver'] = state['receiver_id']
        del state['receiver_id']
        changed = True

    if not self.table_name and get_publisher().has_site_option('postgresql'):
        import sql
        self.table_name = sql.get_formdef_table_name(self)
        changed = True

    if self.max_field_id is None and self.fields:
        self.max_field_id = max([lax_int(field.id) for field in self.fields])
        changed = True

    # identifiers stored as integers are converted to strings
    if type(self.category_id) is int:
        self.category_id = str(self.category_id)
        changed = True

    if type(self.workflow_id) is int:
        self.workflow_id = str(self.workflow_id)
        changed = True

    if self.roles and any(type(role) is int for role in self.roles):
        self.roles = [str(role) for role in self.roles]
        changed = True

    if type(self.last_modification_user_id) is int:
        self.last_modification_user_id = str(self.last_modification_user_id)
        changed = True

    if self.workflow_roles and any(
            type(role_id) is int for role_id in self.workflow_roles.values()):
        self.workflow_roles = dict(
                [(role_key, str(role_id))
                 for role_key, role_id in self.workflow_roles.items()])
        changed = True

    if changed:
        self.store()
def data_class(self, mode=None):
    """Return the class holding the data submitted to this form.

    The class is created on demand and set, under the title-cased url
    name, as an attribute of the 'formdef' and 'wcs.formdef' modules; a
    class found there is reused when it refers to this very formdef.
    *mode* forces the storage ('files' or 'sql'), otherwise the publisher
    configuration decides.
    """
    class_name = self.url_name.title()
    # NOTE(review): the generated name shares the module namespace with
    # everything defined or imported in this module.
    if hasattr(sys.modules['formdef'], class_name):
        existing_class = getattr(sys.modules['formdef'], class_name)
        # only use existing data class if it has a reference to this actual
        # formdef
        if existing_class._formdef is self:
            return existing_class

    use_sql = (get_publisher().is_using_postgresql() and
               not mode == 'files') or mode == 'sql'
    if use_sql:
        import sql
        table_name = sql.get_formdef_table_name(self)
        attributes = {'_formdef': self, '_table_name': table_name}
        cls = new.classobj(class_name, (sql.SqlFormData,), attributes)
        actions = sql.do_formdef_tables(self)
    else:
        attributes = {'_names': 'form-%s' % self.url_name, '_formdef': self}
        cls = new.classobj(class_name, (FormData,), attributes)
        actions = []

    for module_name in ('formdef', 'wcs.formdef'):
        setattr(sys.modules[module_name], class_name, cls)

    # actions are names of methods to call on the newly created class
    for action in actions or []:
        getattr(cls, action)()
    return cls
def get_new_field_id(self):
    """Allocate the next field identifier and return it as a string.

    The counter is kept in max_field_id; the first identifier is '1'.
    """
    previous = self.max_field_id
    self.max_field_id = 1 if previous is None else previous + 1
    return str(self.max_field_id)
def get_new_url_name(self):
    """Return a url name computed from the form name.

    When the simplified name is already used by another form definition,
    a numeric suffix ('-1', '-2', ...) is appended until the name is free
    or belongs to this form.
    """
    base_url_name = simplify(self.name)
    candidate = base_url_name
    suffix_no = 0
    while True:
        try:
            other = self.get_by_urlname(candidate, ignore_migration=True)
        except KeyError:
            # no form definition uses this name
            return candidate
        if other.id == self.id:
            # the name is the one of this form
            return candidate
        suffix_no += 1
        candidate = '%s-%s' % (base_url_name, suffix_no)
def store(self):
    """Save the form definition and return the StorableObject.store() result.

    The url name is (re)computed, the modification time and user are
    recorded, and the SQL tables and views are updated when PostgreSQL is
    in use.
    """
    candidate_url_name = self.get_new_url_name()
    if not self.url_name:
        self.url_name = candidate_url_name
    elif candidate_url_name != self.url_name:
        # title changed, url will be changed only if the formdef is
        # currently being imported (self.id is None) or if there are not
        # yet any submitted forms
        data_class = self.data_class()
        if self.id is None or data_class().count() == 0:
            self.url_name = candidate_url_name

    self.last_modification_time = time.localtime()
    request = get_request()
    if request and request.user:
        self.last_modification_user_id = str(request.user.id)
    else:
        self.last_modification_user_id = None

    t = StorableObject.store(self)
    if get_publisher().is_using_postgresql():
        import sql
        sql.do_formdef_tables(self, rebuild_views=True,
                rebuild_global_views=True)
    return t
def update_endpoints(self):
    """Run sql.do_formdef_tables() with view rebuilding, if PostgreSQL is used."""
    if not get_publisher().is_using_postgresql():
        return
    import sql
    sql.do_formdef_tables(self, rebuild_views=True,
            rebuild_global_views=True)
def get_category(self):
    """Return the Category referenced by category_id, or None."""
    if not self.category_id:
        return None
    try:
        return Category.get(self.category_id)
    except KeyError:
        # Category.get() did not find the referenced category
        return None

def set_category(self, category):
    """Record the id of *category*; a false value clears the reference."""
    if category:
        self.category_id = category.id
        return
    if self.category_id:
        self.category_id = None

category = property(get_category, set_category)
# cache for the workflow returned by get_workflow()
_workflow = None

def get_workflow(self):
    """Return the workflow attached to this form.

    The workflow found for workflow_id is customised with the workflow
    options of the form and cached in _workflow.  Without workflow_id the
    default workflow is returned; when Workflow.get() raises KeyError the
    "unknown" workflow is returned.  Those two fallbacks are not cached.
    """
    if self._workflow:
        return self._workflow
    from wcs.workflows import Workflow
    if self.workflow_id:
        try:
            workflow = Workflow.get(self.workflow_id)
        except KeyError:
            return Workflow.get_unknown_workflow()
        self._workflow = self.get_workflow_with_options(workflow)
        return self._workflow
    else:
        return Workflow.get_default_workflow()

def get_workflow_with_options(self, workflow):
    """Apply the workflow options of the form to *workflow* and return it.

    Options are keyed '<status id>*<item id>*<parameter>'; non-empty values
    are set as attributes of the matching workflow item (the workflow
    object is modified in place).
    """
    # this needs to be kept in sync with admin/forms.ptl,
    # FormDefPage::workflow
    if not self.workflow_options:
        return workflow
    for status in workflow.possible_status:
        for item in status.items:
            prefix = '%s*%s*' % (status.id, item.id)
            for parameter in item.get_parameters():
                value = self.workflow_options.get(prefix + parameter)
                if value:
                    setattr(item, parameter, value)
    return workflow

def set_workflow(self, workflow):
    """Record the id of *workflow*; a false value clears the reference."""
    if workflow:
        self.workflow_id = workflow.id
    elif self.workflow_id:
        self.workflow_id = None
    # forget the cached workflow, otherwise get_workflow() would go on
    # returning the workflow that was attached before this call.
    self._workflow = None

workflow = property(get_workflow, set_workflow)
def get_variable_options(self):
    """Return the workflow options as substitution variables.

    Each variable field of the workflow that has a varname gives a
    'form_option_<varname>' entry, plus a '..._structured' entry when the
    field stores structured values and one is recorded in the options.
    """
    variables = {}
    variables_formdef = self.workflow.variables_formdef
    if not variables_formdef or not self.workflow_options:
        return variables
    for field in variables_formdef.fields:
        varname = field.varname
        if not varname:
            continue
        option_name = 'form_option_' + varname
        variables[option_name] = self.workflow_options.get(varname)
        structured_key = '%s_structured' % varname
        if field.store_structured_value and structured_key in self.workflow_options:
            variables[option_name + '_structured'] = self.workflow_options.get(
                    structured_key)
    return variables
def get_variable_options_for_form(self):
    """Return the workflow option values keyed by variable field id.

    Only variable fields with a varname are included; the result is empty
    when the workflow has no variables or the form has no options.
    """
    variables_formdef = self.workflow.variables_formdef
    if not variables_formdef or not self.workflow_options:
        return {}
    return dict([(str(field.id), self.workflow_options.get(field.varname))
                 for field in variables_formdef.fields
                 if field.varname])
def set_variable_options(self, form):
    """Update workflow_options with the values submitted in *form*.

    Values are read through the variables formdef of the workflow and
    stored under the varname of each field, along with the '_display' and
    '_structured' companions for fields that store them.
    """
    variables_formdef = self.workflow.variables_formdef
    data = variables_formdef.get_data(form)
    variables = {}
    for field in variables_formdef.fields:
        varname = field.varname
        if not varname:
            continue
        variables[varname] = data.get(field.id)
        if field.store_display_value:
            variables[varname + '_display'] = data.get(field.id + '_display')
        if field.store_structured_value:
            variables[varname + '_structured'] = data.get(field.id + '_structured')
    if not self.workflow_options:
        self.workflow_options = {}
    self.workflow_options.update(variables)
@classmethod
def get_by_urlname(cls, url_name, ignore_migration=False):
    """Return the form definition whose url name is *url_name*.

    The lookup is delegated to get_on_index() on the 'url_name' index.
    """
    return cls.get_on_index(url_name, 'url_name',
            ignore_migration=ignore_migration)
def get_url(self, backoffice=False, preview=False):
    """Return the URL of the form, ending with a slash.

    *backoffice* selects the '/management' section of the backoffice and
    takes precedence over *preview*, which selects the '/preview' section
    of the frontoffice.
    """
    publisher = get_publisher()
    if backoffice:
        base_url = publisher.get_backoffice_url() + '/management'
    elif preview:
        base_url = publisher.get_frontoffice_url() + '/preview'
    else:
        base_url = publisher.get_frontoffice_url()
    return '%s/%s/' % (base_url, self.url_name)
def create_form(self, page_no = 0, displayed_fields = None):
    """Return a new multipart Form holding the fields of page *page_no*.

    *displayed_fields* is passed on to add_fields_to_form().
    """
    # tokens are always disabled here
    # (had: , use_tokens = not self.confirmation)
    form = Form(enctype="multipart/form-data", use_tokens=False)
    self.add_fields_to_form(form, page_no=page_no,
            displayed_fields=displayed_fields)
    return form
def add_fields_to_form(self, form, page_no = 0, displayed_fields = None, form_data = None):
    """Add the fields of page *page_no* to *form*.

    Pages are delimited by 'page' fields; a page field in first position
    does not start a new page.  When *displayed_fields* is a list, the
    added fields are appended to it.  Initial values are taken from
    *form_data* when given.
    """
    collect_displayed = type(displayed_fields) is list
    first_field = self.fields[0] if self.fields else None
    current_page = 0
    for field in self.fields:
        if field.type == 'page':
            if field is not first_field:
                current_page += 1
                if current_page > page_no:
                    # past the requested page, nothing left to add
                    break
            continue
        if current_page != page_no:
            continue
        if collect_displayed:
            displayed_fields.append(field)
        value = form_data.get(field.id) if form_data else None
        field.add_to_form(form, value)
def get_page(self, page_no):
    """Return the 'page' field at index *page_no* among the page fields."""
    pages = [field for field in self.fields if field.type == 'page']
    return pages[page_no]
def create_view_form(self, dict = None, use_tokens = True, visible = True):
    """Return a Form displaying the values of *dict* for every field.

    *dict* maps field ids to values (missing values are displayed as '').
    Pages that are not visible for these values, or that are not followed
    by any widget field, are skipped along with their fields.  Each
    displayed page is wrapped in a <div class="page"> block.  When
    *visible* is false the form gets a 'display: none;' style.
    """
    if dict is None:
        # a new mapping for each call, not a default shared between calls
        dict = {}
    if visible:
        form = Form(enctype = 'multipart/form-data', use_tokens = use_tokens)
    else:
        form = Form(enctype = 'multipart/form-data',
                    use_tokens = use_tokens,
                    style = 'display: none;')
    on_disabled_page = False
    on_page = False
    for position, field in enumerate(self.fields):
        if field.type == 'page':
            on_disabled_page = False
            if not field.is_visible(dict, self):
                on_disabled_page = True
            # a page is also skipped when no widget field comes before the
            # next page; the position given by enumerate() is used rather
            # than list.index(), which returns the first equal field
            form_field = False
            for f in self.fields[position + 1:]:
                if f.key == 'page':
                    break
                if isinstance(f, fields.WidgetField):
                    form_field = True
                    break
            if form_field is False:
                on_disabled_page = True
        if on_disabled_page:
            continue
        if field.type == 'page':
            if on_page:
                form.widgets.append(HtmlWidget(htmltext('</div>')))
            # the label is interpolated into the htmltext template, so
            # that it gets escaped
            form.widgets.append(HtmlWidget(
                htmltext('<div class="page"><h3>%s</h3>') % field.label))
            on_page = True
        value = dict.get(field.id, '')
        field.add_to_view_form(form, value)
    if on_page:
        form.widgets.append(HtmlWidget(htmltext('</div>')))
    return form
def get_data(self, form):
    """Return a dictionary of the values parsed from the widgets of *form*.

    Values are keyed by field id (widgets are named 'f<field id>').  For
    non-empty values the field may convert the value from a string and
    add '<id>_display' and '<id>_structured' companions.  Widgets with a
    cleanup hook are cleaned up afterwards.
    """
    d = {}
    for field in self.fields:
        widget = form.get_widget('f%s' % field.id)
        if not widget:
            continue
        key = field.id
        d[key] = widget.parse()
        if d.get(key) and field.convert_value_from_str:
            d[key] = field.convert_value_from_str(d[key])
        if d.get(key) and field.store_display_value:
            display_key = '%s_display' % key
            display_value = field.store_display_value(d, key)
            if display_value:
                d[display_key] = display_value
            else:
                d.pop(display_key, None)
        if d.get(key) and field.store_structured_value:
            structured_key = '%s_structured' % key
            structured_value = field.store_structured_value(d, key)
            if structured_value:
                d[structured_key] = structured_value
            else:
                d.pop(structured_key, None)
        if widget.cleanup:
            widget.cleanup()
    return d
def export_to_json(self, include_id=False, indent=None):
    """Return the form definition serialized as a JSON string.

    With *include_id* the identifiers of the form, and of the user who
    last modified it, are part of the export.  max_field_id is computed
    from the fields (and kept on the object) when it is not known yet.
    """
    charset = get_publisher().site_charset
    root = {'name': unicode(self.name, charset)}
    if include_id and self.id:
        root['id'] = str(self.id)

    category = self.category
    if category:
        root['category'] = unicode(category.name, charset)
        root['category_id'] = str(category.id)
    workflow = self.workflow
    if workflow:
        root['workflow'] = unicode(workflow.name, charset)
        root['workflow_id'] = str(workflow.id)

    if self.max_field_id is None and self.fields:
        self.max_field_id = max([lax_int(x.id) for x in self.fields])

    more_attributes = []
    if self.max_field_id:
        more_attributes.append('max_field_id')
    if self.last_modification_time:
        more_attributes.append('last_modification_time')
        if include_id:
            more_attributes.append('last_modification_user_id')

    for attribute in self.TEXT_ATTRIBUTES + self.BOOLEAN_ATTRIBUTES + more_attributes:
        if not hasattr(self, attribute):
            continue
        value = getattr(self, attribute)
        if type(value) is time.struct_time:
            # time structures are exported as ISO like strings
            value = time.strftime('%Y-%m-%dT%H:%M:%S', value)
        root[attribute] = value

    root['fields'] = [field.export_to_json(include_id=include_id)
                      for field in self.fields or []]
    if self.workflow_options:
        root['options'] = self.workflow_options
    return json.dumps(root, indent=indent)
@classmethod
def import_from_json(cls, fd, charset=None, include_id=False):
    """Build a form definition from the JSON document read from *fd*.

    Strings are encoded to *charset* (the site charset by default).  With
    *include_id* the identifiers found in the document are used as is,
    otherwise category and workflow are looked up by name.  ValueError is
    raised when a field has an unknown type.
    """
    if charset is None:
        charset = get_publisher().site_charset
    formdef = cls()

    def unicode2str(v):
        # recursively encode unicode objects of lists and dictionaries
        if isinstance(v, dict):
            return dict([(unicode2str(key), unicode2str(item))
                         for key, item in v.items()])
        if isinstance(v, list):
            return [unicode2str(item) for item in v]
        if isinstance(v, unicode):
            return v.encode(charset)
        return v

    # we have to make sure all strings are str object, not unicode.
    value = unicode2str(json.load(fd))

    if include_id and 'id' in value:
        formdef.id = value.get('id')

    if include_id and 'category_id' in value:
        formdef.category_id = value.get('category_id')
    elif 'category' in value:
        category_name = value.get('category')
        for category in Category.select():
            if category.name == category_name:
                formdef.category_id = category.id
                break

    if include_id and 'workflow_id' in value:
        formdef.workflow_id = value.get('workflow_id')
    elif 'workflow' in value:
        workflow_name = value.get('workflow')
        from wcs.workflows import Workflow
        for workflow in Workflow.select():
            if workflow.name == workflow_name:
                formdef.workflow_id = workflow.id
                break

    more_attributes = ['max_field_id', 'last_modification_time',
                       'last_modification_user_id']
    for attribute in cls.TEXT_ATTRIBUTES + cls.BOOLEAN_ATTRIBUTES + more_attributes:
        if attribute in value:
            setattr(formdef, attribute, value.get(attribute))

    # fixup last_modification_time to the proper type
    if formdef.last_modification_time:
        formdef.last_modification_time = time.strptime(
                formdef.last_modification_time, '%Y-%m-%dT%H:%M:%S')

    formdef.fields = []
    for position, field in enumerate(value.get('fields', [])):
        try:
            field_o = fields.get_field_class_by_type(field.get('type'))()
        except KeyError:
            raise ValueError()
        field_o.init_with_json(field, include_id=include_id)
        if not field_o.id:
            # this assumes all fields will have id, or none of them
            field_o.id = str(position)
        formdef.fields.append(field_o)

    if formdef.fields and not formdef.max_field_id:
        formdef.max_field_id = max([lax_int(x.id) for x in formdef.fields])+1

    if value.get('options'):
        formdef.workflow_options = value.get('options')
    return formdef
def export_to_xml(self, include_id=False):
    """Return the form definition as an ElementTree 'formdef' element.

    With *include_id* the identifiers of the form, its category, its
    workflow and the last modifying user are exported as attributes.
    max_field_id is computed from the fields (and kept on the object)
    when it is not known yet.
    """
    charset = get_publisher().site_charset
    root = ET.Element('formdef')
    if include_id and self.id:
        root.attrib['id'] = str(self.id)

    # empty text attributes are left out
    for text_attribute in list(self.TEXT_ATTRIBUTES):
        text = getattr(self, text_attribute, None)
        if not text:
            continue
        ET.SubElement(root, text_attribute).text = unicode(text, charset)

    for boolean_attribute in self.BOOLEAN_ATTRIBUTES:
        if not hasattr(self, boolean_attribute):
            continue
        flag = 'true' if getattr(self, boolean_attribute) else 'false'
        ET.SubElement(root, boolean_attribute).text = flag

    category = self.category
    if category:
        elem = ET.SubElement(root, 'category')
        elem.text = unicode(category.name, charset)
        if include_id:
            elem.attrib['category_id'] = str(category.id)

    workflow = self.workflow
    if workflow:
        elem = ET.SubElement(root, 'workflow')
        elem.text = unicode(workflow.name, charset)
        if include_id:
            elem.attrib['workflow_id'] = str(workflow.id)

    if self.max_field_id is None and self.fields:
        self.max_field_id = max([lax_int(x.id) for x in self.fields])
    if self.max_field_id:
        ET.SubElement(root, 'max_field_id').text = str(self.max_field_id)

    if self.last_modification_time:
        elem = ET.SubElement(root, 'last_modification')
        elem.text = time.strftime('%Y-%m-%d %H:%M:%S', self.last_modification_time)
        if include_id:
            elem.attrib['user_id'] = str(self.last_modification_user_id)

    fields_node = ET.SubElement(root, 'fields')
    for field in self.fields or []:
        fields_node.append(field.export_to_xml(charset=charset, include_id=include_id))

    options_node = ET.SubElement(root, 'options')
    for option in self.workflow_options or []:
        element = ET.SubElement(options_node, 'option')
        element.attrib['varname'] = option
        option_value = self.workflow_options.get(option)
        if isinstance(option_value, basestring):
            element.text = unicode(option_value, charset)
        # TODO: extend support to other types

    return root
@classmethod
def import_from_xml(cls, fd, charset=None, include_id=False):
    """Build a form definition from the XML document read from *fd*.

    ValueError is raised when the document cannot be parsed; the parsed
    tree is then handed over to import_from_xml_tree().
    """
    try:
        tree = ET.parse(fd)
    except Exception:
        # any failure to read or parse the document is reported as an
        # invalid value; KeyboardInterrupt and SystemExit are not caught.
        raise ValueError()
    return cls.import_from_xml_tree(tree, charset=charset,
            include_id=include_id)
@classmethod
def import_from_xml_tree(cls, tree, include_id=False, charset=None):
    """Build a form definition from a parsed XML tree.

    *tree* is an ElementTree or its root element.  Strings are encoded to
    *charset* (the site charset by default).  With *include_id* the
    identifiers found in the document are used, provided the category or
    workflow they refer to exists; otherwise category and workflow are
    looked up by name.

    FormdefImportError is raised when the name is missing or when the
    root element is not 'formdef'; ValueError is raised when a field has
    an unknown type.  Empty elements and a missing <fields> element are
    tolerated: the matching attributes keep their default values.
    """
    if charset is None:
        charset = get_publisher().site_charset
    formdef = cls()
    if tree.find('name') is None or not tree.find('name').text:
        raise FormdefImportError(N_('Missing name'))

    # if the tree we get is actually a ElementTree for real, we get its
    # root element and go on happily.
    if not ET.iselement(tree):
        tree = tree.getroot()

    if tree.tag != 'formdef':
        raise FormdefImportError(N_('Not a form'))

    if include_id and tree.attrib.get('id'):
        formdef.id = tree.attrib.get('id')

    for text_attribute in list(cls.TEXT_ATTRIBUTES):
        value = tree.find(text_attribute)
        if value is None or value.text is None:
            # absent or empty element, there is nothing to encode
            continue
        setattr(formdef, text_attribute, value.text.encode(charset))

    for boolean_attribute in cls.BOOLEAN_ATTRIBUTES:
        value = tree.find(boolean_attribute)
        if value is None:
            continue
        setattr(formdef, boolean_attribute, value.text == 'true')

    formdef.fields = []
    fields_node = tree.find('fields')
    if fields_node is None:
        # document without a <fields> element
        fields_node = []
    for i, field in enumerate(fields_node):
        try:
            field_o = fields.get_field_class_by_type(field.findtext('type'))()
        except KeyError:
            raise ValueError()
        field_o.init_with_xml(field, charset, include_id=include_id)
        if not field_o.id:
            # this assumes all fields will have id, or none of them
            field_o.id = str(i)
        formdef.fields.append(field_o)

    if formdef.fields:
        value = tree.find('max_field_id')
        if value is not None:
            formdef.max_field_id = int(value.text)
        else:
            formdef.max_field_id = max([lax_int(x.id) for x in formdef.fields])+1

    formdef.workflow_options = {}
    for option in tree.findall('options/option'):
        option_value = None
        if option.text:
            option_value = option.text.encode(charset)
        formdef.workflow_options[option.attrib.get('varname')] = option_value

    node = tree.find('last_modification')
    if node is not None:
        formdef.last_modification_time = time.strptime(node.text, '%Y-%m-%d %H:%M:%S')
        if include_id and node.attrib.get('user_id'):
            formdef.last_modification_user_id = node.attrib.get('user_id')

    category_node = tree.find('category')
    if category_node is not None:
        if include_id and category_node.attrib.get('category_id'):
            category_id = str(category_node.attrib.get('category_id'))
            if Category.has_key(category_id):
                formdef.category_id = category_id
        elif category_node.text:
            category = category_node.text.encode(charset)
            for c in Category.select():
                if c.name == category:
                    formdef.category_id = c.id
                    break

    workflow_node = tree.find('workflow')
    if workflow_node is not None:
        from wcs.workflows import Workflow
        if include_id and workflow_node.attrib.get('workflow_id'):
            workflow_id = workflow_node.attrib.get('workflow_id')
            if Workflow.has_key(workflow_id):
                formdef.workflow_id = workflow_id
        elif workflow_node.text:
            workflow = workflow_node.text.encode(charset)
            for w in Workflow.select():
                if w.name == workflow:
                    formdef.workflow_id = w.id
                    break

    return formdef
def get_detailed_email_form(self, formdata, url):
    """Return the submitted values of *formdata* as text for emails.

    The text starts with the name of the user, except for forms with a
    vote-anonymity field, then lists the label and value of every field
    that has a value.  *url* is not used.
    """
    details = []

    # this is custom code so it is possible to mark forms as anonyms, this
    # is done through the VoteAnonymity field, this is very specific but
    # isn't generalised yet into an useful extension mechanism, as it's not
    # clear at the moment what could be useful.
    display_username = not any(f.key == 'vote-anonymity' for f in self.fields)

    if display_username and formdata.user_id and formdata.user:
        details.append(_('User name:'))
        details.append(' %s' % formdata.user.name)
        details.append('')

    data = formdata.data
    for field in self.fields:
        # titles, comments and pages carry no value
        if isinstance(field, (fields.SubtitleField, fields.TitleField, fields.CommentField,
                fields.PageField)):
            continue
        if data.get(field.id) is None:
            continue
        # the display value is preferred when there is one
        value = data.get(field.id + '_display') or data.get(field.id)
        details.append(_('%s:') % field.label)
        if field.type in ('text', 'file'):
            # XXX: howto support preformatted text in a dl in docutils ?
            details.append((' %s' % value).replace('\n', '\n '))
        else:
            details.append('%s' % field.get_rst_view_value(value, indent=' '))
        details.append('')

    return '\n'.join(details)
def get_submitter_email(self, formdata):
    """Return the email address of the submitter of *formdata*, or None.

    The address is looked for on the user (email attribute or the profile
    field configured as 'field_email' in the users settings), then in the
    submitted field that is prefilled with the user email.
    """
    users_cfg = get_cfg('users', {})
    field_email = users_cfg.get('field_email') or 'email'

    user = formdata.user
    if user:
        if field_email == 'email' and user.email:
            return user.email
        if user.form_data and user.form_data.get(field_email):
            return user.form_data.get(field_email)
        # this shouldn't happen, but then data can get unsynced, so
        # even if there's some user custom form with an unfilled email
        # form we look at the straight email attribute, and use it if
        # it exists.
        if user.email:
            return user.email

    # if there is no user, or user has no email address, look
    # up in submitted form for one that would hold the user
    # email (the one set to be prefilled by user email)
    if not formdata.data:
        return None
    for field in formdata.formdef.fields:
        prefill = getattr(field, 'prefill', None)
        if not prefill:
            continue
        if prefill.get('type') == 'user' and prefill.get('value') == field_email:
            v = formdata.data.get(field.id)
            if v:
                return v
    return None
def get_substitution_variables(self, minimal=False):
    """Return the substitution variables provided by this form.

    They are the form name, the variables of its category, the workflow
    options and, unless *minimal* is set, a 'form_objects' helper giving
    access to counters.
    """
    variables = {'form_name': self.name}
    if not minimal:
        variables['form_objects'] = FormDefSubstVar(self)
    category = self.category
    if category:
        variables.update(category.get_substitution_variables(minimal=minimal))
    variables.update(self.get_variable_options())
    return variables
def get_detailed_evolution(self, formdata):
    """Return a text describing the last evolution of *formdata*.

    The text gives the name of the user behind the evolution, the status
    and the comment, when they are known.  None is returned when there is
    no evolution at all.
    """
    if not formdata.evolution:
        return None

    details = []
    evo = formdata.evolution[-1]
    if evo.who:
        # find out the id of the user behind the evolution
        if evo.who != '_submitter':
            evo_who = evo.who
        elif formdata.user_id:
            evo_who = formdata.user_id
        elif formdata.user_hash and get_request() and (
                get_request().user and formdata.is_submitter(get_request().user)):
            evo_who = get_request().user.id
        else:
            evo_who = None
        user_who = None
        if evo_who:
            user_who = get_publisher().user_class.get(evo_who, ignore_errors=True)
        if user_who:
            details.append(_('User name'))
            details.append(' %s' % user_who.name)
    if evo.status:
        details.append(_('Status'))
        details.append(' %s' % formdata.get_status_label())
    if evo.comment:
        details.append('\n%s\n' % evo.comment)
    return '\n\n----\n\n' + '\n'.join(details)
def is_of_concern_for_role_id(self, role_id):
    """Tell whether *role_id* is one of the workflow roles of the form."""
    workflow_roles = self.workflow_roles
    if workflow_roles:
        return (role_id in workflow_roles.values())
    return False
def is_of_concern_for_user(self, user, formdata=None):
    """Tell whether *user* holds a role concerned by this form.

    This is the case when one of the user roles is a workflow role of the
    form, or when the data class has entries indexed under 'workflow_roles'
    for one of the user roles.  *formdata* is not used.
    """
    if not self.workflow_roles:
        self.workflow_roles = {}
    user_roles = user.roles or []
    if any(role_id in user_roles for role_id in self.workflow_roles.values()):
        return True
    data_class = self.data_class()
    return any(data_class.get_ids_with_indexed_value('workflow_roles', role_id)
               for role_id in user_roles)
def is_user_allowed_read(self, user, formdata=None):
    """Tell whether *user* may read the form, or *formdata* when given.

    Without user, access is only granted to the anonymous submitter of
    *formdata*, according to the session.  Administrators, the submitter,
    and users holding a role concerned by the current status of
    *formdata* are allowed; without *formdata* the user must hold a role
    concerned by the form.
    """
    if not user:
        return bool(formdata and get_session() and
                    get_session().is_anonymous_submitter(formdata))
    if user.is_admin:
        return True

    def ensure_role_are_strings(roles):
        # makes sure all roles are defined as strings, as different origins
        # (formdef, user, workflow status...) may define them differently.
        return set([str(x) for x in roles if x])

    # user.roles may be None, and set(None) raises an exception
    user_roles = set(user.roles or [])
    user_roles.add(logged_users_role().id)
    user_roles = ensure_role_are_strings(user_roles)

    if formdata and formdata.is_submitter(user):
        return True
    if self.is_of_concern_for_user(user):
        if not formdata:
            return True

    if formdata:
        # current status
        concerned_roles = ensure_role_are_strings(formdata.get_concerned_roles())
        if '_submitter' in concerned_roles and formdata.is_submitter(user):
            return True
        if user_roles.intersection(concerned_roles):
            return True
    return False
def is_user_allowed_read_status_and_history(self, user, formdata=None):
    """Tell whether *user* may see the status and history of *formdata*.

    Administrators always may.  When the form keeps its status and
    history private, a logged-in user must hold one of the workflow roles
    of the form.  Otherwise the answer is the one of is_user_allowed_read().
    """
    if user and user.is_admin:
        return True

    if not self.workflow_roles:
        self.workflow_roles = {}

    if user and self.private_status_and_history:
        # role ids are the values of workflow_roles (the keys are role
        # names such as '_receiver'); ids are compared as strings as they
        # may come as strings or integers.
        user_roles = set([str(x) for x in (user.roles or []) if x])
        form_roles = set([str(x) for x in self.workflow_roles.values() if x])
        if not user_roles.intersection(form_roles):
            return False

    return self.is_user_allowed_read(user, formdata=formdata)
def is_disabled(self):
    """Tell whether the form is currently unavailable.

    A form is unavailable when it is flagged as disabled, when its
    publication date is in the future or when its expiration date is in
    the past.  A date that cannot be parsed ends the check with False.
    """
    if self.disabled:
        return True

    if self.publication_date:
        try:
            published_on = get_as_datetime(self.publication_date)
        except ValueError:
            return False
        if published_on > datetime.datetime.now():
            return True

    if self.expiration_date:
        try:
            expires_on = get_as_datetime(self.expiration_date)
        except ValueError:
            return False
        if expires_on < datetime.datetime.now():
            return True

    return False
@classmethod
def update_mimetypes(cls, previous_mimetypes, new_mimetypes):
    """Replace a set of mimetypes in the file fields of all formdefs.

    *previous_mimetypes* and *new_mimetypes* are sequences of mimetypes;
    file types are matched on their comma separated form.  Only the form
    definitions that were modified are stored.
    """
    # look for file fields in all formdefs, to update them with the
    # new mimetypes.
    if previous_mimetypes == new_mimetypes:
        return
    previous_mimetypes = ','.join(previous_mimetypes)
    new_mimetypes = ','.join(new_mimetypes)
    for formdef in cls.select():
        changed = False
        # fields is None on form definitions without any field
        for field in formdef.fields or []:
            file_type = getattr(field, 'file_type', None)
            if not file_type:
                continue
            if previous_mimetypes in file_type:
                file_type.remove(previous_mimetypes)
                file_type.append(new_mimetypes)
                changed = True
        if changed:
            formdef.store()
# don't pickle _workflow cache
def __getstate__(self):
    # NOTE: this is the instance dictionary itself, not a copy, so the
    # cached workflow is removed from the object as well.
    odict = self.__dict__
    odict.pop('_workflow', None)
    return odict

def __setstate__(self, dict):
    # restored objects start without a cached workflow
    self.__dict__ = dict
    self._workflow = None
class FormDefSubstVar(object):
    """Counters about the data of a form definition, exposed as attributes.

    'count' is the number of stored form data; 'count_status_<status>' is
    the number of form data indexed under the status 'wf-<status>'.
    """

    def __init__(self, formdef):
        self.formdef = formdef

    @property
    def count(self):
        return self.formdef.data_class().count()

    def __getattr__(self, attribute):
        prefix = 'count_status_'
        if attribute.startswith(prefix):
            status = attribute[len(prefix):]
            return len(self.formdef.data_class().get_ids_with_indexed_value(
                'status', 'wf-%s' % status))
        # object defines no __getattr__ to delegate to, unknown names are
        # reported with the name that was asked for.
        raise AttributeError(attribute)
# Registration of the emails and substitution variables provided by this
# module.  Each EmailsDirectory.register() call gives the key of an email,
# its description, an optional hint about available variables, its category
# and its default subject and body.
from qommon.admin.emails import EmailsDirectory
# email to the user about a newly created form (registered with
# enabled = False)
EmailsDirectory.register('new_user', N_('Notification of creation to user'),
enabled = False,
category = N_('Workflow'),
default_subject = N_('New form ([name])'),
default_body = N_('''\
Hello,
[if-any user]
This mail is a reminder about the form you just submitted; you can consult it
with this link: [url]
[else]
This mail is a reminder about the form you just submitted.
[end]
[if-any details]
For reference, here are the details:
[details]
[end]
'''))
# email to the user about a change on a form
EmailsDirectory.register('change_user', N_('Notification of change to user'),
N_('Available variables: user, url, before, after, evolution'),
category = N_('Workflow'),
default_subject = N_('Form status change'),
default_body = N_('''\
Hello,
[if-any form_status_changed]
Status of the form you submitted just changed (from "[before]" to "[after]").
[end]
[if-any user]
You can consult it with this link:
[url]
[end]
[if-any form_comment]New comment: [form_comment][end]
[if-any evolution]
[evolution]
[end]
'''))
# email to the receiver about a newly created form (registered with
# enabled = False)
EmailsDirectory.register('new_receiver', N_('Notification of creation to receiver'),
N_('Available variables: name, url, details'), enabled = False,
category = N_('Workflow'),
default_subject = N_('New form ([name])'),
default_body = N_('''\
Hello,
A new form has been submitted, you can see it with this link:
[form_url_backoffice]
[if-any details]
For reference, here are the details:
[details]
[end]
'''))
# email to the receiver about a change on a form
EmailsDirectory.register('change_receiver', N_('Notification of change to receiver'),
N_('Available variables: name, url, before, after, evolution'),
category = N_('Workflow'),
default_subject = N_('Form status change ([name])'),
default_body = N_('''\
Hello,
A form just changed, you can consult it with this link:
[form_url_backoffice]
[if-any form_comment]New comment: [form_comment][end]
[if-any evolution]
[evolution]
[end]
'''))
# 'form_name' is the variable set by FormDef.get_substitution_variables()
Substitutions.register('form_name', category=N_('Form'), comment=N_('Form Name'))