wcs/wcs/workflows.py

1820 lines
67 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from qommon import ezt
from cStringIO import StringIO
import copy
import xml.etree.ElementTree as ET
import random
import os
import string
from quixote import get_request
import qommon.misc
from qommon.misc import C_
from qommon.storage import StorableObject
from qommon.form import *
from qommon import emails, get_cfg, get_logger
from quixote.directory import Directory
from quixote.html import htmltext
import qommon.errors
from wcs.roles import Role, logged_users_role, get_user_roles
from wcs.formdef import FormDef
from wcs.formdata import Evolution
from wcs.fields import SubtitleField, TitleField, CommentField, PageField
# This module must be loaded under its absolute name (wcs.workflows) or be
# run directly as a script; any other module name is rejected at import time.
if not (__name__.startswith('wcs.') or __name__ == "__main__"):
    raise ImportError('Import of workflows module must be absolute (import wcs.workflows)')
def lax_int(s):
    '''Convert s to an integer, returning -1 when it cannot be converted.

    Both values of an unsuitable type (ex: None) and strings that are not
    numbers give -1.
    '''
    try:
        number = int(s)
    except (ValueError, TypeError):
        number = -1
    return number
class WorkflowImportError(Exception):
    '''Raised when an XML document cannot be imported as a workflow.'''
class AbortActionException(Exception):
    '''Raised by a workflow item to stop the processing of the items that
    follow it in the same status.'''
class AttachmentEvolutionPart: #pylint: disable=C1001
    '''File attached to a form evolution.

    The content is held in the file-like object ``fp`` until the object is
    pickled; at that point (see __getstate__) it is copied into the
    'attachments' directory of the application and only the path of that
    copy is kept in the pickled state.
    '''
    orig_filename = None
    base_filename = None
    content_type = None
    charset = None

    def __init__(self, base_filename, fp, orig_filename=None, content_type=None,
            charset=None):
        '''Create the part from a file name and a file-like object.

        orig_filename defaults to base_filename when it is not given.
        '''
        self.base_filename = base_filename
        self.orig_filename = orig_filename or base_filename
        self.content_type = content_type
        self.charset = charset
        self.fp = fp

    def from_upload(cls, upload):
        '''Build a part from an upload object exposing base_filename, fp,
        orig_filename, content_type and charset attributes.'''
        return cls(
                upload.base_filename,
                upload.fp,
                upload.orig_filename,
                upload.content_type,
                upload.charset)
    from_upload = classmethod(from_upload)

    def get_file_pointer(self):
        '''Return the stored attachment, opened for reading.

        The file is opened in binary mode as attachments are arbitrary
        files. self.filename is only set by __getstate__ in the pickled
        state, so this is to be called on objects that went through
        storage.
        '''
        return open(self.filename, 'rb')

    def __getstate__(self):
        '''Return the state to pickle, writing the attachment to disk.

        When the object holds a file pointer, its content is copied to a
        new randomly named file under <app_dir>/attachments and the 'fp'
        entry is replaced by a 'filename' entry.  Objects without a file
        pointer are returned unchanged.
        '''
        odict = self.__dict__.copy()
        if 'fp' not in odict:
            return odict

        del odict['fp']
        dirname = os.path.join(get_publisher().app_dir, 'attachments')
        if not os.path.exists(dirname):
            os.mkdir(dirname)

        def get_new_filename():
            # pick random 16-letter names until one is unused;
            # ascii_lowercase is used as string.lowercase depends on the
            # locale and could produce non-ASCII file names.
            rand = random.SystemRandom()
            while True:
                basename = ''.join(
                        [rand.choice(string.ascii_lowercase) for x in range(16)])
                filename = os.path.join(dirname, basename)
                if not os.path.exists(filename):
                    return filename

        odict['filename'] = get_new_filename()
        self.fp.seek(0)
        # binary mode, and make sure the descriptor is released even if
        # the copy fails.
        fd = open(odict['filename'], 'wb')
        try:
            fd.write(self.fp.read())
        finally:
            fd.close()
        return odict

    def view(self):
        '''Return the HTML link to the attachment.

        The template is turned into htmltext *before* the substitution so
        the file names, which come from the user, get escaped.
        '''
        return htmltext('<p><a href="attachment?f=%s">%s</a>') % (
                    os.path.basename(self.filename), self.orig_filename)
class TemplatingError(qommon.errors.PublishError):
    '''Publish error carrying the description of a templating problem.'''

    def __init__(self, description):
        # the title is fixed, only the description varies
        self.description = description
        self.title = _('Templating Error')
class DuplicateStatusNameError(Exception):
    '''Raised when adding a status whose name is already used by another
    status of the workflow.'''
def get_varnames(fields):
    '''Return the (variable name, label) pairs usable in templates.

    Subtitle, title, comment and page fields are skipped.  A field with a
    variable name is listed as var_<varname>, the others fall back to
    their numeric name, f<id>.
    '''
    skipped_classes = (SubtitleField, TitleField, CommentField, PageField)
    names = []
    for field in fields:
        if isinstance(field, skipped_classes):
            continue
        label = field.label
        if field.varname:
            key = 'var_%s' % field.varname
        else:
            # no variable name, use f$n$
            key = 'f%s' % field.id
        names.append((key, label))
    return names
class WorkflowVariablesFieldsFormDef(FormDef):
    '''Class to handle workflow variables, it loads and saves from/to
    the workflow object 'variables_formdef' attribute.'''

    def __init__(self, workflow):
        '''Initialise the fields from the variables already defined on the
        workflow, or as an empty list when there are none.'''
        self.id = None
        self.name = workflow.name
        self.workflow = workflow
        if workflow.variables_formdef:
            self.fields = self.workflow.variables_formdef.fields
            self.max_field_id = self.workflow.variables_formdef.max_field_id
        else:
            self.fields = []

    def store(self):
        '''Attach this object to the workflow and store the workflow.

        Fields that have a widget_class attribute but no variable name get
        one computed from their label.
        '''
        for field in self.fields:
            if hasattr(field, 'widget_class') and not field.varname:
                # qommon.misc is imported explicitly by this module, the
                # bare "misc" name was not.
                field.varname = qommon.misc.simplify(field.label, space='_')
        self.workflow.variables_formdef = self
        self.workflow.store()
class Workflow(StorableObject):
    '''A workflow: a named list of statuses (each one holding its items),
    the roles defined at the workflow level, and optional variables.'''
    _names = 'workflows'
    name = None
    possible_status = None  # list of WorkflowStatus objects
    roles = None  # workflow roles, id (ex: '_receiver') -> label
    variables_formdef = None
    last_modification_time = None
    last_modification_user_id = None

    def __init__(self, name = None):
        '''Create an empty workflow with the single '_receiver' role.'''
        StorableObject.__init__(self)
        self.name = name
        self.possible_status = []
        self.roles = {'_receiver': _('Recipient')}

    def migrate(self):
        '''Bring an object loaded from storage up to date, and store it
        again if anything had to be changed.'''
        changed = False
        if 'roles' not in self.__dict__ or self.roles is None:
            self.roles = {'_receiver': _('Recipient')}
            changed = True
        for status in self.possible_status:
            changed |= status.migrate()
        if changed:
            self.store()

    def store(self):
        '''Store the workflow and refresh the forms that use it.

        The modification time and user are recorded, then every formdef
        attached to the workflow rebuilds its security rules and, if the
        set of endpoint statuses changed since the stored version, updates
        its endpoints.
        '''
        must_update_endpoints = False
        if self.id:
            # compare endpoints against the version currently in storage
            old_self = self.get(self.id, ignore_errors=True, ignore_migration=True)
            if old_self:
                old_endpoints = set([x.id for x in old_self.get_endpoint_status()])
                if old_endpoints != set([x.id for x in self.get_endpoint_status()]):
                    must_update_endpoints = True

        self.last_modification_time = time.localtime()
        if get_request() and get_request().user:
            self.last_modification_user_id = str(get_request().user.id)
        else:
            self.last_modification_user_id = None
        StorableObject.store(self)

        # instruct all related formdefs to update their security rules, and
        # their views if endpoints have changed.
        for form in FormDef.select(lambda x: x.workflow_id == self.id, ignore_migration=True):
            form.data_class().rebuild_security()
            if must_update_endpoints:
                form.update_endpoints()

    def get(cls, id, ignore_errors=False, ignore_migration=False):
        '''Return the workflow with the given id; the special '_default'
        id gives the built-in default workflow.'''
        if id == '_default':
            return cls.get_default_workflow()
        return super(Workflow, cls).get (id, ignore_errors=ignore_errors,
                ignore_migration=ignore_migration)
    get = classmethod(get)

    def add_status(self, name, id=None):
        '''Append a new status to the workflow and return it.

        When no id is given the next numeric id is used (non numeric ids
        of existing statuses are ignored when computing it).  Raises
        DuplicateStatusNameError if a status already has that name.
        '''
        if [x for x in self.possible_status if x.name == name]:
            raise DuplicateStatusNameError()
        status = WorkflowStatus(name)
        status.parent = self
        if id is None:
            if self.possible_status:
                status.id = str(max([lax_int(x.id) for x in self.possible_status]) + 1)
            else:
                status.id = '1'
        else:
            status.id = id
        self.possible_status.append(status)
        return status

    def get_status(self, id):
        '''Return the status with the given id, raise KeyError if there is
        none.'''
        for status in self.possible_status:
            if status.id == id:
                return status
        raise KeyError()

    def __setstate__(self, dict):
        '''Restore the pickled state and rebuild what is not pickled: the
        parent links of statuses and items, and missing item ids (which
        are then derived from the position of the item).'''
        self.__dict__.update(dict)
        for s in self.possible_status:
            s.parent = self
            for i, item in enumerate(s.items):
                item.parent = s
                if not item.id:
                    item.id = '%d' % (i+1)

    def get_waitpoint_status(self):
        '''Return the list of statuses that are waitpoints.'''
        # a waitpoint status is a status waiting for an event (be it user
        # interaction or something else), but can also be an endpoint (where
        # the user would wait, infinitely).
        waitpoint_status = []
        for status in self.possible_status:
            waitpoint = False
            endpoint = True
            if status.forced_endpoint:
                endpoint = True
            else:
                for item in status.items:
                    endpoint = item.endpoint and endpoint
                    waitpoint = item.waitpoint or waitpoint
            if endpoint or waitpoint:
                waitpoint_status.append(status)
        return waitpoint_status

    def get_endpoint_status(self):
        '''Return the statuses that are endpoints, that is all the
        statuses not returned by get_not_endpoint_status().'''
        not_endpoint_status = self.get_not_endpoint_status()
        endpoint_status = [x for x in self.possible_status if x not in not_endpoint_status]
        return endpoint_status

    def get_not_endpoint_status(self):
        '''Return the statuses that are not endpoints: those that are not
        forced endpoints and have at least one item whose endpoint
        attribute is False.'''
        not_endpoint_status = []
        for status in self.possible_status:
            if status.forced_endpoint:
                continue
            endpoint = True
            for item in status.items:
                endpoint = item.endpoint and endpoint
                if endpoint is False:
                    not_endpoint_status.append(status)
                    break
        return not_endpoint_status

    def has_options(self):
        '''Return True if at least one item of the workflow has one of its
        parameters left without a value.'''
        for status in self.possible_status:
            for item in status.items:
                for parameter in item.get_parameters():
                    if not getattr(item, parameter):
                        return True
        return False

    def remove_self(self):
        '''Remove the workflow, after detaching it from the forms that
        were using it.'''
        for form in FormDef.select(lambda x: x.workflow_id == self.id):
            form.workflow_id = None
            form.store()
        StorableObject.remove_self(self)

    def export_to_xml(self, include_id=False):
        '''Return the workflow as an ElementTree element.

        With include_id the workflow id (unless it starts with '_') and
        the id of the last user to modify it are exported too.
        '''
        charset = get_publisher().site_charset
        root = ET.Element('workflow')
        if include_id and self.id and not str(self.id).startswith('_'):
            root.attrib['id'] = str(self.id)
        ET.SubElement(root, 'name').text = unicode(self.name, charset)

        roles_node = ET.SubElement(root, 'roles')
        if self.roles:
            for role_id, role_label in self.roles.items():
                role_node = ET.SubElement(roles_node, 'role')
                role_node.attrib['id'] = role_id
                role_node.text = unicode(role_label, charset)

        if self.last_modification_time:
            elem = ET.SubElement(root, 'last_modification')
            elem.text = time.strftime('%Y-%m-%d %H:%M:%S', self.last_modification_time)
            if include_id:
                elem.attrib['user_id'] = str(self.last_modification_user_id)

        possible_status = ET.SubElement(root, 'possible_status')
        for status in self.possible_status:
            possible_status.append(status.export_to_xml(charset=charset,
                include_id=include_id))

        if self.variables_formdef:
            variables = ET.SubElement(root, 'variables')
            formdef = ET.SubElement(variables, 'formdef')
            ET.SubElement(formdef, 'name').text = '-' # required by formdef xml import
            fields = ET.SubElement(formdef, 'fields')
            for field in self.variables_formdef.fields:
                fields.append(field.export_to_xml(charset=charset, include_id=include_id))

        return root

    def import_from_xml(cls, fd, include_id=False):
        '''Parse the XML document read from fd and return the workflow it
        describes.  Raises ValueError if the document cannot be parsed.'''
        try:
            tree = ET.parse(fd)
        except Exception:
            # any parsing or reading error is reported as a ValueError;
            # KeyboardInterrupt and SystemExit are left alone.
            raise ValueError()
        return cls.import_from_xml_tree(tree, include_id=include_id)
    import_from_xml = classmethod(import_from_xml)

    def import_from_xml_tree(cls, tree, include_id=False):
        '''Build a workflow from an ElementTree tree or element.

        Raises WorkflowImportError if the name is missing or if the root
        element is not a workflow.  With include_id, ids found in the
        document (workflow, last user, fields, roles) are reused.
        '''
        charset = get_publisher().site_charset
        workflow = cls()
        if tree.find('name') is None or not tree.find('name').text:
            raise WorkflowImportError(N_('Missing name'))

        # if the tree we get is actually a ElementTree for real, we get its
        # root element and go on happily.
        if not ET.iselement(tree):
            tree = tree.getroot()

        if tree.tag != 'workflow':
            raise WorkflowImportError(N_('Not a workflow'))

        if include_id and tree.attrib.get('id'):
            workflow.id = tree.attrib.get('id')

        workflow.name = tree.find('name').text.encode(charset)

        if tree.find('roles') is not None:
            workflow.roles = {}
            for role_node in tree.findall('roles/role'):
                workflow.roles[role_node.attrib['id']] = role_node.text.encode(charset)

        if tree.find('last_modification') is not None:
            node = tree.find('last_modification')
            workflow.last_modification_time = time.strptime(node.text, '%Y-%m-%d %H:%M:%S')
            if include_id and node.attrib.get('user_id'):
                workflow.last_modification_user_id = node.attrib.get('user_id')

        workflow.possible_status = []
        for status in tree.find('possible_status'):
            status_o = WorkflowStatus()
            status_o.parent = workflow
            status_o.init_with_xml(status, charset, include_id=include_id)
            workflow.possible_status.append(status_o)

        variables = tree.find('variables')
        if variables is not None:
            formdef = variables.find('formdef')
            imported_formdef = FormDef.import_from_xml_tree(formdef, include_id=True)
            workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
            workflow.variables_formdef.fields = imported_formdef.fields

        return workflow
    import_from_xml_tree = classmethod(import_from_xml_tree)

    def get_list_of_roles(self, include_logged_in_users=True):
        '''Return the roles that can be selected in this workflow, as a
        list of 3-item entries (id, label, id): the submitter, the
        workflow roles, optionally the logged users role, then the user
        roles (after a separator entry) if there are any.'''
        t = []
        t.append(('_submitter', C_('role|User'), '_submitter'))
        for workflow_role in self.roles.items():
            t.append(list(workflow_role) + [workflow_role[0]])
        if include_logged_in_users:
            t.append((logged_users_role().id, logged_users_role().name, logged_users_role().id))
        if get_user_roles():
            t.append((None, '----', None))
            t.extend(get_user_roles())
        return t

    def render_list_of_roles(self, roles):
        '''Return the labels of the given roles as a comma separated
        string; roles without a label are left out.'''
        t = []
        for r in roles:
            role_label = get_role_translation_label(self, r)
            if role_label:
                t.append(role_label)
        return ', '.join(t)

    def get_unknown_workflow(cls):
        '''Return a placeholder workflow, with the '_unknown' id.'''
        workflow = Workflow(name=_('Unknown'))
        workflow.id = '_unknown'
        return workflow
    get_unknown_workflow = classmethod(get_unknown_workflow)

    def get_default_workflow(cls):
        '''Build and return the default workflow (id '_default').

        It goes from "just submitted" to "new", then the receiver can
        accept or reject, and finish what was accepted.  Notification
        emails are only included when the matching email is enabled in
        EmailsDirectory.
        '''
        from qommon.admin.emails import EmailsDirectory

        workflow = Workflow(name=_('Default'))
        workflow.id = '_default'
        workflow.roles = {'_receiver': _('Recipient')}
        just_submitted_status = workflow.add_status(_('Just Submitted'), 'just_submitted')
        just_submitted_status.visibility = ['_receiver']
        new_status = workflow.add_status(_('New'), 'new')
        new_status.colour = '66FF00'
        rejected_status = workflow.add_status(_('Rejected'), 'rejected')
        rejected_status.colour = 'FF3300'
        accepted_status = workflow.add_status(_('Accepted'), 'accepted')
        accepted_status.colour = '66CCFF'
        finished_status = workflow.add_status(_('Finished'), 'finished')
        finished_status.colour = 'CCCCCC'

        commentable = CommentableWorkflowStatusItem()
        commentable.id = '_commentable'
        commentable.by = ['_submitter', '_receiver']

        import wf.jump
        jump_to_new = wf.jump.JumpWorkflowStatusItem()
        jump_to_new.id = '_jump_to_new'
        jump_to_new.status = new_status.id
        jump_to_new.parent = just_submitted_status

        notify_new_receiver_email = SendmailWorkflowStatusItem()
        notify_new_receiver_email.id = '_notify_new_receiver_email'
        notify_new_receiver_email.to = ['_receiver']
        notify_new_receiver_email.subject = EmailsDirectory.get_subject('new_receiver')
        notify_new_receiver_email.body = EmailsDirectory.get_body('new_receiver')
        if not EmailsDirectory.is_enabled('new_receiver'):
            notify_new_receiver_email = None

        notify_new_user_email = SendmailWorkflowStatusItem()
        notify_new_user_email.id = '_notify_new_user_email'
        notify_new_user_email.to = ['_submitter']
        notify_new_user_email.subject = EmailsDirectory.get_subject('new_user')
        notify_new_user_email.body = EmailsDirectory.get_body('new_user')
        if not EmailsDirectory.is_enabled('new_user'):
            # this used to reset notify_change_user_email, leaving the
            # disabled "new user" email in the workflow.
            notify_new_user_email = None

        notify_change_receiver_email = SendmailWorkflowStatusItem()
        notify_change_receiver_email.id = '_notify_change_receiver_email'
        notify_change_receiver_email.to = ['_receiver']
        notify_change_receiver_email.subject = EmailsDirectory.get_subject('change_receiver')
        notify_change_receiver_email.body = EmailsDirectory.get_body('change_receiver')
        if not EmailsDirectory.is_enabled('change_receiver'):
            notify_change_receiver_email = None

        notify_change_user_email = SendmailWorkflowStatusItem()
        notify_change_user_email.id = '_notify_change_user_email'
        notify_change_user_email.to = ['_submitter']
        notify_change_user_email.subject = EmailsDirectory.get_subject('change_user')
        notify_change_user_email.body = EmailsDirectory.get_body('change_user')
        if not EmailsDirectory.is_enabled('change_user'):
            notify_change_user_email = None

        if notify_new_receiver_email:
            notify_new_receiver_email.parent = just_submitted_status
            just_submitted_status.items.append(notify_new_receiver_email)
        if notify_new_user_email:
            notify_new_user_email.parent = just_submitted_status
            just_submitted_status.items.append(notify_new_user_email)

        just_submitted_status.items.append(jump_to_new)

        # each status gets its own copy of the "change" notifications, as
        # an item has a single parent.
        if notify_change_receiver_email:
            accepted_status.items.append(notify_change_receiver_email)
            notify_change_receiver_email.parent = accepted_status
            notify_change_receiver_email = copy.copy(notify_change_receiver_email)
            rejected_status.items.append(notify_change_receiver_email)
            notify_change_receiver_email.parent = rejected_status
            notify_change_receiver_email = copy.copy(notify_change_receiver_email)
            finished_status.items.append(notify_change_receiver_email)
            notify_change_receiver_email.parent = finished_status

        if notify_change_user_email:
            accepted_status.items.append(notify_change_user_email)
            notify_change_user_email.parent = accepted_status
            notify_change_user_email = copy.copy(notify_change_user_email)
            rejected_status.items.append(notify_change_user_email)
            notify_change_user_email.parent = rejected_status
            notify_change_user_email = copy.copy(notify_change_user_email)
            finished_status.items.append(notify_change_user_email)
            notify_change_user_email.parent = finished_status

        new_status.items.append(commentable)
        commentable.parent = new_status
        commentable = copy.copy(commentable)
        accepted_status.items.append(commentable)
        commentable.parent = accepted_status

        accept = ChoiceWorkflowStatusItem()
        accept.id = '_accept'
        accept.label = _('Accept')
        accept.by = ['_receiver']
        accept.status = accepted_status.id
        accept.parent = new_status
        new_status.items.append(accept)

        reject = ChoiceWorkflowStatusItem()
        reject.id = '_reject'
        reject.label = _('Reject')
        reject.by = ['_receiver']
        reject.status = rejected_status.id
        reject.parent = new_status
        new_status.items.append(reject)

        finish = ChoiceWorkflowStatusItem()
        finish.id = '_finish'
        finish.label = _('Finish')
        finish.by = ['_receiver']
        finish.status = finished_status.id
        finish.parent = accepted_status
        accepted_status.items.append(finish)

        return workflow
    get_default_workflow = classmethod(get_default_workflow)
class WorkflowStatus(object):
    '''A status of a workflow, holding the ordered list of its items.'''
    id = None
    name = None
    items = None  # list of workflow status items
    visibility = None  # list of roles, empty or None for no restriction
    forced_endpoint = False
    colour = 'FFFFFF'
    backoffice_info_text = None

    def __init__(self, name = None):
        '''Create a status, without any item.'''
        self.name = name
        self.items = []

    def __eq__(self, other):
        '''Compare on ids; other is either a status or a status id given
        as a string.  Nothing is equal to None.'''
        if other is None:
            return False
        # this assumes both status are from the same workflow
        if type(other) is str:
            other_id = other
        else:
            other_id = other.id
        return self.id == other_id

    def __ne__(self, other):
        '''Opposite of __eq__; it has to be defined explicitly as Python 2
        does not derive != from ==.'''
        return not self.__eq__(other)

    def migrate(self):
        '''Migrate the items, return True if any of them was changed.'''
        changed = False
        for item in self.items:
            changed |= item.migrate()
        return changed

    def append_item(self, type):
        '''Append a new item of the registered class whose key is type,
        with the next numeric id.  Raises KeyError if no registered class
        has that key.'''
        for klass in item_classes:
            if klass.key == type:
                o = klass()
                if self.items:
                    o.id = str(max([lax_int(x.id) for x in self.items]) + 1)
                else:
                    o.id = '1'
                self.items.append(o)
                break
        else:
            raise KeyError()

    def get_item(self, id):
        '''Return the item with the given id, raise KeyError if there is
        none.'''
        for item in self.items:
            if item.id == id:
                return item
        raise KeyError()

    def perform_items(self, formdata, depth=20):
        '''Perform the items of this status on formdata.

        Processing stops when an item raises AbortActionException or
        changes the status of the formdata.  On a status change a new
        evolution is recorded, the formdata is stored, and the items of
        the new status are performed in turn, depth limiting the number of
        such chained changes.  Returns the last URL returned by an item,
        if any.
        '''
        if depth == 0: # prevents infinite loops
            return
        url = None
        old_status = formdata.status
        for item in self.items:
            try:
                url = item.perform(formdata) or url
            except AbortActionException:
                break
            if formdata.status != old_status:
                break

        if formdata.status != old_status:
            if not formdata.evolution:
                formdata.evolution = []
            evo = Evolution()
            evo.time = time.localtime()
            evo.status = formdata.status
            formdata.evolution.append(evo)
            formdata.store()
            # performs the items of the new status
            wf_status = formdata.get_status()
            url = wf_status.perform_items(formdata, depth=depth-1) or url

        return url

    def get_action_form(self, filled, user):
        '''Return the form with the actions available to user on the
        filled form, or None when no item added anything to it.'''
        form = Form(enctype='multipart/form-data', use_tokens = False)
        for item in self.items:
            if not item.check_auth(filled, user):
                continue
            item.fill_form(form, filled, user)

        if form.widgets or form.submit_widgets:
            return form
        else:
            return None

    def handle_form(self, form, filled, user):
        '''Handle the submission of the action form.

        Every item the user is allowed to use gets to process the form.
        An item returning True stops the processing of further items; an
        item returning another true value has it returned right away (the
        evolution being recorded first, unless the form has errors).
        Otherwise, if the form has no errors, the evolution is recorded,
        the workflow is performed and the URL it gives, if any, is
        returned.
        '''
        evo = Evolution()
        evo.time = time.localtime()
        if user:
            if filled.is_submitter(user):
                evo.who = '_submitter'
            else:
                evo.who = user.id
        if not filled.evolution:
            filled.evolution = []

        for item in self.items:
            # same check as in get_action_form, so an item that was not
            # offered to the user cannot process the submission.
            if not item.check_auth(filled, user):
                continue
            next_url = item.submit_form(form, filled, user, evo)
            if next_url is True:
                break
            if next_url:
                if not form.has_errors():
                    filled.evolution.append(evo)
                    if evo.status:
                        filled.status = evo.status
                    filled.store()
                return next_url

        if form.has_errors():
            return

        filled.evolution.append(evo)
        if evo.status:
            filled.status = evo.status
        filled.store()
        url = filled.perform_workflow()
        if url:
            return url

    def get_subdirectories(self, formdata):
        '''Return a list of (name, directory object) for the items that
        have a directory_name.'''
        subdirectories = []
        for item in self.items:
            if item.directory_name:
                subdirectories.append((item.directory_name,
                    item.directory_class(formdata, item, self)))
        return subdirectories

    def get_visibility_restricted_roles(self):
        '''Return the roles the status is restricted to, an empty list
        when it is not restricted.'''
        if not self.visibility: # no restriction -> visible
            return []
        return self.visibility

    def is_visible(self, formdata, user):
        '''Return True if user can see this status of formdata.

        A status without restriction is visible to all, and admins see
        everything.  Otherwise the user must have one of the roles of the
        visibility list or one of the roles allowed on an item of the
        status.
        '''
        if not self.visibility: # no restriction -> visible
            return True

        if user and user.is_admin:
            return True

        if user:
            user_roles = set(user.roles or [])
            user_roles.add(logged_users_role().id)
        else:
            user_roles = set([])

        visibility_roles = self.visibility[:]
        for item in self.items or []:
            if not hasattr(item, 'by') or not item.by:
                continue
            visibility_roles.extend(item.by)

        for role in visibility_roles:
            if role != '_submitter':
                role = get_role_translation(formdata, role)
            if role in user_roles:
                return True

        return False

    def __getstate__(self):
        '''Return the state to pickle, without the link to the parent
        workflow (it is set again by the workflow when it is loaded).'''
        odict = self.__dict__.copy()
        odict.pop('parent', None)
        return odict

    def export_to_xml(self, charset, include_id=False):
        '''Return the status, and its items, as an ElementTree element.'''
        status = ET.Element('status')
        ET.SubElement(status, 'id').text = unicode(self.id, charset)
        ET.SubElement(status, 'name').text = unicode(self.name, charset)
        ET.SubElement(status, 'colour').text = unicode(self.colour, charset)

        if self.forced_endpoint:
            ET.SubElement(status, 'forced_endpoint').text = 'true'

        if self.backoffice_info_text:
            ET.SubElement(status, 'backoffice_info_text').text = unicode(
                    self.backoffice_info_text, charset)

        visibility_node = ET.SubElement(status, 'visibility')
        for role in self.visibility or []:
            ET.SubElement(visibility_node, 'role').text = str(role)

        items = ET.SubElement(status, 'items')
        for item in self.items:
            items.append(item.export_to_xml(charset=charset,
                include_id=include_id))
        return status

    def init_with_xml(self, elem, charset, include_id=False):
        '''Set the attributes and items of the status from the XML
        element, as produced by export_to_xml.'''
        self.id = elem.find('id').text.encode(charset)
        self.name = elem.find('name').text.encode(charset)
        if elem.find('colour') is not None:
            self.colour = elem.find('colour').text.encode(charset)
        if elem.find('forced_endpoint') is not None:
            self.forced_endpoint = (elem.find('forced_endpoint').text == 'true')
        if elem.find('backoffice_info_text') is not None:
            self.backoffice_info_text = elem.find('backoffice_info_text').text.encode(charset)

        self.visibility = []
        for visibility_role in elem.findall('visibility/role'):
            self.visibility.append(visibility_role.text)

        self.items = []
        for item in elem.find('items'):
            item_type = item.attrib['type']
            self.append_item(item_type)
            # append_item() doesn't return the new item, it is the last one
            item_o = self.items[-1]
            item_o.parent = self
            item_o.init_with_xml(item, charset, include_id=include_id)
class WorkflowStatusItem(object):
    '''Base class of the items (actions) that can be put in a workflow
    status.  Subclasses are expected to define a 'key' attribute, it is
    used as the item type in XML exports.'''
    description = 'XX'
    category = None # (key, label)
    id = None

    endpoint = True # means it's not possible to interact, and/or cause a status change
    waitpoint = False # means it's possible to wait (user interaction, or other event)

    directory_name = None
    directory_class = None
    support_substitution_variables = False

    def init(cls):
        '''Hook called when the class is registered, does nothing here.'''
        pass
    init = classmethod(init)

    def is_available(cls):
        '''Return whether the item class can be used, always True here.'''
        return True
    is_available = classmethod(is_available)

    def migrate(self):
        '''Convert role ids stored as integers in the 'to' and 'by'
        attributes to strings.  Returns True if something was changed.'''
        changed = False
        for roles_attribute in ('to', 'by'):
            attribute_value = getattr(self, roles_attribute, []) or []
            # test the type of the ids, not their truth value, so that an
            # integer 0 is converted as well.
            if any([type(x) is int for x in attribute_value]):
                setattr(self, roles_attribute, [str(x) for x in attribute_value])
                changed = True
        return changed

    def render_as_line(self):
        '''Return the one-line description of the item.'''
        return _(self.description)

    def render_list_of_roles(self, roles):
        '''Return the labels of roles, as rendered by the workflow.'''
        return self.parent.parent.render_list_of_roles(roles)

    def get_list_of_roles(self, include_logged_in_users=True):
        '''Return the roles that can be selected, as listed by the
        workflow.'''
        return self.parent.parent.get_list_of_roles(include_logged_in_users=include_logged_in_users)

    def perform(self, formdata):
        '''Perform the action on formdata, does nothing here.'''
        pass

    def fill_form(self, form, formdata, user):
        '''Add the widgets of the item to the action form, does nothing
        here.'''
        pass

    def submit_form(self, form, formdata, user, evo):
        '''Process the submitted action form, does nothing here.'''
        pass

    def check_auth(self, formdata, user):
        '''Return True if user is allowed to use this item on formdata.

        Items without a 'by' attribute are open to all; otherwise the user
        must match one of the roles listed in 'by' (any logged in user for
        the logged users role, the submitter of the form for
        '_submitter').
        '''
        if not hasattr(self, 'by'):
            return True

        for role in self.by or []:
            if user and role == logged_users_role().id:
                return True
            if role == '_submitter':
                t = formdata.is_submitter(user)
                if t is True:
                    return True
                continue
            if not user:
                continue
            role = get_role_translation(formdata, role)
            if role in (user.roles or []):
                return True

        return False

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        '''Add the widgets to edit the given parameters, does nothing
        here.'''
        pass

    def get_parameters(self):
        '''Return the names of the attributes that are the parameters of
        the item, none here.'''
        return ()

    def fill_admin_form(self, form):
        '''Add the widgets for all the parameters to the admin form.'''
        self.add_parameters_widgets(form, self.get_parameters())

    def submit_admin_form(self, form):
        '''Set the parameters from the values of the admin form.

        When a <parameter>_parse method exists, the value goes through it
        before being set.
        '''
        for f in self.get_parameters():
            widget = form.get_widget(f)
            if widget:
                value = widget.parse()
                if hasattr(self, '%s_parse' % f):
                    value = getattr(self, '%s_parse' % f)(value)
                setattr(self, f, value)

    def compute(self, var, raises=False):
        '''Return the value of var; strings starting with '=' are
        evaluated as Python expressions, anything else is returned as is.

        If the evaluation fails the exception is raised when raises is
        true, otherwise var is returned unchanged.
        '''
        if not isinstance(var, basestring):
            return var
        if not var.startswith('='):
            return var
        context_vars = get_publisher().substitutions.get_context_variables()
        # NOTE(review): the expression is passed to eval(), it must only
        # ever come from trusted configuration -- to be confirmed against
        # callers.
        try:
            return eval(var[1:], get_publisher().get_global_eval_dict(), context_vars)
        except Exception:
            if raises:
                raise
            return var

    def get_substitution_variables(self, formdata):
        '''Return the substitution variables provided by the item, none
        here.'''
        return {}

    def get_target_status(self):
        """Returns a list of status this item can lead to."""
        return []

    def export_to_xml(self, charset, include_id=False):
        '''Return the item as an ElementTree element.

        Each parameter is exported by its <parameter>_export_to_xml method
        if there is one, otherwise according to the type of its value;
        parameters set to None are not exported.
        '''
        item = ET.Element('item')
        item.attrib['type'] = self.key
        for attribute in self.get_parameters():
            if hasattr(self, '%s_export_to_xml' % attribute):
                getattr(self, '%s_export_to_xml' % attribute)(item, charset,
                        include_id=include_id)
                continue
            if hasattr(self, attribute) and getattr(self, attribute) is not None:
                el = ET.SubElement(item, attribute)
                val = getattr(self, attribute)
                if type(val) is dict:
                    for k, v in val.items():
                        ET.SubElement(el, k).text = unicode(v, charset, 'replace')
                elif type(val) is list:
                    # elements are named after the attribute, minus the
                    # plural mark
                    if attribute[-1] == 's':
                        atname = attribute[:-1]
                    else:
                        atname = 'item'
                    for v in val:
                        ET.SubElement(el, atname).text = unicode(str(v), charset, 'replace')
                elif type(val) in (str, unicode):
                    if type(val) is unicode:
                        el.text = val
                    else:
                        el.text = unicode(val, charset, 'replace')
                else:
                    el.text = str(val)
        return item

    def init_with_xml(self, elem, charset, include_id=False):
        '''Set the parameters of the item from the XML element.

        Each parameter is read by its <parameter>_init_with_xml method if
        there is one, otherwise the value is converted according to the
        type of the current value of the attribute.
        '''
        for attribute in self.get_parameters():
            el = elem.find(attribute)
            if hasattr(self, '%s_init_with_xml' % attribute):
                getattr(self, '%s_init_with_xml' % attribute)(el, charset,
                        include_id=include_id)
                continue
            if el is None:
                continue
            children = list(el)
            if children:
                if type(getattr(self, attribute)) is list:
                    v = [x.text.encode(charset) for x in children]
                elif type(getattr(self, attribute)) is dict:
                    v = {}
                    for e in children:
                        v[e.tag] = e.text.encode(charset)
                else:
                    # ???
                    raise AssertionError
                setattr(self, attribute, v)
            else:
                if el.text is None:
                    setattr(self, attribute, None)
                elif el.text in ('False', 'True'): # bools
                    setattr(self, attribute, el.text == 'True')
                elif type(getattr(self, attribute)) is int:
                    setattr(self, attribute, int(el.text.encode(charset)))
                else:
                    setattr(self, attribute, el.text.encode(charset))

    def _roles_export_to_xml(self, attribute, item, charset, include_id=False):
        '''Export an attribute holding a list of roles.

        Each role gives an <item> element with the role id in its role_id
        attribute; its text is the role name, or the id itself for special
        roles and roles that cannot be found.
        '''
        if not hasattr(self, attribute) or not getattr(self, attribute):
            return
        el = ET.SubElement(item, attribute)
        for role_id in getattr(self, attribute):
            if role_id is None:
                continue
            role_id = str(role_id)
            if role_id.startswith('_') or role_id == 'logged-users':
                role = unicode(role_id, charset)
            else:
                try:
                    role = unicode(Role.get(role_id).name, charset)
                except KeyError:
                    role = unicode(role_id, charset)
            sub = ET.SubElement(el, 'item')
            sub.attrib['role_id'] = role_id
            sub.text = role

    def _roles_init_with_xml(self, attribute, elem, charset, include_id=False):
        '''Set an attribute holding a list of roles from the XML element,
        to an empty list if there is no element.'''
        if elem is None:
            setattr(self, attribute, [])
        else:
            imported_roles = []
            for child in list(elem):
                imported_roles.append(self._get_role_id_from_xml(child,
                    charset, include_id=include_id))
            setattr(self, attribute, imported_roles)

    def _role_export_to_xml(self, attribute, item, charset, include_id=False):
        '''Export an attribute holding a single role; the role_id
        attribute is only written with include_id.'''
        if not hasattr(self, attribute) or not getattr(self, attribute):
            return
        role_id = str(getattr(self, attribute))
        if role_id.startswith('_') or role_id == 'logged-users':
            role = unicode(role_id, charset)
        else:
            try:
                role = unicode(Role.get(role_id).name, charset)
            except KeyError:
                role = unicode(role_id, charset)
        sub = ET.SubElement(item, attribute)
        if include_id:
            sub.attrib['role_id'] = role_id
        sub.text = role

    def _get_role_id_from_xml(self, elem, charset, include_id=False):
        '''Return the id of the role described by the XML element.

        Returns None if there is no element, or if an import by id refers
        to an unknown role.  When importing by name, a role that doesn't
        exist is created.
        '''
        if elem is None:
            return None

        value = elem.text.encode(charset)

        # look for known static values
        if value.startswith('_') or value == 'logged-users':
            return value

        # if we import using id, only look at the role_id attribute
        if include_id:
            if not 'role_id' in elem.attrib:
                return None
            role_id = str(elem.attrib['role_id'])
            if Role.has_key(role_id):
                return role_id
            else:
                return None

        # if not using id, look up on the name
        for role in Role.select(ignore_errors=True):
            if role.name == value:
                return role.id

        # and if there's no match, create a new role
        role = Role()
        role.name = value
        role.store()
        return role.id

    def _role_init_with_xml(self, attribute, elem, charset, include_id=False):
        '''Set an attribute holding a single role from the XML element.'''
        setattr(self, attribute, self._get_role_id_from_xml(elem, charset,
            include_id=include_id))

    def by_export_to_xml(self, item, charset, include_id=False):
        '''Export the 'by' list of roles.'''
        self._roles_export_to_xml('by', item, charset, include_id=include_id)

    def by_init_with_xml(self, elem, charset, include_id=False):
        '''Import the 'by' list of roles.'''
        self._roles_init_with_xml('by', elem, charset, include_id=include_id)

    def to_export_to_xml(self, item, charset, include_id=False):
        '''Export the 'to' list of roles.'''
        self._roles_export_to_xml('to', item, charset, include_id=include_id)

    def to_init_with_xml(self, elem, charset, include_id=False):
        '''Import the 'to' list of roles.'''
        self._roles_init_with_xml('to', elem, charset, include_id=include_id)

    def _q_admin_lookup(self, workflow, status, component, html_top):
        '''Lookup of an admin sub-component, there is none here.'''
        return None

    def __getstate__(self):
        '''Return the state to pickle, without the link to the parent
        status.'''
        odict = self.__dict__.copy()
        odict.pop('parent', None)
        return odict
class WorkflowStatusJumpItem(WorkflowStatusItem):
    '''Base class for the items that make the form jump to the status
    whose id is in the 'status' attribute.'''
    status = None
    endpoint = False

    def get_target_status(self):
        '''Return the list with the status the item jumps to.

        The list is empty when no status is set, or when the status
        doesn't exist in the workflow; that last case is logged as an
        error.
        '''
        if not self.status:
            return []

        workflow = self.parent.parent
        targets = [x for x in workflow.possible_status if x.id == self.status]
        if not targets:
            # a list comprehension never raises IndexError, the invalid
            # reference has to be detected on the empty result.
            get_publisher().get_app_logger().error(
                        'reference to invalid status in workflow %s, status %s, item %s' % (
                                workflow.name,
                                self.parent.name,
                                self.description))
        return targets

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        '''Add the widget to select the target status, if 'status' is
        part of parameters.'''
        if 'status' in parameters:
            form.add(SingleSelectWidget, '%sstatus' % prefix, title = _('Status'), value = self.status,
                    options = [(None, '---')] + [(x.id, x.name) for x in self.parent.parent.possible_status])

    def get_parameters(self):
        '''Return the names of the parameters of the item.'''
        return ('status',)
def get_role_translation(formdata, role_name):
    '''Return the id of the actual role behind role_name, as a string.

    Names starting with '_' are workflow roles: they are looked up in the
    workflow roles of the formdata first, then in those of its formdef,
    and None is returned when they are not defined.  Other names are
    already role ids.  '_submitter' is not a role and is refused with an
    exception.
    '''
    if role_name == '_submitter':
        raise Exception('_submitter is not a valid role')

    if not str(role_name).startswith('_'):
        return str(role_name)

    role_id = None
    if formdata.workflow_roles:
        role_id = formdata.workflow_roles.get(role_name)
    if not role_id and formdata.formdef.workflow_roles:
        # not set on the formdata, use what is defined on the formdef
        role_id = formdata.formdef.workflow_roles.get(role_name)
    if role_id is None:
        return None
    return str(role_id)
def get_role_translation_label(workflow, role_id):
    '''Return the label of a role, as used in the given workflow.

    Handles the logged users role, the submitter, the workflow roles (ids
    starting with '_', looked up in workflow.roles) and the real roles,
    for which None is returned when the role doesn't exist.
    '''
    if role_id == logged_users_role().id:
        return logged_users_role().name
    if role_id == '_submitter':
        return C_('role|User')
    if str(role_id).startswith('_'):
        return workflow.roles.get(role_id)
    try:
        return Role.get(role_id).name
    except KeyError:
        return None
# workflow status item classes registered with register_item_class()
item_classes = []


def register_item_class(klass):
    '''Add klass to the known item classes and call its init() method;
    nothing is done if the class is already registered.'''
    if klass in item_classes:
        return
    item_classes.append(klass)
    klass.init()
class CommentableWorkflowStatusItem(WorkflowStatusItem):
    """Workflow item adding a comment text area (and optionally a submit
    button) to the form displayed for a status."""

    description = N_('Allow Comment')
    key = 'commentable'
    endpoint = False
    waitpoint = True

    varname = None
    label = None
    # hack to handle legacy commentable items: 0 means "never configured"
    # (default button), None means "no button", any other value is the label.
    button_label = 0
    hint = None
    by = []
    backoffice_info_text = None

    def render_as_line(self):
        """Return the one-line summary of this item."""
        if not self.by:
            return _('Allow Comment (not completed)')
        return _('Allow Comment by %s') % self.render_list_of_roles(self.by)

    def fill_form(self, form, formdata, user):
        """Add the comment widget (unless one already exists) and the button."""
        existing_names = [widget.name for widget in form.widgets]
        if 'comment' not in existing_names:
            title = _('Comment') if self.label is None else self.label
            form.add(TextWidget, 'comment', title=title, required=False,
                     cols=40, rows=10, hint=self.hint)
            form.get_widget('comment').attrs['class'] = 'comment'

        button_name = 'button%s' % self.id
        if self.button_label == 0:
            form.add_submit(button_name, _('Add Comment'))
        elif self.button_label:
            form.add_submit(button_name, self.button_label)

        button = form.get_widget(button_name)
        if button:
            button.backoffice_info_text = self.backoffice_info_text

    def submit_form(self, form, formdata, user, evo):
        """Store the submitted comment on *evo*; when a variable name is set
        also store it in the workflow data as ``comment_<varname>``."""
        comment_widget = form.get_widget('comment')
        if not comment_widget:
            return
        evo.comment = comment_widget.parse()
        if self.varname:
            formdata.update_workflow_data(
                {'comment_%s' % self.varname: evo.comment})

    def submit_admin_form(self, form):
        """Copy the value of every parameter widget found in *form* onto
        the item."""
        for name in self.get_parameters():
            widget = form.get_widget(name)
            if widget:
                setattr(self, name, widget.parse())

    def fill_admin_form(self, form):
        """Add the parameter widgets; a non-empty ``by`` that is not a list
        is reset to None first."""
        if self.by and type(self.by) is not list:
            self.by = None
        self.add_parameters_widgets(form, self.get_parameters())

    def get_parameters(self):
        """Return the names of the configurable parameters."""
        return ('label', 'button_label', 'by', 'hint', 'varname',
                'backoffice_info_text')

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        """Add to *form* one widget per name listed in *parameters*.

        Widget names are prefixed with *prefix*.  Unset ``label`` and
        ``button_label`` are replaced on the item by their default text
        before being displayed.
        """
        if 'label' in parameters:
            if self.label is None:
                self.label = _('Comment')
            form.add(StringWidget, '%slabel' % prefix, size=40,
                     title=_('Label'), value=self.label)

        if 'button_label' in parameters:
            if self.button_label == 0:
                self.button_label = _('Add Comment')
            form.add(StringWidget, '%sbutton_label' % prefix,
                     title=_('Button Label'),
                     hint=_('(empty to disable the button)'),
                     value=self.button_label)

        if 'hint' in parameters:
            form.add(StringWidget, '%shint' % prefix, size=40,
                     title=_('Hint'), value=self.hint)

        if 'by' in parameters:
            form.add(WidgetList, '%sby' % prefix, title=_('By'),
                     element_type=SingleSelectWidget,
                     value=self.by,
                     add_element_label=_('Add Role'),
                     element_kwargs={
                         'render_br': False,
                         'options': [(None, '---', None)] + self.get_list_of_roles()})

        if 'varname' in parameters:
            form.add(VarnameWidget, '%svarname' % prefix,
                     title=_('Variable Name'), value=self.varname,
                     hint=_('This will make the comment available in a variable named comment_varname.'))

        if 'backoffice_info_text' in parameters:
            form.add(WysiwygTextWidget, '%sbackoffice_info_text' % prefix,
                     title=_('Information Text for Backoffice'),
                     value=self.backoffice_info_text)

    def button_label_export_to_xml(self, xml_item, charset, include_id=False):
        """Export ``button_label`` as a child element of *xml_item*.

        Nothing is written for the legacy marker 0; None is written as an
        empty ``<button_label>`` element; strings are written as text.
        """
        if self.button_label == 0:
            return
        el = ET.SubElement(xml_item, 'button_label')
        if self.button_label is None:
            # button_label being None is a special case meaning "no button",
            # it is exported as an element without text so it can be told
            # apart from the "not filled" case.
            return
        if type(self.button_label) is unicode:
            el.text = self.button_label
        elif type(self.button_label) is str:
            el.text = unicode(self.button_label, charset, 'replace')
        else:
            raise AssertionError('unknown type for button_label (%r)' % self.button_label)

    def button_label_init_with_xml(self, element, charset, include_id=False):
        """Set ``button_label`` from the XML *element*, if there is one."""
        if element is None:
            return
        text = element.text
        # an element without text (self-closing <button_label />) maps to
        # None, meaning "no button".
        self.button_label = None if text is None else text.encode(charset)

register_item_class(CommentableWorkflowStatusItem)
class ChoiceWorkflowStatusItem(WorkflowStatusJumpItem):
    """Workflow item displaying a button that moves the form to another
    status."""

    description = N_('Change Status')
    key = 'choice'
    endpoint = False
    waitpoint = True

    label = None
    by = []
    backoffice_info_text = None

    def render_as_line(self):
        """Return the one-line summary of this item."""
        if not self.label:
            return _('Change Status (not completed)')
        if not self.by:
            return _('Change Status "%s"') % self.label
        return _('Change Status "%(label)s" by %(by)s') % {
            'label': self.label,
            'by': self.render_list_of_roles(self.by)}

    def fill_form(self, form, formdata, user):
        """Add the submit button of this item to *form*."""
        button_name = 'button%s' % self.id
        form.add_submit(button_name, self.label)
        form.get_widget(button_name).backoffice_info_text = self.backoffice_info_text

    def submit_form(self, form, formdata, user, evo):
        """If this item's button was pressed and a target status exists, set
        it on *evo*, clear the form errors and return True."""
        if form.get_submit() != 'button%s' % self.id:
            return
        wf_status = self.get_target_status()
        if not wf_status:
            return
        evo.status = 'wf-%s' % wf_status[0].id
        form.clear_errors()
        return True  # get out of processing loop

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        """Add to *form* one widget per name listed in *parameters*, after
        the label and before the roles for those handled by the parent
        class."""
        if 'label' in parameters:
            form.add(StringWidget, '%slabel' % prefix,
                     title=_('Label'), value=self.label)

        WorkflowStatusJumpItem.add_parameters_widgets(
            self, form, parameters, prefix, formdef)

        if 'by' in parameters:
            form.add(WidgetList, '%sby' % prefix, title=_('By'),
                     element_type=SingleSelectWidget,
                     value=self.by,
                     add_element_label=_('Add Role'),
                     element_kwargs={
                         'render_br': False,
                         'options': [(None, '---', None)] + self.get_list_of_roles()})

        if 'backoffice_info_text' in parameters:
            form.add(WysiwygTextWidget, '%sbackoffice_info_text' % prefix,
                     title=_('Information Text for Backoffice'),
                     value=self.backoffice_info_text)

    def get_parameters(self):
        """Return the names of the configurable parameters."""
        return ('by', 'status', 'label', 'backoffice_info_text')

register_item_class(ChoiceWorkflowStatusItem)
class JumpOnSubmitWorkflowStatusItem(WorkflowStatusJumpItem):
    """Workflow item changing the status as soon as the form is submitted
    without errors."""

    description = N_('Change Status on Submit')
    key = 'jumponsubmit'

    def render_as_line(self):
        """Return the one-line summary of this item."""
        if not self.status:
            return _('Change Status on Submit (not completed)')
        if not self.get_target_status():
            # a status is configured but it cannot be resolved
            return _('Change Status on Submit (broken)')
        return _('Change Status on Submit (to %s)') % self.get_target_status()[0].name

    def submit_form(self, form, formdata, user, evo):
        """Set the target status on *evo* when the form was submitted and
        has no errors."""
        if not form.is_submitted() or form.has_errors():
            return
        wf_status = self.get_target_status()
        if wf_status:
            evo.status = 'wf-%s' % wf_status[0].id

    def get_parameters(self):
        """Return the names of the configurable parameters."""
        return ('status',)

register_item_class(JumpOnSubmitWorkflowStatusItem)
class SendmailWorkflowStatusItem(WorkflowStatusItem):
    """Workflow item sending an email built from subject and body
    templates."""

    description = N_('Send mail')
    key = 'sendmail'
    support_substitution_variables = True

    to = []
    subject = None
    body = None
    comment = None

    def render_as_line(self):
        """Return the one-line summary of this item."""
        if self.to:
            return _('Send mail to %s') % self.render_list_of_roles(self.to)
        else:
            return _('Send mail (not completed)')

    def get_parameters(self):
        """Return the names of the configurable parameters."""
        return ('to', 'subject', 'body')

    def fill_admin_form(self, form):
        """Add the parameter widgets to the admin *form*."""
        self.add_parameters_widgets(form, self.get_parameters())

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        """Add to *form* one widget per name listed in *parameters*; widget
        names are prefixed with *prefix*."""
        if 'to' in parameters:
            form.add(WidgetList, '%sto' % prefix, title=_('To'),
                     element_type=SingleSelectWidget,
                     value=self.to,
                     add_element_label=_('Add Role'),
                     element_kwargs={
                         'render_br': False,
                         'options': [(None, '---', None)] +
                                    self.get_list_of_roles(include_logged_in_users=False)})
        if 'subject' in parameters:
            form.add(StringWidget, '%ssubject' % prefix, title=_('Subject'),
                     value=self.subject, size=40)
        if 'body' in parameters:
            form.add(TextWidget, '%sbody' % prefix, title=_('Body'),
                     value=self.body, cols=80, rows=10,
                     hint=_('Available variables: url, url_status, details, name, number, comment, field_NAME'))

    def perform(self, formdata):
        """Render the subject and body for *formdata* and send the email.

        Nothing is sent when the recipients, subject or body are not set,
        when a template cannot be rendered (an error is logged), or when no
        address could be collected.  Recipients containing an ``@`` are used
        as addresses, ``_submitter`` is resolved to the submitter email and
        anything else is treated as a role whose emails are added.  Several
        addresses are sent as bcc, a single one as the direct recipient.
        """
        if not (self.to and self.subject and self.body):
            return

        url = formdata.get_url()
        try:
            mail_body = template_on_formdata(formdata, self.compute(self.body))
        except ezt.EZTException:
            get_logger().error('error in template for email body [%s], mail could not be generated' % url)
            return

        try:
            mail_subject = template_on_formdata(formdata, self.compute(self.subject))
        except ezt.EZTException:
            get_logger().error('error in template for email subject [%s], mail could not be generated' % url)
            return

        # this works around the fact that parametric workflows only support
        # string values, so if we get set a string, we convert it here to an
        # array.
        if isinstance(self.to, basestring):
            self.to = [self.to]

        addresses = []
        for dest in self.to:
            dest = self.compute(dest)
            if '@' in str(dest):
                # literal email address
                addresses.append(dest)
                continue

            if dest == '_submitter':
                submitter_email = formdata.formdef.get_submitter_email(formdata)
                if submitter_email:
                    addresses.append(submitter_email)
                continue

            # anything else is a role (possibly a workflow role placeholder);
            # roles that cannot be loaded are silently skipped.
            dest = get_role_translation(formdata, dest)
            try:
                role = Role.get(dest)
            except KeyError:
                continue
            addresses.extend(role.get_emails())

        if not addresses:
            return

        if len(addresses) > 1:
            emails.email(mail_subject, mail_body, email_rcpt=None,
                         bcc=addresses,
                         exclude_current_user=False,
                         fire_and_forget=True)
        else:
            emails.email(mail_subject, mail_body, email_rcpt=addresses,
                         exclude_current_user=False,
                         fire_and_forget=True)

register_item_class(SendmailWorkflowStatusItem)
def template_on_html_string(template, process=None):
    """Render *template* without any form data, using the HTML base format.

    *process* is passed on unchanged to ``template_on_formdata()``.
    """
    return template_on_formdata(formdata=None, template=template,
                                process=process,
                                base_format=ezt.FORMAT_HTML)
def template_on_formdata(formdata=None, template=None, process=None,
                         base_format=ezt.FORMAT_RAW):
    """Render the ezt *template* and return the result as a string.

    A template without any ``[`` is returned untouched.  Otherwise the
    variables are the publisher context variables, completed, when
    *formdata* is given, with ``url``, ``url_status``, ``details``, ``name``,
    ``number``, ``comment``, everything from ``formdata.get_as_dict()`` and
    the ``before``/``after``/``evolution`` compatibility names.

    When *process* is given it is applied to every value; unicode values are
    then encoded to the site charset before the template is generated.
    """
    assert template is not None
    if '[' not in template:
        return template

    context = {}
    context.update(get_publisher().substitutions.get_context_variables())

    if formdata:
        url = formdata.get_url()
        context['url'] = url
        context['url_status'] = '%sstatus' % url
        context['details'] = formdata.formdef.get_detailed_email_form(formdata, url)
        context['name'] = formdata.formdef.name
        context['number'] = formdata.id
        if formdata.evolution and formdata.evolution[-1].comment:
            context['comment'] = formdata.evolution[-1].comment
        else:
            context['comment'] = ''
        context.update(formdata.get_as_dict())

        # compatibility vars
        for old_name, new_name in (('before', 'form_previous_status'),
                                   ('after', 'form_status'),
                                   ('evolution', 'form_evolution')):
            context[old_name] = context.get(new_name)

    if process:
        for name in context:
            context[name] = process(context[name])

    site_charset = get_publisher().site_charset
    for name, value in context.items():
        if isinstance(value, unicode):
            context[name] = value.encode(site_charset, 'ignore')

    output = StringIO()
    processor = ezt.Template(compress_whitespace=False)
    processor.parse(template or '', base_format=base_format)
    processor.generate(output, context)
    return output.getvalue()
class SendSMSWorkflowStatusItem(WorkflowStatusItem):
    """Workflow item sending an SMS built from a body template."""

    description = N_('Send SMS')
    key = 'sendsms'
    support_substitution_variables = True

    to = []
    body = None

    def render_as_line(self):
        """Return the one-line summary of this item."""
        return _('Send SMS')

    def fill_admin_form(self, form):
        """Add the parameter widgets to the admin *form*."""
        self.add_parameters_widgets(form, self.get_parameters())

    def get_parameters(self):
        """Return the names of the configurable parameters."""
        return ('to', 'body')

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        """Add to *form* one widget per name listed in *parameters*; widget
        names are prefixed with *prefix*."""
        if 'to' in parameters:
            form.add(WidgetList, '%sto' % prefix, title=_('To'),
                     element_type=StringWidget,
                     value=self.to,
                     add_element_label=_('Add Number'),
                     element_kwargs={'render_br': False})
        if 'body' in parameters:
            form.add(TextWidget, '%sbody' % prefix, title=_('Body'),
                     value=self.body, cols=80, rows=10,
                     hint=_('Available variables: url, url_status, details, name, number, comment, field_NAME'))

    def perform(self, formdata):
        """Render the body for *formdata* and send it to the destinations.

        Nothing is sent when recipients or body are not set, when all
        computed destinations are empty, or when the template cannot be
        rendered (an error is logged).  The sender is cut to 11 characters
        and the body to 160; an ``SMSError`` raised while sending is logged.
        """
        if not self.to or not self.body:
            return

        destinations = []
        for number in self.to:
            computed = self.compute(number)
            if computed:  # ignore empty elements
                destinations.append(computed)
        if not destinations:
            return

        try:
            sms_body = template_on_formdata(formdata, self.compute(self.body))
        except ezt.EZTException:
            url = formdata.get_url()
            get_logger().error('error in template for sms [%s], sms could not be generated' % url)
            return

        import qommon.sms
        sms_cfg = get_cfg('sms', {})
        sender = sms_cfg.get('sender', 'AuQuotidien')[:11]
        mode = sms_cfg.get('mode', 'none')
        try:
            qommon.sms.SMS.get_sms_class(mode).send(sender, destinations, sms_body[:160])
        except qommon.errors.SMSError as e:
            get_logger().error(e)

register_item_class(SendSMSWorkflowStatusItem)
class DisplayMessageWorkflowStatusItem(WorkflowStatusItem):
    """Workflow item holding a message template displayed to the user."""

    description = N_('Display message')
    key = 'displaymsg'
    support_substitution_variables = True

    message = None

    def get_message(self, filled):
        """Return the message rendered for the form data *filled*.

        An empty string is returned when no message is configured.  The
        template is parsed with the HTML base format and can use the
        publisher context variables plus ``date``, ``number`` and, when the
        handling role has details, ``receiver``.
        """
        if not self.message:
            return ''
        tmpl = ezt.Template()
        tmpl.parse(self.message, base_format=ezt.FORMAT_HTML)

        context = {}
        context.update(get_publisher().substitutions.get_context_variables())
        # use the explicitly imported qommon.misc module (as done for
        # qommon.misc.simplify elsewhere in this file) rather than a bare
        # "misc" name that this module does not import itself.
        context['date'] = qommon.misc.localstrftime(filled.receipt_time)
        context['number'] = filled.id
        handling_role = filled.get_handling_role()
        if handling_role and handling_role.details:
            context['receiver'] = handling_role.details.replace('\n', '<br />')

        fd = StringIO()
        tmpl.generate(fd, context)
        return fd.getvalue()

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        """Add the message widget to *form* when it is listed in
        *parameters*."""
        if 'message' in parameters:
            form.add(TextWidget, '%smessage' % prefix, title=_('Message'),
                     value=self.message, cols=80, rows=10)

    def get_parameters(self):
        """Return the names of the configurable parameters."""
        return ('message',)

register_item_class(DisplayMessageWorkflowStatusItem)
class RedirectToStatusWorkflowStatusItem(WorkflowStatusItem):
    """Workflow item returning the URL of the form data page."""

    description = N_('Redirect to Status Page')
    key = 'redirectstatus'

    backoffice = False

    def get_parameters(self):
        """Return the names of the configurable parameters."""
        return ('backoffice',)

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        """Add the backoffice checkbox to *form* when it is listed in
        *parameters*."""
        if 'backoffice' not in parameters:
            return
        form.add(CheckboxWidget, '%sbackoffice' % prefix,
                 title=_('Redirect to backoffice page'),
                 value=self.backoffice)

    def perform(self, formdata):
        """Return ``formdata.get_url()``, called with the ``backoffice``
        flag."""
        return formdata.get_url(self.backoffice)
# RedirectToStatusWorkflowStatusItem is not registered: the class is kept for
# backward compatibility only and should not be exposed to the user. (#3031)
class EditableWorkflowStatusItem(WorkflowStatusItem):
    """Workflow item displaying a button leading to the edition of the
    form."""

    description = N_('Allow Edition')
    key = 'editable'
    endpoint = False
    waitpoint = True

    by = []
    status = None
    label = None
    backoffice_info_text = None

    def render_as_line(self):
        """Return the one-line summary of this item."""
        if not self.by:
            return _('Allow Edition (not completed)')
        return _('Allow Edition by %s') % self.render_list_of_roles(self.by)

    def fill_form(self, form, formdata, user):
        """Add the edit button to *form*, labelled "Edit Form" when no
        label is configured."""
        button_name = 'button%s' % self.id
        form.add_submit(button_name, self.label or _('Edit Form'))
        form.get_widget(button_name).backoffice_info_text = self.backoffice_info_text

    def submit_form(self, form, formdata, user, evo):
        """Return the ``wfedit`` URL of *formdata* when this item's button
        was pressed."""
        if form.get_submit() == 'button%s' % self.id:
            return formdata.get_url() + 'wfedit'

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        """Add to *form* one widget per name listed in *parameters*; widget
        names are prefixed with *prefix*."""
        if 'by' in parameters:
            form.add(WidgetList, '%sby' % prefix, title=_('By'),
                     element_type=SingleSelectWidget,
                     value=self.by,
                     add_element_label=_('Add Role'),
                     element_kwargs={
                         'render_br': False,
                         'options': [(None, '---', None)] + self.get_list_of_roles()})

        if 'status' in parameters:
            # XXX: look into this one, as None is a perfectly valid value, and
            # it would put this question in the 'workflow options' part.
            status_options = [(None, '---')] + [
                (status.id, status.name)
                for status in self.parent.parent.possible_status]
            form.add(SingleSelectWidget, '%sstatus' % prefix,
                     title=_('Status After Edit'), value=self.status,
                     hint=_("Don't select any if you don't want status change processing"),
                     options=status_options)

        if 'label' in parameters:
            form.add(StringWidget, '%slabel' % prefix,
                     title=_('Button Label'), value=self.label)

        if 'backoffice_info_text' in parameters:
            form.add(WysiwygTextWidget, '%sbackoffice_info_text' % prefix,
                     title=_('Information Text for Backoffice'),
                     value=self.backoffice_info_text)

    def get_parameters(self):
        """Return the names of the configurable parameters."""
        return ('by', 'status', 'label', 'backoffice_info_text')

register_item_class(EditableWorkflowStatusItem)
class ExportToModelDirectory(Directory):
    """Directory serving the document generated by an export-to-model item
    for a given form data."""

    _q_exports = ['']

    def __init__(self, formdata, wfstatusitem, wfstatus):
        # wfstatus is accepted but not kept
        self.formdata = formdata
        self.wfstatusitem = wfstatusitem

    def _q_index(self):
        """Return the generated document, setting the response headers.

        Raises TemplatingError when the item has no model file.  The
        content-disposition header is only set for content types other than
        text/html.
        """
        model_file = self.wfstatusitem.model_file
        if not model_file:
            raise TemplatingError(_('No model defined for this action'))

        response = get_response()
        response.content_type = model_file.content_type
        response.set_header('location', '..')
        if response.content_type != 'text/html':
            disposition = 'attachment; filename="%s"' % model_file.base_filename
            response.set_header('content-disposition', disposition)
        return self.wfstatusitem.apply_template_to_formdata(self.formdata)
def char2rtf(c):
    """Return the RTF representation of the single character *c*.

    ASCII characters are returned unchanged.  Other characters are written
    with the ``\\uN?`` control word, where N is a signed 16-bit decimal
    number: values above 32767 are written as negative numbers, and
    characters outside the Basic Multilingual Plane are written as their
    two UTF-16 surrogates, as the RTF specification requires.
    """
    code = ord(c)
    if code < 128:
        # NOTE(review): backslash and braces are passed through unescaped,
        # as before -- confirm this is intended for substituted values.
        return c
    if code > 0xFFFF:
        # split into a UTF-16 surrogate pair
        code -= 0x10000
        units = (0xD800 + (code >> 10), 0xDC00 + (code & 0x3FF))
    else:
        units = (code,)
    parts = []
    for unit in units:
        if unit > 0x7FFF:
            unit -= 0x10000
        parts.append('\\u%d?' % unit)
    return ''.join(parts)
def str2rtf(s):
    """Return *s* converted character by character with ``char2rtf()`` and
    wrapped in a ``{\\uc1{...}}`` group."""
    converted = ''.join(char2rtf(char) for char in s)
    return '{\\uc1{%s}}' % converted
def rtf_process(value):
    """Return *value* converted to RTF text, or None when *value* is None.

    Unicode values are converted directly.  Any other value is turned into
    a byte string with ``str()`` and decoded using the site charset first.
    """
    if value is None:
        return None
    if not isinstance(value, unicode):
        # calling str() on a unicode object would implicitly encode it to
        # ASCII and fail on any non-ASCII character, hence the test above.
        value = unicode(str(value), get_publisher().site_charset)
    return str2rtf(value)
class ExportToModel(WorkflowStatusItem):
    """Workflow item generating a document from an uploaded model file used
    as a template."""

    description = N_('Create Document')
    key = 'export_to_model'
    support_substitution_variables = True

    endpoint = False
    waitpoint = True

    label = None
    model_file = None
    attach_to_history = False
    directory_class = ExportToModelDirectory
    by = ['_receiver']
    backoffice_info_text = None

    def render_as_line(self):
        """Return the one-line summary of this item."""
        if not self.label:
            return _('Create document (not completed)')
        if self.model_file:
            model = _('with model named %(file_name)s of %(size)s bytes') % {
                'file_name': self.model_file.base_filename,
                'size': self.model_file.size}
        else:
            model = _('no model set')
        return _('Create document, labeled %(label)s, %(model)s') % {
            'label': self.label,
            'model': model}

    def fill_form(self, form, formdata, user):
        """Add the button to *form*, labelled "Create Document" when no
        label is configured."""
        button_name = 'button%s' % self.id
        form.add_submit(button_name, self.label or _('Create Document'))
        form.get_widget(button_name).backoffice_info_text = self.backoffice_info_text

    def submit_form(self, form, formdata, user, evo):
        """Handle a click on this item's button.

        A default comment is set on *evo* if it has none.  With
        ``attach_to_history`` the generated document is attached to *evo*
        and the form data URL is returned; otherwise the URL of the
        directory serving the document is returned.

        Raises TemplatingError when the document has to be generated but no
        model is defined or the template is invalid.
        """
        if form.get_submit() != 'button%s' % self.id:
            return
        if not evo.comment:
            evo.comment = _('Form exported in a model')
        in_backoffice = get_request() and get_request().is_in_backoffice()
        if self.attach_to_history:
            # generate first: this raises TemplatingError when there is no
            # model, before any attribute of the model file is accessed.
            content = self.apply_template_to_formdata(formdata)
            evo.add_part(AttachmentEvolutionPart(
                self.model_file.base_filename,
                StringIO(content),
                content_type=self.model_file.content_type))
            return formdata.get_url(backoffice=in_backoffice)
        return formdata.get_url(backoffice=in_backoffice) + self.get_directory_name()

    def model_file_validation(self, upload):
        """Return a ``(valid, error message)`` pair for the uploaded file.

        A file is accepted when its content type is application/rtf, when
        it has a generic or missing content type and a ``.rtf`` filename, or
        when its content starts with ``{\\rtf``.
        """
        if upload.content_type and upload.content_type == 'application/rtf':
            return True, ''
        if (upload.content_type and upload.content_type == 'application/octet-stream') or \
                upload.content_type is None:
            if upload.base_filename and upload.base_filename.endswith('.rtf'):
                return True, ''
        # last resort, look at the beginning of the content
        upload.fp.seek(0)
        if upload.read(10).startswith('{\\rtf'):
            upload.fp.seek(0)
            return True, ''
        return False, _('Only RTF files can be used')

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        """Add to *form* one widget per name listed in *parameters*; widget
        names are prefixed with *prefix*.

        When *formdef* is given the model widget lists the variable names
        of its fields in its hint, uses a filename including the formdef id
        and takes its value from the formdef workflow options if set there.
        """
        if 'by' in parameters:
            form.add(WidgetList, '%sby' % prefix, title=_('By'),
                     element_type=SingleSelectWidget,
                     value=self.by,
                     add_element_label=_('Add Role'),
                     element_kwargs={
                         'render_br': False,
                         'options': [(None, '---', None)] + self.get_list_of_roles()})

        if 'attach_to_history' in parameters:
            form.add(CheckboxWidget, '%sattach_to_history' % prefix,
                     title=_('Attach generated file to the form history'),
                     value=self.attach_to_history)

        if 'label' in parameters:
            form.add(StringWidget, '%slabel' % prefix,
                     title=_('Button Label'), value=self.label)

        if 'model_file' in parameters:
            ids = (self.parent.parent.id, self.parent.id, self.id)
            if formdef:
                hint = htmltext('%s: <ul class="varnames">') % _('Available variables')
                varnames = get_varnames(formdef.fields)
                for pair in varnames:
                    hint += htmltext('<li><tt class="varname">[%s]</tt> <label>%s</label></li>') % pair
                hint += htmltext('</ul>')
                ids = (formdef.id,) + ids
                filename = 'export_to_model-%s-%s-%s-%s.upload' % ids
            else:
                hint = _('You can use variables in your model using '
                         'the [variable] syntax, available variables depends on the form.')
                filename = 'export_to_model-%s-%s-%s.upload' % ids

            widget_name = '%smodel_file' % prefix
            if formdef and formdef.workflow_options and \
                    formdef.workflow_options.get(widget_name) is not None:
                value = formdef.workflow_options.get(widget_name)
            else:
                value = self.model_file

            if value:
                hint = htmltext('<div>%s: <a href="?file=%s">%s</a></div>') % \
                    (_('Current value'), widget_name, value.base_filename) + hint

            form.add(UploadWidget, widget_name, directory='models',
                     filename=filename, title=_('Model'), hint=hint,
                     validation=self.model_file_validation, value=value)

        if 'backoffice_info_text' in parameters:
            form.add(WysiwygTextWidget, '%sbackoffice_info_text' % prefix,
                     title=_('Information Text for Backoffice'),
                     value=self.backoffice_info_text)

    def get_directory_name(self):
        """Return the simplified label (or ``export_to_model``), with
        spaces replaced by underscores."""
        return qommon.misc.simplify(self.label or 'export_to_model', space='_')
    directory_name = property(get_directory_name)

    def apply_template_to_formdata(self, formdata):
        """Return the model file content rendered as a template for
        *formdata*.

        Values are converted with ``rtf_process`` when the model filename
        ends with ``.rtf``.  Raises TemplatingError when no model is
        defined or when the template cannot be rendered (the error is also
        logged in that case).
        """
        if not self.model_file:
            raise TemplatingError(_('No model defined for this action'))
        process = None
        if self.model_file.base_filename.endswith('.rtf'):
            process = rtf_process
        try:
            return template_on_formdata(formdata, self.model_file.get_file().read(),
                                        process=process)
        except ezt.UnknownReference as e:
            url = formdata.get_url()
            get_logger().error('error in template for export to model [%s], unknown reference %s' % (url, str(e)))
            raise TemplatingError(_('Error in the template, reference %s is unknown') % str(e))
        except ezt.EZTException as e:
            url = formdata.get_url()
            get_logger().error('error in template for export to model [%s], model could not be generated' % url)
            raise TemplatingError(_('Unknown error in the template: %s') % str(e))

    def get_parameters(self):
        """Return the names of the configurable parameters."""
        return ('by', 'label', 'model_file', 'attach_to_history', 'backoffice_info_text')

    def model_file_export_to_xml(self, xml_item, charset, include_id=False):
        """Export the model file (filename, content type and content) as a
        ``model_file`` child of *xml_item*; nothing is written without a
        model."""
        if not self.model_file:
            return
        el = ET.SubElement(xml_item, 'model_file')
        ET.SubElement(el, 'base_filename').text = self.model_file.base_filename
        ET.SubElement(el, 'content_type').text = self.model_file.content_type
        ET.SubElement(el, 'content').text = self.model_file.get_file().read()

    def model_file_init_with_xml(self, elem, charset, include_id=False):
        """Rebuild the model file from the ``model_file`` XML element
        *elem*, if there is one."""
        if elem is None:
            return
        base_filename = elem.find('base_filename').text
        content_type = elem.find('content_type').text
        content = elem.find('content').text

        ids = (self.parent.parent.id, self.parent.id, self.id)
        filename = 'export_to_model-%s-%s-%s.upload' % ids

        upload = Upload(base_filename, content_type)
        upload.fp = StringIO()
        upload.fp.write(content)
        upload.fp.seek(0)
        self.model_file = UploadedFile('models', filename, upload)

register_item_class(ExportToModel)
def load_extra():
    """Import the additional workflow modules of the ``wf`` package.

    The imported modules are not used in this function; only the effects of
    importing them matter.
    """
    # NOTE(review): presumably each module registers its workflow item
    # classes with register_item_class() at import time, in which case the
    # order below would define the registration order -- confirm in wf/.
    import wf.aggregation_email
    import wf.timeout_jump
    import wf.jump
    import wf.attachment
    import wf.remove
    import wf.roles
    import wf.dispatch
    import wf.wscall
    import wf.form
    import wf.register_comment
    import wf.anonymise