wcs/wcs/workflows.py

1334 lines
47 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from qommon import ezt
from cStringIO import StringIO
try:
import elementtree.ElementTree as ET
except ImportError:
try:
import xml.etree.ElementTree as ET
except ImportError:
ET = None
from quixote import get_request
import qommon.misc
from qommon.storage import StorableObject
from qommon.form import *
from qommon import emails, get_cfg, get_logger
from quixote.directory import Directory
from quixote.html import htmltext
import qommon.errors
from wcs.roles import Role, logged_users_role, get_user_roles
from wcs.formdata import Evolution
from wcs.fields import SubtitleField, TitleField, CommentField, PageField
if not __name__.startswith('wcs.') and not __name__ == "__main__":
raise ImportError('Import of workflows module must be absolute (import wcs.workflows)')
def lax_int(s):
try:
return int(s)
except (ValueError, TypeError):
return -1
class TemplatingError(qommon.errors.PublishError):
def __init__(self, description):
self.title = _('Templating Error')
self.description = description
class DuplicateStatusNameError(Exception):
pass
def get_varnames(fields):
'''Extract variable names for helping people fill their templates.
Prefer to variable name to the numeric field name.
'''
varnames = []
for field in fields:
if isinstance(field, (SubtitleField, TitleField, CommentField, PageField)):
continue
# add it as f$n$
label = field.label
if field.varname:
varnames.append(('var_%s' % field.varname, label))
else:
varnames.append(('f%s' % field.id, label))
return varnames
class Workflow(StorableObject):
_names = 'workflows'
name = None
details = None
possible_status = None
def __init__(self, name = None):
StorableObject.__init__(self)
self.name = name
self.possible_status = []
def get(cls, id, ignore_errors=False, ignore_migration=False):
if id == '_default':
return cls.get_default_workflow()
return super(Workflow, cls).get (id, ignore_errors=ignore_errors,
ignore_migration=ignore_migration)
get = classmethod(get)
def add_status(self, name, id=None):
if [x for x in self.possible_status if x.name == name]:
raise DuplicateStatusNameError()
status = WorkflowStatus(name)
status.parent = self
if id is None:
if self.possible_status:
status.id = str(max([lax_int(x.id) for x in self.possible_status]) + 1)
else:
status.id = '1'
else:
status.id = id
self.possible_status.append(status)
return status
def get_status(self, id):
for status in self.possible_status:
if status.id == id:
return status
raise KeyError()
def __setstate__(self, dict):
self.__dict__.update(dict)
for s in self.possible_status:
s.parent = self
for i, item in enumerate(s.items):
item.parent = s
if not item.id:
item.id = '%d' % (i+1)
def get_waitpoint_status(self):
# a waitpoint status is a status waiting for an event (be it user
# interaction or something else), but can also be an endpoint (where
# the user would wait, infinitely).
waitpoint_status = []
for status in self.possible_status:
waitpoint = False
endpoint = True
if status.forced_endpoint:
endpoint = True
else:
for item in status.items:
endpoint = item.endpoint and endpoint
waitpoint = item.waitpoint or waitpoint
if endpoint or waitpoint:
waitpoint_status.append(status)
return waitpoint_status
def get_endpoint_status(self):
not_endpoint_status = self.get_not_endpoint_status()
endpoint_status = [x for x in self.possible_status if x not in not_endpoint_status]
return endpoint_status
def get_not_endpoint_status(self):
not_endpoint_status = []
for status in self.possible_status:
if status.forced_endpoint:
continue
endpoint = True
for item in status.items:
endpoint = item.endpoint and endpoint
if endpoint is False:
not_endpoint_status.append(status)
break
return not_endpoint_status
def has_options(self):
for status in self.possible_status:
for item in status.items:
for parameter in item.get_parameters():
if not getattr(item, parameter):
return True
return False
def remove_self(self):
from formdef import FormDef
for form in FormDef.select(lambda x: x.workflow_id == self.id):
form.workflow_id = None
form.store()
StorableObject.remove_self(self)
def export_to_xml(self):
charset = get_publisher().site_charset
root = ET.Element('workflow')
ET.SubElement(root, 'name').text = unicode(self.name, charset)
possible_status = ET.SubElement(root, 'possible_status')
for status in self.possible_status:
possible_status.append(status.export_to_xml(charset=charset))
return root
def import_from_xml(cls, fd):
try:
tree = ET.parse(fd)
except:
raise ValueError()
return cls.import_from_xml_tree(tree)
import_from_xml = classmethod(import_from_xml)
def import_from_xml_tree(cls, tree):
charset = get_publisher().site_charset
workflow = cls()
if tree.find('name') is None or not tree.find('name').text:
raise ValueError()
workflow.name = tree.find('name').text.encode(charset)
workflow.possible_status = []
for status in tree.find('possible_status'):
status_o = WorkflowStatus()
status_o.init_with_xml(status, charset)
workflow.possible_status.append(status_o)
return workflow
import_from_xml_tree = classmethod(import_from_xml_tree)
def get_unknown_workflow(cls):
workflow = Workflow(name=_('Unknown'))
workflow.id = '_unknown'
return workflow
get_unknown_workflow = classmethod(get_unknown_workflow)
def get_default_workflow(cls):
from qommon.admin.emails import EmailsDirectory
workflow = Workflow(name=_('Default'))
workflow.id = '_default'
just_submitted_status = workflow.add_status(_('Just Submitted'), 'just_submitted')
just_submitted_status.visibility = ['_receiver']
new_status = workflow.add_status(_('New'), 'new')
handled_status = workflow.add_status(_('Handled'), 'handled')
rejected_status = workflow.add_status(_('Rejected'), 'rejected')
accepted_status = workflow.add_status(_('Accepted'), 'accepted')
finished_status = workflow.add_status(_('Finished'), 'finished')
commentable = CommentableWorkflowStatusItem()
commentable.id = '_commentable'
commentable.by = ['_receiver']
commentable.button_label = _('Add Comment')
commentable_all = CommentableWorkflowStatusItem()
commentable_all.id = '_commentable_all'
commentable_all.by = ['_submitter', '_receiver']
commentable_all.button_label = _('Add Comment')
import wf.jump
jump_to_new = wf.jump.JumpWorkflowStatusItem()
jump_to_new.id = '_jump_to_new'
jump_to_new.status = new_status.id
jump_to_new.parent = just_submitted_status
jump_to_handled = JumpOnSubmitWorkflowStatusItem()
jump_to_handled.id = '_jump_to_handled'
jump_to_handled.status = handled_status.id
jump_to_handled.parent = new_status
notify_new_receiver_email = SendmailWorkflowStatusItem()
notify_new_receiver_email.id = '_notify_new_receiver_email'
notify_new_receiver_email.to = ['_receiver']
notify_new_receiver_email.subject = EmailsDirectory.get_subject('new_receiver')
notify_new_receiver_email.body = EmailsDirectory.get_body('new_receiver')
if not EmailsDirectory.is_enabled('new_receiver'):
notify_new_receiver_email = None
notify_new_user_email = SendmailWorkflowStatusItem()
notify_new_user_email.id = '_notify_new_user_email'
notify_new_user_email.to = ['_submitter']
notify_new_user_email.subject = EmailsDirectory.get_subject('new_user')
notify_new_user_email.body = EmailsDirectory.get_body('new_user')
if not EmailsDirectory.is_enabled('new_user'):
notify_change_user_email = None
notify_change_receiver_email = SendmailWorkflowStatusItem()
notify_change_receiver_email.id = '_notify_change_receiver_email'
notify_change_receiver_email.to = ['_receiver']
notify_change_receiver_email.subject = EmailsDirectory.get_subject('change_receiver')
notify_change_receiver_email.body = EmailsDirectory.get_body('change_receiver')
if not EmailsDirectory.is_enabled('change_receiver'):
notify_change_receiver_email = None
notify_change_user_email = SendmailWorkflowStatusItem()
notify_change_user_email.id = '_notify_change_user_email'
notify_change_user_email.to = ['_submitter']
notify_change_user_email.subject = EmailsDirectory.get_subject('change_user')
notify_change_user_email.body = EmailsDirectory.get_body('change_user')
if not EmailsDirectory.is_enabled('change_user'):
notify_change_user_email = None
if notify_new_receiver_email:
just_submitted_status.items.append(notify_new_receiver_email)
if notify_new_user_email:
just_submitted_status.items.append(notify_new_user_email)
just_submitted_status.items.append(jump_to_new)
if notify_change_receiver_email:
accepted_status.items.append(notify_change_receiver_email)
handled_status.items.append(notify_change_receiver_email)
rejected_status.items.append(notify_change_receiver_email)
finished_status.items.append(notify_change_receiver_email)
if notify_change_user_email:
accepted_status.items.append(notify_change_user_email)
handled_status.items.append(notify_change_user_email)
rejected_status.items.append(notify_change_user_email)
finished_status.items.append(notify_change_user_email)
new_status.items.append(commentable)
accepted_status.items.append(commentable_all)
handled_status.items.append(commentable_all)
accept = ChoiceWorkflowStatusItem()
accept.id = '_accept'
accept.label = _('Accept')
accept.by = ['_receiver']
accept.status = accepted_status.id
accept.parent = new_status
new_status.items.append(accept)
handled_status.items.append(accept)
reject = ChoiceWorkflowStatusItem()
reject.id = '_reject'
reject.label = _('Reject')
reject.by = ['_receiver']
reject.status = rejected_status.id
reject.parent = new_status
new_status.items.append(reject)
handled_status.items.append(reject)
new_status.items.append(jump_to_handled)
finish = ChoiceWorkflowStatusItem()
finish.id = '_finish'
finish.label = _('Finish')
finish.by = ['_receiver']
finish.status = finished_status.id
finish.parent = accepted_status
accepted_status.items.append(finish)
return workflow
get_default_workflow = classmethod(get_default_workflow)
class WorkflowStatus:
id = None
name = None
items = None
visibility = None
forced_endpoint = False
def __init__(self, name = None):
self.name = name
self.items = []
def __eq__(self, other):
if other is None:
return False
# this assumes both status are from the same workflow
return self.id == other.id
def append_item(self, type):
for klass in item_classes:
if klass.key == type:
o = klass()
if self.items:
o.id = str(max([lax_int(x.id) for x in self.items]) + 1)
else:
o.id = '1'
self.items.append(o)
break
else:
raise KeyError()
def get_item(self, id):
for item in self.items:
if item.id == id:
return item
raise KeyError()
def perform_items(self, formdata, depth=20):
if depth == 0: # prevents infinite loops
return
url = None
old_status = formdata.status
for item in self.items:
url = item.perform(formdata) or url
if formdata.status != old_status:
break
if formdata.status != old_status:
if not formdata.evolution:
formdata.evolution = []
evo = Evolution()
evo.time = time.localtime()
evo.status = formdata.status
formdata.evolution.append(evo)
formdata.store()
# performs the items of the new status
wf_status = formdata.get_workflow_status()
url = wf_status.perform_items(formdata, depth=depth-1) or url
return url
def get_action_form(self, filled, user):
form = Form(enctype='multipart/form-data', use_tokens = False)
for item in self.items:
if not item.check_auth(filled, user):
continue
item.fill_form(form, filled, user)
if form.widgets or form.submit_widgets:
return form
else:
return None
def handle_form(self, form, filled, user):
#form = self.get_action_form(filled, user)
evo = Evolution()
evo.time = time.localtime()
if user:
if filled.is_submitter(user):
evo.who = '_submitter'
else:
evo.who = user.id
if not filled.evolution:
filled.evolution = []
for item in self.items:
if hasattr(item, 'by'):
for role in item.by or []:
if role == logged_users_role().id:
break
if role == '_submitter':
if filled.is_submitter(user):
break
else:
continue
role = get_role_translation(filled.formdef, role)
if role in (user.roles or []):
break
else:
continue
next_url = item.submit_form(form, filled, user, evo)
if next_url is True:
break
if next_url:
if not form.has_errors():
filled.evolution.append(evo)
if evo.status:
filled.status = evo.status
filled.store()
return next_url
if form.has_errors():
return
filled.evolution.append(evo)
if evo.status:
filled.status = evo.status
filled.store()
url = filled.perform_workflow()
if url:
return url
def get_subdirectories(self, formdata):
subdirectories = []
for item in self.items:
if item.directory_name:
subdirectories.append((item.directory_name,
item.directory_class(formdata, item, self)))
return subdirectories
def is_visible(self, formdata, user):
if not self.visibility: # no restriction -> visible
return True
if user and user.is_admin:
return True
if user:
user_roles = set(user.roles or [])
user_roles.add(logged_users_role().id)
else:
user_roles = set([])
for role in self.visibility:
if role == '_receiver':
role = formdata.formdef.receiver_id
if role in user_roles:
return True
return False
def __getstate__(self):
odict = self.__dict__.copy()
if odict.has_key('parent'):
del odict['parent']
return odict
def export_to_xml(self, charset):
status = ET.Element('status')
ET.SubElement(status, 'id').text = unicode(self.id, charset)
ET.SubElement(status, 'name').text = unicode(self.name, charset)
items = ET.SubElement(status, 'items')
for item in self.items:
items.append(item.export_to_xml(charset=charset))
return status
def init_with_xml(self, elem, charset, include_id=False):
self.id = elem.find('id').text.encode(charset)
self.name = elem.find('name').text.encode(charset)
self.items = []
for item in elem.find('items'):
item_type = item.attrib['type']
self.append_item(item_type)
item_o = self.items[-1]
item_o.init_with_xml(item, charset)
class WorkflowStatusItem:
description = 'XX'
id = None
endpoint = True # means it's not possible to interact, and/or cause a status change
waitpoint = False # means it's possible to wait (user interaction, or other event)
directory_name = None
directory_class = None
support_substitution_variables = False
def init(cls):
pass
init = classmethod(init)
def is_available(cls):
return True
is_available = classmethod(is_available)
def render_as_line(self):
return _(self.description)
def perform(self, formdata):
pass
def fill_form(self, form, formdata, user):
pass
def submit_form(self, form, formdata, user, evo):
pass
def check_auth(self, formdata, user):
if not hasattr(self, 'by'):
return True
for role in self.by or []:
if user and role == logged_users_role().id:
return True
if not user:
continue
if role == '_submitter':
t = formdata.is_submitter(user)
if t is True:
return True
continue
role = get_role_translation(formdata.formdef, role)
if role in (user.roles or []):
return True
return False
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
pass
def get_parameters(self):
return ()
def fill_admin_form(self, form):
self.add_parameters_widgets(form, self.get_parameters())
def submit_admin_form(self, form):
for f in self.get_parameters():
widget = form.get_widget(f)
if widget:
value = widget.parse()
if hasattr(self, '%s_parse' % f):
value = getattr(self, '%s_parse' % f)(value)
setattr(self, f, value)
def compute(self, var):
if not isinstance(var, basestring):
return var
if not var.startswith('='):
return var
vars = get_publisher().substitutions.get_context_variables()
try:
return eval(var[1:], get_publisher().get_global_eval_dict(), vars)
except:
return var
def get_substitution_variables(self, formdata):
return {}
def export_to_xml(self, charset, include_id=False):
item = ET.Element('item')
item.attrib['type'] = self.key
for attribute in self.get_parameters():
if hasattr(self, attribute) and getattr(self, attribute) is not None:
el = ET.SubElement(item, attribute)
val = getattr(self, attribute)
if type(val) is dict:
for k, v in val.items():
ET.SubElement(el, k).text = unicode(v, charset, 'replace')
elif type(val) is list:
if attribute[-1] == 's':
atname = attribute[:-1]
else:
atname = 'item'
for v in val:
ET.SubElement(el, atname).text = unicode(str(v), charset, 'replace')
elif type(val) in (str, unicode):
if type(val) is unicode:
el.text = val
else:
el.text = unicode(val, charset, 'replace')
else:
el.text = str(val)
return item
def init_with_xml(self, elem, charset):
for attribute in self.get_parameters():
el = elem.find(attribute)
if el is None:
continue
if el.getchildren():
if type(getattr(self, attribute)) is list:
v = [x.text.encode(charset) for x in el.getchildren()]
elif type(getattr(self, attribute)) is dict:
v = {}
for e in el.getchildren():
v[e.tag] = e.text.encode(charset)
else:
# ???
raise AssertionError
setattr(self, attribute, v)
else:
if el.text is None:
setattr(self, attribute, None)
elif el.text in ('False', 'True'): # bools
setattr(self, attribute, eval(el.text))
elif type(getattr(self, attribute)) is int:
setattr(self, attribute, int(el.text.encode(charset)))
else:
setattr(self, attribute, el.text.encode(charset))
def __getstate__(self):
odict = self.__dict__.copy()
if odict.has_key('parent'):
del odict['parent']
return odict
class WorkflowStatusJumpItem(WorkflowStatusItem):
status = None
endpoint = False
def get_status(self):
if not self.status:
return None
try:
return [x for x in self.parent.parent.possible_status if x.id == self.status][0]
except IndexError:
get_publisher().get_app_logger().error(
'reference to invalid status in workflow %s, status %s, item %s' % (
self.parent.parent.name,
self.parent.name,
self.description))
return None
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
if 'status' in parameters:
form.add(SingleSelectWidget, '%sstatus' % prefix, title = _('Status'), value = self.status,
options = [(None, '---')] + [(x.id, x.name) for x in self.parent.parent.possible_status])
def get_parameters(self):
return ('status',)
def get_role_translation(formdef, role_name):
if role_name == '_receiver':
return formdef.receiver_id
elif role_name == '_submitter':
raise Exception('_submitter is not a valid role')
else:
return role_name
def render_list_of_roles(roles):
t = []
for r in roles:
if r == logged_users_role().id:
t.append(logged_users_role().name)
elif r == '_submitter':
t.append(_('sender'))
elif r == '_receiver':
t.append(_('receiver'))
else:
try:
t.append(Role.get(r).name)
except KeyError:
pass
return ', '.join(t)
item_classes = []
def register_item_class(klass):
if not klass in item_classes:
item_classes.append(klass)
klass.init()
class CommentableWorkflowStatusItem(WorkflowStatusItem):
description = N_('Allow Comment')
key = 'commentable'
endpoint = False
waitpoint = True
label = None
button_label = 0 # hack to handle legacy commentable items
hint = None
by = []
def render_as_line(self):
if self.by:
return _('Allow Comment by %s') % render_list_of_roles(self.by)
else:
return _('Allow Comment (not completed)')
def fill_form(self, form, formdata, user):
if not 'comment' in [x.name for x in form.widgets]:
if self.label is None:
title = _('Comment')
else:
title = self.label
form.add(TextWidget, 'comment', title=title, required=False,
cols=40, rows=10, hint=self.hint)
form.get_widget('comment').attrs['class'] = 'comment'
if self.button_label == 0:
form.add_submit('button%s' % self.id, _('Add Comment'))
elif self.button_label:
form.add_submit('button%s' % self.id, self.button_label)
def submit_form(self, form, formdata, user, evo):
if form.get_widget('comment'):
evo.comment = form.get_widget('comment').parse()
def submit_admin_form(self, form):
for f in self.get_parameters():
widget = form.get_widget(f)
if widget:
setattr(self, f, widget.parse())
def fill_admin_form(self, form):
if self.by and not type(self.by) is list:
self.by = None
self.add_parameters_widgets(form, self.get_parameters())
def get_parameters(self):
return ('label', 'button_label', 'by', 'hint')
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
if 'label' in parameters:
if self.label is None:
self.label = _('Comment')
form.add(StringWidget, '%slabel' % prefix, size=40, title=_('Label'), value=self.label)
if 'button_label' in parameters:
if self.button_label == 0:
self.button_label = _('Add Comment')
form.add(StringWidget, '%sbutton_label' % prefix, title=_('Button Label'),
hint=_('(empty to disable the button)'),
value=self.button_label)
if 'hint' in parameters:
form.add(StringWidget, '%shint' % prefix, size=40, title=_('Hint'), value=self.hint)
if 'by' in parameters:
form.add(WidgetList, '%sby' % prefix, title = _('By'),
element_type=SingleSelectWidget,
value=self.by,
add_element_label=_('Add Role'),
element_kwargs={'render_br': False,
'options': [(None, '---'),
('_submitter', _('Sender')),
('_receiver', _('Receiver')),
(logged_users_role().id, logged_users_role().name),
(None, '----')] + get_user_roles()})
register_item_class(CommentableWorkflowStatusItem)
class ChoiceWorkflowStatusItem(WorkflowStatusJumpItem):
description = N_('Change Status')
key = 'choice'
endpoint = False
waitpoint = True
label = None
by = []
def render_as_line(self):
if self.label:
if self.by:
return _('Change Status "%(label)s" by %(by)s') % \
{ 'label' : self.label,
'by' : render_list_of_roles(self.by) }
else:
return _('Change Status "%s"') % self.label
else:
return _('Change Status (not completed)')
def fill_form(self, form, formdata, user):
form.add_submit('button%s' % self.id, self.label)
def submit_form(self, form, formdata, user, evo):
if form.get_submit() == 'button%s' % self.id:
wf_status = self.get_status()
if wf_status:
evo.status = 'wf-%s' % wf_status.id
form.clear_errors()
return True # get out of processing loop
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
if 'label' in parameters:
form.add(StringWidget, '%slabel' % prefix, title = _('Label'), value = self.label)
WorkflowStatusJumpItem.add_parameters_widgets(self, form, parameters, prefix, formdef)
if 'by' in parameters:
form.add(WidgetList, '%sby' % prefix, title = _('By'), element_type = SingleSelectWidget,
value = self.by,
add_element_label = _('Add Role'),
element_kwargs = {'render_br': False,
'options': [(None, '---'),
('_submitter', _('Sender')),
('_receiver', _('Receiver')),
(logged_users_role().id, logged_users_role().name),
(None, '----')] + get_user_roles()})
def get_parameters(self):
return ('by', 'status', 'label')
register_item_class(ChoiceWorkflowStatusItem)
class JumpOnSubmitWorkflowStatusItem(WorkflowStatusJumpItem):
description = N_('Change Status on Submit')
key = 'jumponsubmit'
def render_as_line(self):
if self.status:
if self.get_status():
return _('Change Status on Submit (to %s)') % self.get_status().name
else:
return _('Change Status on Submit (broken)')
else:
return _('Change Status on Submit (not completed)')
def submit_form(self, form, formdata, user, evo):
if form.is_submitted() and not form.has_errors():
wf_status = self.get_status()
if wf_status:
evo.status = 'wf-%s' % wf_status.id
def get_parameters(self):
return ('status',)
register_item_class(JumpOnSubmitWorkflowStatusItem)
class SendmailWorkflowStatusItem(WorkflowStatusItem):
description = N_('Send mail')
key = 'sendmail'
support_substitution_variables = True
to = []
subject = None
body = None
comment = None
def render_as_line(self):
if self.to:
return _('Send mail to %s') % render_list_of_roles(self.to)
else:
return _('Send mail (not completed)')
def get_parameters(self):
return ('to', 'subject', 'body')
def fill_admin_form(self, form):
self.add_parameters_widgets(form, self.get_parameters())
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
if 'to' in parameters:
form.add(WidgetList, '%sto' % prefix, title=_('To'),
element_type=SingleSelectWidget,
value=self.to,
add_element_label=_('Add Role'),
element_kwargs={'render_br': False,
'options': [(None, '---'),
('_submitter', _('Sender')),
('_receiver', _('Receiver')), ] + get_user_roles()})
if 'subject' in parameters:
form.add(StringWidget, '%ssubject' % prefix, title=_('Subject'),
value=self.subject, size=40)
if 'body' in parameters:
form.add(TextWidget, '%sbody' % prefix, title=_('Body'),
value=self.body, cols=80, rows=10,
hint=_('Available variables: url, url_status, details, name, number, comment, field_NAME'))
def perform(self, formdata):
if not self.to:
return
if not self.subject:
return
url = formdata.get_url()
try:
mail_body = template_on_formdata(formdata, self.compute(self.body))
except ezt.EZTException:
get_logger().error('error in template for email body [%s], mail could not be generated' % url)
return
try:
mail_subject = template_on_formdata(formdata, self.compute(self.subject))
except ezt.EZTException:
get_logger().error('error in template for email subject [%s], mail could not be generated' % url)
return
users_cfg = get_cfg('users', {})
# this works around the fact that parametric workflows only support
# string values, so if we get set a string, we convert it here to an
# array.
if isinstance(self.to, basestring):
self.to = [self.to]
addresses = []
for dest in self.to:
dest = self.compute(dest)
if '@' in str(dest):
addresses.append(dest)
continue
if dest == '_submitter':
field_email = users_cfg.get('field_email') or 'email'
done = False
if formdata.user:
if formdata.user.email:
addresses.append(formdata.user.email)
done = True
elif formdata.user.form_data and formdata.user.form_data.get(field_email):
addresses.append(formdata.user.form_data.get(field_email))
done = True
if not done:
# if there is no user, or user has no email address, look
# up in submitted form for one that would hold the user
# email (the one set to be prefilled by user email)
fields = formdata.formdef.fields
for field in fields:
if not hasattr(field, 'prefill'):
continue
if field.prefill and field.prefill.get('type') == 'user':
if field.prefill.get('value') == field_email:
v = formdata.data.get(field.id)
if v:
addresses.append(v)
done = True
break
continue
dest = get_role_translation(formdata.formdef, dest)
try:
role = Role.get(dest)
except KeyError:
continue
addresses.extend(role.get_emails())
if not addresses:
return
if len(addresses) > 1:
emails.email(mail_subject, mail_body, email_rcpt=None,
bcc=addresses,
exclude_current_user=False,
fire_and_forget=True)
else:
emails.email(mail_subject, mail_body, email_rcpt=addresses,
exclude_current_user=False,
fire_and_forget=True)
register_item_class(SendmailWorkflowStatusItem)
def template_on_formdata(formdata, template, process=None):
dict = {}
dict.update(get_publisher().substitutions.get_context_variables())
dict['url'] = formdata.get_url()
dict['url_status'] = '%sstatus' % formdata.get_url()
dict['details'] = formdata.formdef.get_detailed_email_form(formdata, dict['url'])
dict['name'] = formdata.formdef.name
dict['number'] = formdata.id
if formdata.evolution and formdata.evolution[-1].comment:
dict['comment'] = formdata.evolution[-1].comment
else:
dict['comment'] = ''
dict.update(formdata.get_as_dict())
# compatibility vars
dict['before'] = dict.get('form_previous_status')
dict['after'] = dict.get('form_status')
dict['evolution'] = dict.get('form_evolution')
if process:
for key in dict:
dict[key] = process(dict[key])
processor = ezt.Template(compress_whitespace=False)
processor.parse(template or '')
fd = StringIO()
processor.generate(fd, dict)
return fd.getvalue()
class SendSMSWorkflowStatusItem(WorkflowStatusItem):
description = N_('Send SMS')
key = 'sendsms'
support_substitution_variables = True
to = []
body = None
def render_as_line(self):
return _('Send SMS')
def fill_admin_form(self, form):
self.add_parameters_widgets(form, self.get_parameters())
def get_parameters(self):
return ('to', 'body')
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
if 'to' in parameters:
form.add(WidgetList, '%sto' % prefix, title=_('To'), element_type=StringWidget,
value=self.to, add_element_label=_('Add Number'),
element_kwargs = {'render_br': False})
if 'body' in parameters:
form.add(TextWidget, '%sbody' % prefix, title=_('Body'), value=self.body, cols=80, rows=10,
hint = _('Available variables: url, url_status, details, name, number, comment, field_NAME'))
def perform(self, formdata):
if not self.to:
return
if not self.body:
return
try:
sms_body = template_on_formdata(formdata, self.compute(self.body))
except ezt.EZTException:
url = formdata.get_url()
get_logger().error('error in template for sms [%s], sms could not be generated' % url)
return
from qommon.sms import SMS
sms_cfg = get_cfg('sms', {})
sender = sms_cfg.get('sender', 'AuQuotidien')[:11]
try:
SMS.get_sms_class().send([self.compute(x) for x in self.to], sms_body[:160], sender)
except qommon.errors.SMSError, e:
get_logger().error(e)
register_item_class(SendSMSWorkflowStatusItem)
class DisplayMessageWorkflowStatusItem(WorkflowStatusItem):
description = N_('Display message')
key = 'displaymsg'
support_substitution_variables = True
message = None
def get_message(self, filled):
if not self.message:
return ''
tmpl = ezt.Template()
tmpl.parse(self.message)
dict = {}
dict.update(get_publisher().substitutions.get_context_variables())
dict['date'] = misc.localstrftime(filled.receipt_time)
dict['number'] = filled.id
if filled.formdef.receiver and filled.formdef.receiver.details:
dict['receiver'] = filled.formdef.receiver.details.replace('\n', '<br />')
fd = StringIO()
tmpl.generate(fd, dict)
msg = fd.getvalue()
return msg
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
if 'message' in parameters:
form.add(TextWidget, '%smessage' % prefix, title = _('Message'),
value = self.message, cols = 80, rows = 10)
def get_parameters(self):
return ('message',)
register_item_class(DisplayMessageWorkflowStatusItem)
class RedirectToStatusWorkflowStatusItem(WorkflowStatusItem):
description = N_('Redirect to Status Page')
key = 'redirectstatus'
backoffice = False
def perform(self, formdata):
if not get_request().user:
return None
return formdata.get_url(self.backoffice)
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
if 'backoffice' in parameters:
form.add(CheckboxWidget, '%sbackoffice' % prefix,
title = _('Redirect to backoffice page'),
value = self.backoffice)
def get_parameters(self):
return ('backoffice',)
register_item_class(RedirectToStatusWorkflowStatusItem)
class EditableWorkflowStatusItem(WorkflowStatusItem):
description = N_('Allow Edition')
key = 'editable'
endpoint = False
waitpoint = True
by = []
status = None
label = None
def render_as_line(self):
if self.by:
return _('Allow Edition by %s') % render_list_of_roles(self.by)
else:
return _('Allow Edition (not completed)')
def fill_form(self, form, formdata, user):
label = self.label
if not label:
label = _('Edit Form')
form.add_submit('button%s' % self.id, label)
def submit_form(self, form, formdata, user, evo):
if form.get_submit() == 'button%s' % self.id:
return formdata.get_url() + 'wfedit'
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
if 'by' in parameters:
form.add(WidgetList, '%sby' % prefix, title = _('By'), element_type = SingleSelectWidget,
value = self.by,
add_element_label = _('Add Role'),
element_kwargs = {'render_br': False,
'options': [(None, '---'),
('_submitter', _('Sender')),
('_receiver', _('Receiver')),
(logged_users_role().id, logged_users_role().name),
(None, '----')] + get_user_roles()})
if 'status' in parameters:
# XXX: look into this one, as None is a perfectly valid value, and
# it would put this question in the 'workflow options' part.
form.add(SingleSelectWidget, '%sstatus' % prefix, title = _('Status After Edit'), value = self.status,
hint = _("Don't select any if you don't want status change processing"),
options = [(None, '---')] + [(x.id, x.name) for x in self.parent.parent.possible_status])
if 'label' in parameters:
form.add(StringWidget, '%slabel' % prefix, title = _('Button Label'), value = self.label)
def get_parameters(self):
return ('by', 'status', 'label')
register_item_class(EditableWorkflowStatusItem)
class ExportToModelDirectory(Directory):
_q_exports = [ '' ]
def __init__(self, formdata, wfstatusitem, wfstatus):
self.formdata = formdata
self.wfstatusitem = wfstatusitem
def _q_index(self):
if not self.wfstatusitem.model_file:
raise TemplatingError(_('No model defined for this action'))
response = get_response()
response.content_type = self.wfstatusitem.model_file.content_type
response.set_header('location', '..')
if response.content_type != 'text/html':
response.set_header('content-disposition',
'attachment; filename="%s"' %
self.wfstatusitem.model_file.base_filename)
return self.wfstatusitem.apply_template_to_formdata(self.formdata)
def char2rtf(c):
if ord(c) < 128:
return c
else:
return '\u%d?' % ord(c)
def str2rtf(s):
s = ''.join([ char2rtf(c) for c in s])
return '{\uc1{%s}}' % s
def rtf_process(value):
if value is None:
return None
return str2rtf(unicode(str(value), get_publisher().site_charset))
class ExportToModel(WorkflowStatusItem):
description = N_('Export to model')
key = 'export_to_model'
support_substitution_variables = True
endpoint = False
waitpoint = True
label = None
model_file = None
directory_class = ExportToModelDirectory
def render_as_line(self):
if self.label:
if self.model_file:
model = _('with model named %(file_name)s of %(size)s bytes') % {
'file_name': self.model_file.base_filename,
'size': self.model_file.size }
else:
model = _('no model set')
return _('Export to model, labeled %(label)s, %(model)s') % {
'label': self.label,
'model': model }
else:
return _('Export to model (not completed)')
def fill_form(self, form, formdata, user):
label = self.label
if not label:
label = _('Export to model')
form.add_submit('button%s' % self.id, label)
def submit_form(self, form, formdata, user, evo):
if form.get_submit() == 'button%s' % self.id:
if not evo.comment:
evo.comment = _('Form exported in a model')
return formdata.get_url() + self.get_directory_name()
def model_file_validation(self, upload):
if upload.content_type and upload.content_type == 'application/rtf':
return True, ''
if (upload.content_type and upload.content_type == 'application/octet-stream') or \
upload.content_type is None:
if upload.base_filename and upload.base_filename.endswith('.rtf'):
return True, ''
upload.fp.seek(0)
if upload.read(10).startswith('{\\rtf'):
upload.fp.seek(0)
return True, ''
return False, _('Only RTF files can be used')
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
if 'label' in parameters:
form.add(StringWidget, '%slabel' % prefix, title = _('Button Label'), value = self.label)
if 'model_file' in parameters:
ids = (self.parent.parent.id, self.parent.id, self.id)
if formdef:
hint = htmltext('%s: <ul class="varnames">') % _('Available variables')
varnames = get_varnames(formdef.fields)
for pair in varnames:
hint += htmltext('<li><tt class="varname">[%s]</tt> <label>%s</label></span></li>') % pair
hint += htmltext('</ul>')
ids = (formdef.id,) + ids
filename = 'export_to_model-%s-%s-%s-%s.upload' % ids
else:
hint = _('You can use variables in your model using '
'the [variable] syntax, available variables depends on the form.')
filename = 'export_to_model-%s-%s-%s.upload' % ids
widget_name = '%smodel_file' % prefix
if formdef and formdef.workflow_options and \
formdef.workflow_options.get(widget_name) is not None:
value = formdef.workflow_options.get(widget_name)
else:
value = self.model_file
if value:
hint = htmltext('<div>%s: <a href="?file=%s">%s</a></div>') % \
(_('Current value'), widget_name, value.base_filename) + hint
form.add(UploadWidget, widget_name, directory='models',
filename=filename, title=_('Model'), hint=hint,
validation=self.model_file_validation, value=value)
def get_directory_name(self):
return qommon.misc.simplify(self.label or 'export_to_model', space='_')
directory_name = property(get_directory_name)
def apply_template_to_formdata(self, formdata):
process = None
if self.model_file.base_filename.endswith('.rtf'):
process = rtf_process
try:
return template_on_formdata(formdata, self.model_file.get_file().read(),
process=process)
except ezt.UnknownReference, e:
url = formdata.get_url()
get_logger().error('error in template for export to model [%s], unknown reference %s' % (url, str(e)))
raise TemplatingError(_('Error in the template, reference %s is unknown') % str(e))
except ezt.EZTException, e:
url = formdata.get_url()
get_logger().error('error in template for export to model [%s], model could not be generated' % url)
raise TemplatingError(_('Unknown error in the template: %s') % str(e))
def get_parameters(self):
return ('label', 'model_file')
register_item_class(ExportToModel)
def load_extra():
import wf.aggregation_email
import wf.timeout_jump
# import wf.anonymous_access
import wf.jump
import wf.attachment
import wf.remove
import wf.roles