1704 lines
62 KiB
Python
1704 lines
62 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2010 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
from qommon import ezt
|
|
from cStringIO import StringIO
|
|
import copy
|
|
import xml.etree.ElementTree as ET
|
|
import random
|
|
import os
|
|
import string
|
|
|
|
from quixote import get_request
|
|
|
|
import qommon.misc
|
|
from qommon.storage import StorableObject
|
|
from qommon.form import *
|
|
from qommon import emails, get_cfg, get_logger
|
|
from quixote.directory import Directory
|
|
from quixote.html import htmltext
|
|
import qommon.errors
|
|
|
|
from wcs.roles import Role, logged_users_role, get_user_roles
|
|
from wcs.formdata import Evolution
|
|
from wcs.fields import SubtitleField, TitleField, CommentField, PageField
|
|
|
|
# Refuse relative imports: the module must be loaded under its absolute
# name (wcs.workflows), or be run directly as a script.
if __name__ != "__main__" and not __name__.startswith('wcs.'):
    raise ImportError('Import of workflows module must be absolute (import wcs.workflows)')
|
|
|
|
def lax_int(s):
    """Convert *s* to an integer, falling back to -1.

    -1 is returned when int() rejects the value with a ValueError or a
    TypeError (for example a non numeric string, or None).
    """
    try:
        value = int(s)
    except (ValueError, TypeError):
        value = -1
    return value
|
|
|
|
|
|
class WorkflowImportError(Exception):
    """Raised when an XML document cannot be imported as a workflow."""
|
|
|
|
|
|
class AttachmentEvolutionPart:
    """File attached to an evolution of a form data.

    While the object is alive the content is available through ``fp``.
    When the object is pickled the content is written to a new file in the
    ``attachments`` directory of the application and only the path of that
    file (``filename``) is kept in the pickled state.
    """
    orig_filename = None
    base_filename = None
    content_type = None
    charset = None

    def __init__(self, base_filename, fp, orig_filename=None, content_type=None,
            charset=None):
        """Keep the file object *fp* and its metadata.

        *orig_filename* defaults to *base_filename* when it is not given.
        """
        self.base_filename = base_filename
        self.orig_filename = orig_filename or base_filename
        self.content_type = content_type
        self.charset = charset
        self.fp = fp

    def from_upload(cls, upload):
        """Build an attachment from an upload object.

        The upload must provide the base_filename, fp, orig_filename,
        content_type and charset attributes.
        """
        return AttachmentEvolutionPart(
                upload.base_filename,
                upload.fp,
                upload.orig_filename,
                upload.content_type,
                upload.charset)
    from_upload = classmethod(from_upload)

    def __getstate__(self):
        """Return the state to pickle, saving the content to disk on the way.

        When there is no ``fp`` attribute the instance dictionary is
        returned as is.  Otherwise the content of ``fp`` is copied to a new
        randomly named file and its path is stored under the ``filename``
        key, in place of ``fp``.
        """
        odict = self.__dict__.copy()
        if 'fp' not in odict:
            return odict

        del odict['fp']
        dirname = os.path.join(get_publisher().app_dir, 'attachments')
        if not os.path.exists(dirname):
            os.mkdir(dirname)

        def get_new_filename():
            # pick random 16 letter names until an unused one is found;
            # ascii_lowercase keeps names independent from the locale
            r = random.SystemRandom()
            while True:
                name = ''.join([r.choice(string.ascii_lowercase) for x in range(16)])
                filename = os.path.join(dirname, name)
                if not os.path.exists(filename):
                    return filename

        odict['filename'] = get_new_filename()
        self.fp.seek(0)
        # attachments are arbitrary files, write them in binary mode and make
        # sure the file is closed even if the copy fails
        fd = open(odict['filename'], 'wb')
        try:
            fd.write(self.fp.read())
        finally:
            fd.close()
        return odict

    def view(self):
        """Return an HTML link to the attachment.

        ``filename`` is only set in the pickled state, so this presumably
        expects an instance that went through pickling -- TODO confirm.
        """
        # formatting the htmltext template (instead of a plain string) gets
        # the file names escaped; orig_filename comes from the uploader
        return htmltext('<p><a href="attachment?f=%s">%s</a>') % (
                os.path.basename(self.filename), self.orig_filename)
|
|
|
|
|
|
class TemplatingError(qommon.errors.PublishError):
    """Publish error with a fixed 'Templating Error' title."""

    def __init__(self, description):
        """Keep *description* as the details of the error."""
        self.description = description
        self.title = _('Templating Error')
|
|
|
|
|
|
class DuplicateStatusNameError(Exception):
    """Raised when a status is added with a name already used in the workflow."""
|
|
|
|
def get_varnames(fields):
    '''Extract variable names for helping people fill their templates.

    Returns a list of (name, label) tuples; the variable name of a field
    ("var_<varname>") is preferred to its numeric name ("f<id>").  Titles,
    subtitles, comments and pages are skipped.
    '''
    skipped_types = (SubtitleField, TitleField, CommentField, PageField)
    varnames = []
    for field in fields:
        if isinstance(field, skipped_types):
            continue
        label = field.label
        if field.varname:
            name = 'var_%s' % field.varname
        else:
            # no variable name, fall back to f$n$
            name = 'f%s' % field.id
        varnames.append((name, label))
    return varnames
|
|
|
|
|
|
class Workflow(StorableObject):
    """Stored workflow: a named list of statuses and a set of role labels."""
    _names = 'workflows'
    name = None
    possible_status = None  # list of WorkflowStatus objects, set in __init__
    roles = None  # dictionary of workflow role id -> label (see migrate())

    last_modification_time = None  # time.localtime() value set by store()
    last_modification_user_id = None  # id of the request user, set by store()
|
|
|
|
def __init__(self, name=None):
    """Create a workflow called *name*, without any status."""
    StorableObject.__init__(self)
    self.possible_status = []
    self.name = name
|
|
|
|
def migrate(self):
    """Add the default roles to a workflow that has none, and store it.

    Nothing is done when the instance already has a ``roles`` attribute
    that is not None.
    """
    if 'roles' in self.__dict__ and self.roles is not None:
        return
    self.roles = {'_receiver': _('Recipient')}
    self.store()
|
|
|
|
def store(self):
    """Store the workflow, recording the modification time and author.

    The author is the user of the current request, or None when there is
    no request or no user.  Once stored, every form using the workflow is
    asked to rebuild its security rules.
    """
    self.last_modification_time = time.localtime()
    user_id = None
    if get_request() and get_request().user:
        user_id = get_request().user.id
    self.last_modification_user_id = user_id
    StorableObject.store(self)

    # instruct all formdefs to update their security rules
    from formdef import FormDef
    for formdef in FormDef.select(lambda x: x.workflow_id == self.id):
        formdef.data_class().rebuild_security()
|
|
|
|
@classmethod
def get(cls, id, ignore_errors=False, ignore_migration=False):
    """Return the workflow with the given *id*.

    The special '_default' id gives the built-in default workflow; other
    ids are looked up by the parent class.
    """
    if id != '_default':
        return super(Workflow, cls).get(id, ignore_errors=ignore_errors,
                ignore_migration=ignore_migration)
    return cls.get_default_workflow()
|
|
|
|
def add_status(self, name, id=None):
    """Append a new status called *name* to the workflow and return it.

    When *id* is None the status gets the highest numeric id in use plus
    one (ids that are not numbers count as -1), or '1' for the first
    status.  DuplicateStatusNameError is raised if the name is taken.
    """
    for existing in self.possible_status:
        if existing.name == name:
            raise DuplicateStatusNameError()

    if id is not None:
        status_id = id
    elif self.possible_status:
        highest = max([lax_int(x.id) for x in self.possible_status])
        status_id = str(highest + 1)
    else:
        status_id = '1'

    status = WorkflowStatus(name)
    status.parent = self
    status.id = status_id
    self.possible_status.append(status)
    return status
|
|
|
|
def get_status(self, id):
    """Return the first status whose id is *id*; raise KeyError if none."""
    matching = [status for status in self.possible_status if status.id == id]
    if not matching:
        raise KeyError()
    return matching[0]
|
|
|
|
def __setstate__(self, dict):
    """Restore the pickled state and rebuild the parent links.

    Each status gets the workflow as parent, each item gets its status as
    parent, and an item without id gets its 1-based position as id.
    """
    self.__dict__.update(dict)
    for status in self.possible_status:
        status.parent = self
        position = 0
        for item in status.items:
            position += 1
            item.parent = status
            if not item.id:
                item.id = '%d' % position
|
|
|
|
def get_waitpoint_status(self):
    """Return the list of waitpoint statuses.

    A waitpoint status is a status waiting for an event (be it user
    interaction or something else), but can also be an endpoint (where
    the user would wait, infinitely).
    """
    selected = []
    for status in self.possible_status:
        if status.forced_endpoint:
            keep = True
        else:
            # endpoint if all items are endpoints, waitpoint if any item is
            is_endpoint = True
            is_waitpoint = False
            for item in status.items:
                if not item.endpoint:
                    is_endpoint = False
                if item.waitpoint:
                    is_waitpoint = True
            keep = is_endpoint or is_waitpoint
        if keep:
            selected.append(status)
    return selected
|
|
|
|
def get_endpoint_status(self):
    """Return the statuses that get_not_endpoint_status() does not list."""
    excluded = self.get_not_endpoint_status()
    endpoint_status = []
    for status in self.possible_status:
        if status not in excluded:
            endpoint_status.append(status)
    return endpoint_status
|
|
|
|
def get_not_endpoint_status(self):
    """Return the statuses having an item that is not an endpoint.

    Statuses flagged as forced endpoints are never listed.
    """
    result = []
    for status in self.possible_status:
        if not status.forced_endpoint:
            still_endpoint = True
            for item in status.items:
                still_endpoint = item.endpoint and still_endpoint
                if still_endpoint is False:
                    result.append(status)
                    break
    return result
|
|
|
|
def has_options(self):
    """Tell whether an item of the workflow has a parameter without value.

    Returns True as soon as a parameter listed by get_parameters() of an
    item has a false value on that item, False otherwise.
    """
    for status in self.possible_status:
        for item in status.items:
            for name in item.get_parameters():
                value = getattr(item, name)
                if not value:
                    return True
    return False
|
|
|
|
def remove_self(self):
    """Detach the workflow from the forms using it, then remove it."""
    from formdef import FormDef
    attached = FormDef.select(lambda x: x.workflow_id == self.id)
    for formdef in attached:
        formdef.workflow_id = None
        formdef.store()
    StorableObject.remove_self(self)
|
|
|
|
def export_to_xml(self, include_id=False):
    """Return the workflow as a <workflow> XML element.

    With *include_id* the workflow id (unless it starts with '_') and the
    id of the last user who modified it are exported as attributes.
    """
    charset = get_publisher().site_charset
    root = ET.Element('workflow')
    if include_id and self.id and not str(self.id).startswith('_'):
        root.attrib['id'] = str(self.id)
    name_text = unicode(self.name, charset)
    ET.SubElement(root, 'name').text = name_text

    roles_node = ET.SubElement(root, 'roles')
    for role_id, role_label in (self.roles or {}).items():
        role_node = ET.SubElement(roles_node, 'role')
        role_node.attrib['id'] = role_id
        role_node.text = unicode(role_label, charset)

    if self.last_modification_time:
        modification_node = ET.SubElement(root, 'last_modification')
        modification_node.text = time.strftime('%Y-%m-%d %H:%M:%S',
                self.last_modification_time)
        if include_id:
            modification_node.attrib['user_id'] = str(self.last_modification_user_id)

    status_container = ET.SubElement(root, 'possible_status')
    for status in self.possible_status:
        status_node = status.export_to_xml(charset=charset, include_id=include_id)
        status_container.append(status_node)
    return root
|
|
|
|
@classmethod
def import_from_xml(cls, fd, include_id=False):
    """Parse the XML file *fd* and return the workflow it describes.

    ValueError is raised when the content cannot be parsed as XML; errors
    in the structure of the document are reported by
    import_from_xml_tree().
    """
    try:
        tree = ET.parse(fd)
    except Exception:
        # report parsing failures as ValueError, but let KeyboardInterrupt
        # and SystemExit go through
        raise ValueError()
    return cls.import_from_xml_tree(tree, include_id=include_id)
|
|
|
|
@classmethod
def import_from_xml_tree(cls, tree, include_id=False):
    """Build a workflow from a parsed XML document.

    *tree* is either an ElementTree or its root element.  With
    *include_id* the ids found in the document (workflow, last user) are
    kept.  WorkflowImportError is raised when the name is missing or the
    root element is not <workflow>.
    """
    charset = get_publisher().site_charset
    workflow = cls()
    name_node = tree.find('name')
    if name_node is None or not name_node.text:
        raise WorkflowImportError(N_('Missing name'))

    # if the tree we get is actually a ElementTree for real, we get its
    # root element and go on happily.
    if not ET.iselement(tree):
        tree = tree.getroot()

    if tree.tag != 'workflow':
        raise WorkflowImportError(N_('Not a workflow'))

    workflow_id = tree.attrib.get('id')
    if include_id and workflow_id:
        workflow.id = workflow_id

    workflow.name = name_node.text.encode(charset)

    if tree.find('roles') is not None:
        workflow.roles = {}
        for role_node in tree.findall('roles/role'):
            workflow.roles[role_node.attrib['id']] = role_node.text.encode(charset)

    modification_node = tree.find('last_modification')
    if modification_node is not None:
        workflow.last_modification_time = time.strptime(
                modification_node.text, '%Y-%m-%d %H:%M:%S')
        user_id = modification_node.attrib.get('user_id')
        if include_id and user_id:
            workflow.last_modification_user_id = user_id

    workflow.possible_status = []
    for status_node in tree.find('possible_status'):
        status = WorkflowStatus()
        status.parent = workflow
        status.init_with_xml(status_node, charset, include_id=include_id)
        workflow.possible_status.append(status)
    return workflow
|
|
|
|
def get_list_of_roles(self, include_logged_in_users=True):
    """Return the roles that can be selected in this workflow.

    The list holds (id, label) tuples: the submitter, the workflow roles,
    the logged users role (if *include_logged_in_users*) and, after a
    (None, '----') separator, whatever get_user_roles() returns.
    """
    roles = [('_submitter', misc.zap_context(_('role|User')))]
    roles.extend(self.roles.items())
    if include_logged_in_users:
        roles.append((logged_users_role().id, logged_users_role().name))
    if get_user_roles():
        roles.append((None, '----'))
        roles.extend(get_user_roles())
    return roles
|
|
|
|
def render_list_of_roles(self, roles):
    """Return the labels of *roles* joined by ', '.

    Roles for which no label is found are left out.
    """
    labels = [get_role_translation_label(self, r) for r in roles]
    return ', '.join([label for label in labels if label])
|
|
|
|
@classmethod
def get_unknown_workflow(cls):
    """Return a new workflow named 'Unknown' with the '_unknown' id."""
    placeholder = Workflow(name=_('Unknown'))
    placeholder.id = '_unknown'
    return placeholder
|
|
|
|
@classmethod
def get_default_workflow(cls):
    """Build and return the default workflow (id '_default').

    It has five statuses: just_submitted, new, rejected, accepted and
    finished.
      - just_submitted notifies the receiver and the submitter (for each
        email enabled in EmailsDirectory) then jumps to new;
      - new allows comments and lets the receiver accept or reject;
      - accepted sends the change notifications, allows comments and lets
        the receiver finish;
      - rejected and finished only send the change notifications.
    """
    from qommon.admin.emails import EmailsDirectory

    def make_email(item_id, recipients, email_key):
        # email item using the subject and body configured for *email_key*;
        # None when that email is disabled
        email = SendmailWorkflowStatusItem()
        email.id = item_id
        email.to = recipients
        email.subject = EmailsDirectory.get_subject(email_key)
        email.body = EmailsDirectory.get_body(email_key)
        if not EmailsDirectory.is_enabled(email_key):
            return None
        return email

    def attach_to_each(item, statuses):
        # the first status gets *item* itself, each following status gets a
        # shallow copy of the previous one
        for position, status in enumerate(statuses):
            if position:
                item = copy.copy(item)
            status.items.append(item)
            item.parent = status

    def add_choice(item_id, label, from_status, to_status):
        # button for the receiver, in *from_status*, leading to *to_status*
        choice = ChoiceWorkflowStatusItem()
        choice.id = item_id
        choice.label = label
        choice.by = ['_receiver']
        choice.status = to_status.id
        choice.parent = from_status
        from_status.items.append(choice)

    workflow = Workflow(name=_('Default'))
    workflow.id = '_default'
    workflow.roles = {'_receiver': _('Recipient')}
    just_submitted_status = workflow.add_status(_('Just Submitted'), 'just_submitted')
    just_submitted_status.visibility = ['_receiver']
    new_status = workflow.add_status(_('New'), 'new')
    new_status.colour = '66FF00'
    rejected_status = workflow.add_status(_('Rejected'), 'rejected')
    rejected_status.colour = 'FF3300'
    accepted_status = workflow.add_status(_('Accepted'), 'accepted')
    accepted_status.colour = '66CCFF'
    finished_status = workflow.add_status(_('Finished'), 'finished')
    finished_status.colour = 'CCCCCC'

    commentable = CommentableWorkflowStatusItem()
    commentable.id = '_commentable'
    commentable.by = ['_submitter', '_receiver']

    import wf.jump
    jump_to_new = wf.jump.JumpWorkflowStatusItem()
    jump_to_new.id = '_jump_to_new'
    jump_to_new.status = new_status.id
    jump_to_new.parent = just_submitted_status

    notify_new_receiver_email = make_email(
            '_notify_new_receiver_email', ['_receiver'], 'new_receiver')
    # a disabled 'new_user' email must disable this very item (the previous
    # code reset notify_change_user_email instead, so it was always sent)
    notify_new_user_email = make_email(
            '_notify_new_user_email', ['_submitter'], 'new_user')
    notify_change_receiver_email = make_email(
            '_notify_change_receiver_email', ['_receiver'], 'change_receiver')
    notify_change_user_email = make_email(
            '_notify_change_user_email', ['_submitter'], 'change_user')

    for email in (notify_new_receiver_email, notify_new_user_email):
        if email:
            email.parent = just_submitted_status
            just_submitted_status.items.append(email)

    just_submitted_status.items.append(jump_to_new)

    changed_statuses = (accepted_status, rejected_status, finished_status)
    for email in (notify_change_receiver_email, notify_change_user_email):
        if email:
            attach_to_each(email, changed_statuses)

    attach_to_each(commentable, (new_status, accepted_status))

    add_choice('_accept', _('Accept'), new_status, accepted_status)
    add_choice('_reject', _('Reject'), new_status, rejected_status)
    add_choice('_finish', _('Finish'), accepted_status, finished_status)

    return workflow
|
|
|
|
|
|
|
|
class WorkflowStatus:
    """Status of a workflow, holding the list of items attached to it."""
    id = None
    name = None
    items = None  # list of workflow status items, set in __init__
    visibility = None  # list of role ids; empty or None means no restriction
    forced_endpoint = False  # when true the status is handled as an endpoint
    colour = 'FFFFFF'  # NOTE(review): looks like an hexadecimal RGB value -- confirm
|
|
|
|
def __init__(self, name=None):
    """Create a status called *name*, without any item."""
    self.items = []
    self.name = name
|
|
|
|
def __eq__(self, other):
    """Compare with another status, or with a status id given as a str.

    None is never equal to a status.
    """
    if other is None:
        return False
    # this assumes both status are from the same workflow
    if type(other) is not str:
        other = other.id
    return self.id == other
|
|
|
|
def append_item(self, type):
    """Append a new item of the registered class whose key is *type*.

    The item gets the highest numeric id in use in the status plus one
    (or '1' for the first item).  KeyError is raised when no registered
    class has this key.
    """
    item_class = None
    for klass in item_classes:
        if klass.key == type:
            item_class = klass
            break
    if item_class is None:
        raise KeyError()

    item = item_class()
    if self.items:
        item.id = str(max([lax_int(x.id) for x in self.items]) + 1)
    else:
        item.id = '1'
    self.items.append(item)
|
|
|
|
def get_item(self, id):
    """Return the first item whose id is *id*; raise KeyError if none."""
    matching = [item for item in self.items if item.id == id]
    if not matching:
        raise KeyError()
    return matching[0]
|
|
|
|
def perform_items(self, formdata, depth=20):
    """Perform the items of the status on *formdata*.

    Items are performed in order until one of them changes the status of
    the form data; the change is then recorded as a new evolution, the
    form data is stored and the items of the new status are performed in
    turn.  *depth* limits the number of chained status changes.

    Returns the last URL returned by an item, or None.
    """
    if depth == 0:  # prevents infinite loops
        return
    url = None
    initial_status = formdata.status
    for item in self.items:
        url = item.perform(formdata) or url
        if formdata.status != initial_status:
            break
    if formdata.status != initial_status:
        if not formdata.evolution:
            formdata.evolution = []
        evolution = Evolution()
        evolution.time = time.localtime()
        evolution.status = formdata.status
        formdata.evolution.append(evolution)
        formdata.store()
        # performs the items of the new status
        next_status = formdata.get_status()
        url = next_status.perform_items(formdata, depth=depth-1) or url
    return url
|
|
|
|
def get_action_form(self, filled, user):
    """Return the form of the actions *user* can do on *filled*.

    Every item accepting the user (check_auth) adds its widgets to the
    form.  None is returned when the form ends up without any widget.
    """
    form = Form(enctype='multipart/form-data', use_tokens=False)
    for item in self.items:
        if item.check_auth(filled, user):
            item.fill_form(form, filled, user)

    if not (form.widgets or form.submit_widgets):
        return None
    return form
|
|
|
|
def handle_form(self, form, filled, user):
    """Handle the submission of the action form of this status.

    A new evolution is created and handed to submit_form() of every item
    the user is allowed to use.  Unless the form has errors, the evolution
    is appended to the form data, its status (if any) becomes the status
    of the form data, and the form data is stored.

    Returns the URL given by an item or by the workflow run that follows,
    or None.
    """
    #form = self.get_action_form(filled, user)

    evo = Evolution()
    evo.time = time.localtime()
    if user:
        if filled.is_submitter(user):
            evo.who = '_submitter'
        else:
            evo.who = user.id
    if not filled.evolution:
        filled.evolution = []

    for item in self.items:
        if hasattr(item, 'by'):
            # look for a role of the item held by the user; a break means
            # the user is allowed to use the item
            # NOTE(review): user.roles is read without checking user, this
            # looks like it expects a logged in user -- confirm
            for role in item.by or []:
                if role == logged_users_role().id:
                    break
                if role == '_submitter':
                    if filled.is_submitter(user):
                        break
                    else:
                        continue
                role = get_role_translation(filled, role)
                if role in (user.roles or []):
                    break
            else:
                # no role matched, skip the item
                continue
        next_url = item.submit_form(form, filled, user, evo)
        if next_url is True:
            # the item handled the form, ignore the remaining items
            break
        if next_url:
            # the item gave an URL: record the evolution (unless the form
            # has errors) and return right away, without running the
            # workflow
            if not form.has_errors():
                filled.evolution.append(evo)
                if evo.status:
                    filled.status = evo.status
                filled.store()
            return next_url

    if form.has_errors():
        return

    filled.evolution.append(evo)

    if evo.status:
        filled.status = evo.status
    filled.store()
    url = filled.perform_workflow()
    if url:
        return url
|
|
|
|
def get_subdirectories(self, formdata):
    """Return (name, directory) tuples for the items having a directory.

    The directory is created by calling the directory_class of the item
    with the form data, the item and the status.
    """
    return [(item.directory_name, item.directory_class(formdata, item, self))
            for item in self.items if item.directory_name]
|
|
|
|
def is_visible(self, formdata, user):
    """Tell whether *user* can see this status of *formdata*.

    A status without visibility restriction is visible to everybody, and
    administrators see every status.  Otherwise the user must hold one of
    the visibility roles, or one of the roles of the items of the status.
    """
    if not self.visibility:  # no restriction -> visible
        return True
    if user and user.is_admin:
        return True

    user_roles = set([])
    if user:
        user_roles = set(user.roles or [])
        user_roles.add(logged_users_role().id)

    visibility_roles = self.visibility[:]
    for item in self.items or []:
        if hasattr(item, 'by') and item.by:
            visibility_roles.extend(item.by)

    for role in visibility_roles:
        if role != '_submitter':
            role = get_role_translation(formdata, role)
        if role in user_roles:
            return True
    return False
|
|
|
|
def __getstate__(self):
    """Return the state to pickle: the attributes, minus the parent link."""
    state = self.__dict__.copy()
    state.pop('parent', None)
    return state
|
|
|
|
def export_to_xml(self, charset, include_id=False):
    """Return the status as a <status> XML element.

    *charset* is the encoding of the strings of the status; *include_id*
    is handed over to the export of the items.
    """
    status = ET.Element('status')
    for tag in ('id', 'name', 'colour'):
        ET.SubElement(status, tag).text = unicode(getattr(self, tag), charset)

    if self.forced_endpoint:
        ET.SubElement(status, 'forced_endpoint').text = 'true'

    visibility_node = ET.SubElement(status, 'visibility')
    for role in self.visibility or []:
        role_node = ET.SubElement(visibility_node, 'role')
        role_node.text = str(role)

    items_node = ET.SubElement(status, 'items')
    for item in self.items:
        item_node = item.export_to_xml(charset=charset, include_id=include_id)
        items_node.append(item_node)
    return status
|
|
|
|
def init_with_xml(self, elem, charset, include_id=False):
    """Load the status from the <status> XML element *elem*.

    Strings are encoded to *charset*; the items are created from their
    'type' attribute and loaded with the same *include_id* value.
    """
    self.id = elem.find('id').text.encode(charset)
    self.name = elem.find('name').text.encode(charset)
    colour_node = elem.find('colour')
    if colour_node is not None:
        self.colour = colour_node.text.encode(charset)
    endpoint_node = elem.find('forced_endpoint')
    if endpoint_node is not None:
        self.forced_endpoint = (endpoint_node.text == 'true')

    self.visibility = [node.text for node in elem.findall('visibility/role')]

    self.items = []
    for item_node in elem.find('items'):
        self.append_item(item_node.attrib['type'])
        item = self.items[-1]
        item.parent = self
        item.init_with_xml(item_node, charset, include_id=include_id)
|
|
|
|
class WorkflowStatusItem:
    """Base class of the items (actions) attached to a workflow status."""
    description = 'XX'  # translated by render_as_line()
    category = None # (key, label)
    id = None

    endpoint = True # means it's not possible to interact, and/or cause a status change
    waitpoint = False # means it's possible to wait (user interaction, or other event)
    # when directory_name is set, directory_class is called to create the
    # subdirectory of the item (see WorkflowStatus.get_subdirectories)
    directory_name = None
    directory_class = None
    support_substitution_variables = False
|
|
|
|
@classmethod
def init(cls):
    """Hook called when the class is registered; does nothing by default."""
    return None
|
|
|
|
@classmethod
def is_available(cls):
    """Tell whether the item can be used; always True by default."""
    return True
|
|
|
|
def render_as_line(self):
    """Return the translated description of the item."""
    line = _(self.description)
    return line
|
|
|
|
def render_list_of_roles(self, roles):
    """Render *roles* using the workflow of the item (parent of its status)."""
    workflow = self.parent.parent
    return workflow.render_list_of_roles(roles)
|
|
|
|
def get_list_of_roles(self, include_logged_in_users=True):
    """Return the roles of the workflow of the item (parent of its status)."""
    workflow = self.parent.parent
    return workflow.get_list_of_roles(
            include_logged_in_users=include_logged_in_users)
|
|
|
|
def perform(self, formdata):
    """Run the item on *formdata*; the base item does nothing."""
    return None
|
|
|
|
def fill_form(self, form, formdata, user):
    """Add the widgets of the item to *form*; the base item adds nothing."""
    return None
|
|
|
|
def submit_form(self, form, formdata, user, evo):
    """Handle the submitted action form; the base item does nothing."""
    return None
|
|
|
|
def check_auth(self, formdata, user):
    """Tell whether *user* is allowed to use the item on *formdata*.

    Items without a ``by`` attribute are open to everybody.  Otherwise the
    user must be logged in and match one of the roles listed in ``by``
    (logged users role, submitter of the form data, or a role he holds).
    """
    if not hasattr(self, 'by'):
        return True

    for role in self.by or []:
        if not user:
            continue
        if role == logged_users_role().id:
            return True
        if role == '_submitter':
            if formdata.is_submitter(user) is True:
                return True
            continue
        role = get_role_translation(formdata, role)
        if role in (user.roles or []):
            return True

    return False
|
|
|
|
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
    """Add the widgets of *parameters* to *form*; nothing by default."""
    return None
|
|
|
|
def get_parameters(self):
    """Return the names of the parameters of the item; none by default."""
    return tuple()
|
|
|
|
def fill_admin_form(self, form):
    """Add the widgets of all the parameters of the item to *form*."""
    parameters = self.get_parameters()
    self.add_parameters_widgets(form, parameters)
|
|
|
|
def submit_admin_form(self, form):
    """Set the parameters of the item from the widgets of *form*.

    Parameters without widget are left alone.  When the item has a
    "<parameter>_parse" method the parsed value goes through it first.
    """
    for name in self.get_parameters():
        widget = form.get_widget(name)
        if not widget:
            continue
        value = widget.parse()
        parser_name = '%s_parse' % name
        if hasattr(self, parser_name):
            value = getattr(self, parser_name)(value)
        setattr(self, name, value)
|
|
|
|
def compute(self, var):
    """Return the value of the parameter *var*.

    A string starting with '=' is evaluated as a Python expression, with
    the global eval dictionary of the publisher as globals and the context
    variables of the substitutions as locals.  Any other value, and any
    expression that fails to evaluate, is returned unchanged.
    """
    if not isinstance(var, basestring):
        return var
    if not var.startswith('='):
        return var
    context = get_publisher().substitutions.get_context_variables()
    try:
        # NOTE(review): eval() runs arbitrary code, expressions must only
        # come from trusted workflow settings -- confirm with callers
        return eval(var[1:], get_publisher().get_global_eval_dict(), context)
    except Exception:
        # best effort: keep the raw value, but do not swallow
        # KeyboardInterrupt or SystemExit
        return var
|
|
|
|
def get_substitution_variables(self, formdata):
    """Return the substitution variables of the item; none by default."""
    return dict()
|
|
|
|
def get_target_status(self):
    """Returns a list of status this item can lead to (none by default)."""
    return list()
|
|
|
|
def export_to_xml(self, charset, include_id=False):
    """Return the item as an <item> XML element.

    Each parameter is exported by the "<parameter>_export_to_xml" method
    when the item has one; otherwise a parameter that is not None becomes
    a child element, with subelements for the entries of a dictionary or
    of a list.
    """
    item = ET.Element('item')
    item.attrib['type'] = self.key
    for attribute in self.get_parameters():
        exporter_name = '%s_export_to_xml' % attribute
        if hasattr(self, exporter_name):
            getattr(self, exporter_name)(item, charset, include_id=include_id)
            continue
        val = getattr(self, attribute, None)
        if val is None:
            continue
        el = ET.SubElement(item, attribute)
        if type(val) is dict:
            for k, v in val.items():
                ET.SubElement(el, k).text = unicode(v, charset, 'replace')
        elif type(val) is list:
            # entries of "things" are exported as <thing>, others as <item>
            if attribute[-1] == 's':
                atname = attribute[:-1]
            else:
                atname = 'item'
            for v in val:
                ET.SubElement(el, atname).text = unicode(str(v), charset, 'replace')
        elif type(val) is unicode:
            el.text = val
        elif type(val) is str:
            el.text = unicode(val, charset, 'replace')
        else:
            el.text = str(val)
    return item
|
|
|
|
def init_with_xml(self, elem, charset, include_id=False):
    """Load the parameters of the item from the XML element *elem*.

    Each parameter is loaded by the "<parameter>_init_with_xml" method
    when the item has one.  Otherwise the type of the current value tells
    how to read an element with children (list or dictionary) or an
    integer; 'True' and 'False' are read as booleans, an empty element as
    None and anything else as a string encoded to *charset*.
    """
    for attribute in self.get_parameters():
        el = elem.find(attribute)
        loader_name = '%s_init_with_xml' % attribute
        if hasattr(self, loader_name):
            getattr(self, loader_name)(el, charset, include_id=include_id)
            continue
        if el is None:
            continue
        children = list(el)
        if children:
            current = getattr(self, attribute)
            if type(current) is list:
                v = [x.text.encode(charset) for x in children]
            elif type(current) is dict:
                v = dict([(e.tag, e.text.encode(charset)) for e in children])
            else:
                # ???
                raise AssertionError
            setattr(self, attribute, v)
        elif el.text is None:
            setattr(self, attribute, None)
        elif el.text in ('False', 'True'):  # bools
            setattr(self, attribute, el.text == 'True')
        elif type(getattr(self, attribute)) is int:
            setattr(self, attribute, int(el.text.encode(charset)))
        else:
            setattr(self, attribute, el.text.encode(charset))
|
|
|
|
def _roles_export_to_xml(self, attribute, item, charset, include_id=False):
    """Export the list of roles stored in *attribute* under *item*.

    Nothing is exported when the attribute is missing or empty.  Each role
    becomes an <item> element with a role_id attribute; its text is the
    name of the role, or the id itself for special roles (starting with
    '_', or 'logged-users') and for roles that cannot be found.
    """
    role_ids = getattr(self, attribute, None)
    if not role_ids:
        return
    el = ET.SubElement(item, attribute)
    for role_id in role_ids:
        if role_id is None:
            continue
        role_id = str(role_id)
        is_special = role_id.startswith('_') or role_id == 'logged-users'
        if is_special:
            role = unicode(role_id, charset)
        else:
            try:
                role = unicode(Role.get(role_id).name, charset)
            except KeyError:
                role = unicode(role_id, charset)
        sub = ET.SubElement(el, 'item')
        sub.attrib['role_id'] = role_id
        sub.text = role
|
|
|
|
def _roles_init_with_xml(self, attribute, elem, charset, include_id=False):
    """Set *attribute* to the list of role ids found under *elem*.

    The attribute is set to an empty list when *elem* is None.
    """
    imported_roles = []
    if elem is not None:
        for child in list(elem):
            role_id = self._get_role_id_from_xml(child, charset,
                    include_id=include_id)
            imported_roles.append(role_id)
    setattr(self, attribute, imported_roles)
|
|
|
|
def _role_export_to_xml(self, attribute, item, charset, include_id=False):
    """Export the single role stored in *attribute* under *item*.

    Nothing is exported when the attribute is missing or empty.  The text
    of the element is the name of the role, or the id itself for special
    roles (starting with '_', or 'logged-users') and for roles that cannot
    be found; the role_id attribute is only set with *include_id*.
    """
    if not getattr(self, attribute, None):
        return
    role_id = str(getattr(self, attribute))
    is_special = role_id.startswith('_') or role_id == 'logged-users'
    if is_special:
        role = unicode(role_id, charset)
    else:
        try:
            role = unicode(Role.get(role_id).name, charset)
        except KeyError:
            role = unicode(role_id, charset)
    sub = ET.SubElement(item, attribute)
    if include_id:
        sub.attrib['role_id'] = role_id
    sub.text = role
|
|
|
|
def _get_role_id_from_xml(self, elem, charset, include_id=False):
    """Return the id of the role described by the XML element *elem*.

    None is returned when *elem* is None.  Special values (starting with
    '_', or 'logged-users') are returned as is.  With *include_id* only
    the role_id attribute is used, and None is returned if it is missing
    or unknown.  Otherwise the role is looked up by name, and a new role
    is created and stored when there is no match.
    """
    if elem is None:
        return None

    value = elem.text.encode(charset)

    # look for known static values
    if value.startswith('_') or value == 'logged-users':
        return value

    # if we import using id, only look at the role_id attribute
    if include_id:
        if 'role_id' not in elem.attrib:
            return None
        role_id = str(elem.attrib['role_id'])
        if not Role.has_key(role_id):
            return None
        try:
            return int(role_id)
        except ValueError:
            return str(role_id)

    # if not using id, look up on the name
    for existing_role in Role.select(ignore_errors=True):
        if existing_role.name == value:
            return existing_role.id

    # and if there's no match, create a new role
    new_role = Role()
    new_role.name = value
    new_role.store()
    return new_role.id
|
|
|
|
def _role_init_with_xml(self, attribute, elem, charset, include_id=False):
    """Set *attribute* to the id of the role described by *elem*."""
    role_id = self._get_role_id_from_xml(elem, charset, include_id=include_id)
    setattr(self, attribute, role_id)
|
|
|
|
def by_export_to_xml(self, item, charset, include_id=False):
    """Export the ``by`` parameter as a list of roles."""
    attribute = 'by'
    self._roles_export_to_xml(attribute, item, charset, include_id=include_id)
|
|
|
|
def by_init_with_xml(self, elem, charset, include_id=False):
    """Load the ``by`` parameter as a list of roles."""
    attribute = 'by'
    self._roles_init_with_xml(attribute, elem, charset, include_id=include_id)
|
|
|
|
def to_export_to_xml(self, item, charset, include_id=False):
    """Export the ``to`` parameter as a list of roles."""
    attribute = 'to'
    self._roles_export_to_xml(attribute, item, charset, include_id=include_id)
|
|
|
|
def to_init_with_xml(self, elem, charset, include_id=False):
    """Load the ``to`` parameter as a list of roles."""
    attribute = 'to'
    self._roles_init_with_xml(attribute, elem, charset, include_id)
|
|
|
|
def _q_admin_lookup(self, workflow, status, component, html_top):
    """Look up *component* for the item; the base item gives nothing."""
    return
|
|
|
|
def __getstate__(self):
    """Return the state to pickle: the attributes, minus the parent link."""
    state = self.__dict__.copy()
    state.pop('parent', None)
    return state
|
|
|
|
|
|
class WorkflowStatusJumpItem(WorkflowStatusItem):
    """Base class of the items leading to another status of the workflow."""
    status = None  # id of the target status
    endpoint = False

    def get_target_status(self):
        """Return the list of statuses of the workflow with the target id.

        The list is empty when no target is set.  It is also empty when
        the target does not exist in the workflow, and that case is
        reported to the application logger.
        """
        if not self.status:
            return []
        targets = [x for x in self.parent.parent.possible_status if x.id == self.status]
        if not targets:
            # the previous code expected an IndexError that a list
            # comprehension never raises, so this was never logged
            get_publisher().get_app_logger().error(
                    'reference to invalid status in workflow %s, status %s, item %s' % (
                        self.parent.parent.name,
                        self.parent.name,
                        self.description))
        return targets

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        """Add to *form* the widget selecting the target status."""
        if 'status' in parameters:
            form.add(SingleSelectWidget, '%sstatus' % prefix, title = _('Status'), value = self.status,
                    options = [(None, '---')] + [(x.id, x.name) for x in self.parent.parent.possible_status])

    def get_parameters(self):
        """Return the names of the parameters of the item."""
        return ('status',)
|
|
|
|
|
|
def get_role_translation(formdata, role_name):
    """Return the role id matching *role_name* for *formdata*.

    A workflow role (name starting with '_') is looked up in the workflow
    roles of the form data, then in those of its form definition; None is
    returned if it is found in neither.  Other values are returned
    unchanged.  '_submitter' is not a role and raises an Exception.
    """
    if role_name == '_submitter':
        raise Exception('_submitter is not a valid role')
    if not str(role_name).startswith('_'):
        return role_name

    role_id = None
    if formdata.workflow_roles:
        role_id = formdata.workflow_roles.get(role_name)
    if not role_id and formdata.formdef.workflow_roles:
        role_id = formdata.formdef.workflow_roles.get(role_name)
    return role_id
|
|
|
|
def get_role_translation_label(workflow, role_id):
    """Return the label of the role *role_id* in *workflow*.

    Handles the logged users role, the submitter, the workflow roles (ids
    starting with '_') and the stored roles; None is returned for a stored
    role that cannot be found.
    """
    if role_id == logged_users_role().id:
        return logged_users_role().name
    if role_id == '_submitter':
        return misc.zap_context(_('role|User'))
    if str(role_id).startswith('_'):
        return workflow.roles.get(role_id)
    try:
        return Role.get(role_id).name
    except KeyError:
        return None
|
|
|
|
# classes of workflow status items, filled by register_item_class()
item_classes = []

def register_item_class(klass):
    """Add *klass* to the registered item classes and initialise it.

    A class that is already registered is left alone.
    """
    if klass in item_classes:
        return
    item_classes.append(klass)
    klass.init()
|
|
|
|
|
|
class CommentableWorkflowStatusItem(WorkflowStatusItem):
    # Item adding a comment field to the action form of its status.
    description = N_('Allow Comment')
    key = 'commentable'
    endpoint = False
    waitpoint = True

    varname = None  # when set, the comment is also saved as comment_<varname> in the workflow data
    label = None  # title of the comment field, 'Comment' when None
    button_label = 0 # hack to handle legacy commentable items
    hint = None
    by = []  # roles allowed to comment
|
|
|
|
def render_as_line(self):
|
|
if self.by:
|
|
return _('Allow Comment by %s') % self.render_list_of_roles(self.by)
|
|
else:
|
|
return _('Allow Comment (not completed)')
|
|
|
|
def fill_form(self, form, formdata, user):
|
|
if not 'comment' in [x.name for x in form.widgets]:
|
|
if self.label is None:
|
|
title = _('Comment')
|
|
else:
|
|
title = self.label
|
|
form.add(TextWidget, 'comment', title=title, required=False,
|
|
cols=40, rows=10, hint=self.hint)
|
|
form.get_widget('comment').attrs['class'] = 'comment'
|
|
if self.button_label == 0:
|
|
form.add_submit('button%s' % self.id, _('Add Comment'))
|
|
elif self.button_label:
|
|
form.add_submit('button%s' % self.id, self.button_label)
|
|
|
|
def submit_form(self, form, formdata, user, evo):
|
|
if form.get_widget('comment'):
|
|
evo.comment = form.get_widget('comment').parse()
|
|
if self.varname:
|
|
workflow_data = {'comment_%s' % self.varname: evo.comment}
|
|
formdata.update_workflow_data(workflow_data)
|
|
|
|
def submit_admin_form(self, form):
|
|
for f in self.get_parameters():
|
|
widget = form.get_widget(f)
|
|
if widget:
|
|
setattr(self, f, widget.parse())
|
|
|
|
def fill_admin_form(self, form):
|
|
if self.by and not type(self.by) is list:
|
|
self.by = None
|
|
self.add_parameters_widgets(form, self.get_parameters())
|
|
|
|
def get_parameters(self):
|
|
return ('label', 'button_label', 'by', 'hint', 'varname')
|
|
|
|
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
|
|
if 'label' in parameters:
|
|
if self.label is None:
|
|
self.label = _('Comment')
|
|
form.add(StringWidget, '%slabel' % prefix, size=40, title=_('Label'), value=self.label)
|
|
if 'button_label' in parameters:
|
|
if self.button_label == 0:
|
|
self.button_label = _('Add Comment')
|
|
form.add(StringWidget, '%sbutton_label' % prefix, title=_('Button Label'),
|
|
hint=_('(empty to disable the button)'),
|
|
value=self.button_label)
|
|
if 'hint' in parameters:
|
|
form.add(StringWidget, '%shint' % prefix, size=40, title=_('Hint'), value=self.hint)
|
|
if 'by' in parameters:
|
|
form.add(WidgetList, '%sby' % prefix, title = _('By'),
|
|
element_type=SingleSelectWidget,
|
|
value=self.by,
|
|
add_element_label=_('Add Role'),
|
|
element_kwargs={'render_br': False,
|
|
'options': [(None, '---')] + self.get_list_of_roles()})
|
|
if 'varname' in parameters:
|
|
form.add(VarnameWidget, '%svarname' % prefix,
|
|
title=_('Variable Name'), value=self.varname,
|
|
hint=_('This will make the comment available in a variable named comment_varname.'))
|
|
|
|
def button_label_export_to_xml(self, xml_item, charset, include_id=False):
|
|
if self.button_label == 0:
|
|
pass
|
|
elif self.button_label is None:
|
|
# button_label being None is a special case meaning "no button", it
|
|
# should be handled differently than the "not filled" case
|
|
el = ET.SubElement(xml_item, 'button_label')
|
|
else:
|
|
el = ET.SubElement(xml_item, 'button_label')
|
|
if type(self.button_label) is unicode:
|
|
el.text = self.button_label
|
|
elif type(self.button_label) is str:
|
|
el.text = unicode(self.button_label, charset, 'replace')
|
|
else:
|
|
raise AssertionError('unknown type for button_label (%r)' % self.button_label)
|
|
|
|
def button_label_init_with_xml(self, element, charset, include_id=False):
|
|
if element is None:
|
|
return
|
|
if element.text is None:
|
|
# this means element is self-closing, <button_label />, which maps
|
|
# to None, meaning "no button".
|
|
self.button_label = None
|
|
else:
|
|
self.button_label = element.text.encode(charset)
|
|
|
|
register_item_class(CommentableWorkflowStatusItem)
|
|
|
|
|
|
class ChoiceWorkflowStatusItem(WorkflowStatusJumpItem):
    """Workflow item adding a button that changes the status."""
    description = N_('Change Status')
    key = 'choice'
    endpoint = False
    waitpoint = True

    label = None
    by = []

    def render_as_line(self):
        """Return the one-line summary of this item."""
        if not self.label:
            return _('Change Status (not completed)')
        if not self.by:
            return _('Change Status "%s"') % self.label
        values = {'label': self.label,
                  'by': self.render_list_of_roles(self.by)}
        return _('Change Status "%(label)s" by %(by)s') % values

    def fill_form(self, form, formdata, user):
        """Add the submit button of this item to the form."""
        form.add_submit('button%s' % self.id, self.label)

    def submit_form(self, form, formdata, user, evo):
        """Set the target status on evo when the button of this item was used.

        Returns True when the status was changed, None otherwise.
        """
        if form.get_submit() != 'button%s' % self.id:
            return
        wf_status = self.get_target_status()
        if not wf_status:
            return
        evo.status = 'wf-%s' % wf_status[0].id
        form.clear_errors()
        return True  # get out of processing loop

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        """Add the label, status and role widgets that are requested."""
        if 'label' in parameters:
            form.add(StringWidget, '%slabel' % prefix, title=_('Label'),
                     value=self.label)
        # the parent class adds the status widget
        WorkflowStatusJumpItem.add_parameters_widgets(
            self, form, parameters, prefix, formdef)
        if 'by' in parameters:
            role_options = [(None, '---')] + self.get_list_of_roles()
            form.add(WidgetList, '%sby' % prefix, title=_('By'),
                     element_type=SingleSelectWidget,
                     value=self.by,
                     add_element_label=_('Add Role'),
                     element_kwargs={'render_br': False,
                                     'options': role_options})

    def get_parameters(self):
        """Return the names of the parameters of this item."""
        return ('by', 'status', 'label')


register_item_class(ChoiceWorkflowStatusItem)
|
|
|
|
|
|
class JumpOnSubmitWorkflowStatusItem(WorkflowStatusJumpItem):
    """Workflow item changing the status when a form is submitted."""
    description = N_('Change Status on Submit')
    key = 'jumponsubmit'

    def render_as_line(self):
        """Return the one-line summary of this item."""
        if not self.status:
            return _('Change Status on Submit (not completed)')
        targets = self.get_target_status()
        if not targets:
            # a status is set but it could not be found
            return _('Change Status on Submit (broken)')
        return _('Change Status on Submit (to %s)') % targets[0].name

    def submit_form(self, form, formdata, user, evo):
        """Set the target status on evo if form was submitted without error."""
        if not form.is_submitted() or form.has_errors():
            return
        wf_status = self.get_target_status()
        if wf_status:
            evo.status = 'wf-%s' % wf_status[0].id

    def get_parameters(self):
        """Return the names of the parameters of this item."""
        return ('status',)


register_item_class(JumpOnSubmitWorkflowStatusItem)
|
|
|
|
|
|
class SendmailWorkflowStatusItem(WorkflowStatusItem):
    """Workflow item sending an email built from templates."""
    description = N_('Send mail')
    key = 'sendmail'
    support_substitution_variables = True

    to = []
    subject = None
    body = None

    comment = None

    def render_as_line(self):
        """Return the one-line summary of this item."""
        if not self.to:
            return _('Send mail (not completed)')
        return _('Send mail to %s') % self.render_list_of_roles(self.to)

    def get_parameters(self):
        """Return the names of the parameters of this item."""
        return ('to', 'subject', 'body')

    def fill_admin_form(self, form):
        """Add the widgets of all parameters to the admin form."""
        self.add_parameters_widgets(form, self.get_parameters())

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        """Add the recipient, subject and body widgets that are requested."""
        if 'to' in parameters:
            role_options = [(None, '---')] + \
                self.get_list_of_roles(include_logged_in_users=False)
            form.add(WidgetList, '%sto' % prefix, title=_('To'),
                     element_type=SingleSelectWidget,
                     value=self.to,
                     add_element_label=_('Add Role'),
                     element_kwargs={'render_br': False,
                                     'options': role_options})
        if 'subject' in parameters:
            form.add(StringWidget, '%ssubject' % prefix, title=_('Subject'),
                     value=self.subject, size=40)
        if 'body' in parameters:
            form.add(TextWidget, '%sbody' % prefix, title=_('Body'),
                     value=self.body, cols=80, rows=10,
                     hint=_('Available variables: url, url_status, details, name, number, comment, field_NAME'))

    def perform(self, formdata):
        """Build the mail from the templates and send it.

        Nothing is sent when there is no recipient or no subject, when one of
        the templates raises an ezt error (which is logged) or when no
        address could be collected.
        """
        if not self.to:
            return
        if not self.subject:
            return

        url = formdata.get_url()
        try:
            mail_body = template_on_formdata(formdata, self.compute(self.body))
        except ezt.EZTException:
            get_logger().error('error in template for email body [%s], mail could not be generated' % url)
            return

        try:
            mail_subject = template_on_formdata(formdata, self.compute(self.subject))
        except ezt.EZTException:
            get_logger().error('error in template for email subject [%s], mail could not be generated' % url)
            return

        users_cfg = get_cfg('users', {})

        # this works around the fact that parametric workflows only support
        # string values, so if we get set a string, we convert it here to an
        # array.
        if isinstance(self.to, basestring):
            self.to = [self.to]

        addresses = []
        for dest in self.to:
            dest = self.compute(dest)

            if '@' in str(dest):
                # used as an address, without any lookup
                addresses.append(dest)
                continue

            if dest == '_submitter':
                field_email = users_cfg.get('field_email') or 'email'
                submitter_address = None
                submitter = formdata.user
                if submitter:
                    if submitter.email:
                        submitter_address = submitter.email
                    elif submitter.form_data and submitter.form_data.get(field_email):
                        submitter_address = submitter.form_data.get(field_email)

                if not submitter_address:
                    # if there is no user, or user has no email address, look
                    # up in submitted form for one that would hold the user
                    # email (the one set to be prefilled by user email)
                    for field in formdata.formdef.fields:
                        if not hasattr(field, 'prefill'):
                            continue
                        prefill = field.prefill
                        if not prefill or prefill.get('type') != 'user':
                            continue
                        if prefill.get('value') != field_email:
                            continue
                        v = formdata.data.get(field.id)
                        if v:
                            submitter_address = v
                            break

                if submitter_address:
                    addresses.append(submitter_address)
                continue

            dest = get_role_translation(formdata, dest)
            try:
                role = Role.get(dest)
            except KeyError:
                # unknown role, skipped
                continue
            addresses.extend(role.get_emails())

        if not addresses:
            return

        send_kwargs = {'exclude_current_user': False,
                       'fire_and_forget': True}
        if len(addresses) > 1:
            # several recipients: they all go in bcc
            send_kwargs['email_rcpt'] = None
            send_kwargs['bcc'] = addresses
        else:
            send_kwargs['email_rcpt'] = addresses
        emails.email(mail_subject, mail_body, **send_kwargs)


register_item_class(SendmailWorkflowStatusItem)
|
|
|
|
|
|
def template_on_formdata(formdata, template, process=None):
    """Render an ezt template with the variables of a formdata.

    The variables are the context variables of the publisher, url,
    url_status, details, name, number and comment, then everything returned
    by formdata.get_as_dict().  When process is given it is applied to every
    value.  unicode values are encoded to the site charset before rendering.

    Returns the generated text; a template of None is handled as ''.
    """
    context = {}
    context.update(get_publisher().substitutions.get_context_variables())

    url = formdata.get_url()
    context['url'] = url
    context['url_status'] = '%sstatus' % formdata.get_url()
    context['details'] = formdata.formdef.get_detailed_email_form(formdata, url)
    context['name'] = formdata.formdef.name
    context['number'] = formdata.id

    # comment of the latest evolution, if any
    last_comment = ''
    if formdata.evolution and formdata.evolution[-1].comment:
        last_comment = formdata.evolution[-1].comment
    context['comment'] = last_comment

    context.update(formdata.get_as_dict())

    # compatibility vars
    for legacy_name, current_name in (('before', 'form_previous_status'),
                                      ('after', 'form_status'),
                                      ('evolution', 'form_evolution')):
        context[legacy_name] = context.get(current_name)

    if process:
        for key in context:
            context[key] = process(context[key])

    charset = get_publisher().site_charset
    for key, value in context.items():
        if isinstance(value, unicode):
            context[key] = value.encode(charset, 'ignore')

    processor = ezt.Template(compress_whitespace=False)
    processor.parse(template or '')

    output = StringIO()
    processor.generate(output, context)
    return output.getvalue()
|
|
|
|
class SendSMSWorkflowStatusItem(WorkflowStatusItem):
    """Workflow item sending an SMS built from a template."""
    description = N_('Send SMS')
    key = 'sendsms'
    support_substitution_variables = True

    to = []
    body = None

    def render_as_line(self):
        """Return the one-line summary of this item."""
        return _('Send SMS')

    def fill_admin_form(self, form):
        """Add the widgets of all parameters to the admin form."""
        self.add_parameters_widgets(form, self.get_parameters())

    def get_parameters(self):
        """Return the names of the parameters of this item."""
        return ('to', 'body')

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        """Add the recipient and body widgets that are requested."""
        if 'to' in parameters:
            form.add(WidgetList, '%sto' % prefix, title=_('To'),
                     element_type=StringWidget,
                     value=self.to,
                     add_element_label=_('Add Number'),
                     element_kwargs={'render_br': False})
        if 'body' in parameters:
            form.add(TextWidget, '%sbody' % prefix, title=_('Body'),
                     value=self.body, cols=80, rows=10,
                     hint=_('Available variables: url, url_status, details, name, number, comment, field_NAME'))

    def perform(self, formdata):
        """Build the SMS body from the template and send it.

        Nothing is sent without recipient or body, or when the template
        raises an ezt error.  Template and SMSError failures are logged.
        """
        if not (self.to and self.body):
            return

        try:
            sms_body = template_on_formdata(formdata, self.compute(self.body))
        except ezt.EZTException:
            url = formdata.get_url()
            get_logger().error('error in template for sms [%s], sms could not be generated' % url)
            return

        from qommon.sms import SMS

        sms_cfg = get_cfg('sms', {})
        # sender and body are truncated to 11 and 160 characters
        sender = sms_cfg.get('sender', 'AuQuotidien')[:11]
        mode = sms_cfg.get('mode', 'none')
        try:
            sms_class = SMS.get_sms_class(mode)
            recipients = [self.compute(x) for x in self.to]
            sms_class.send(sender, recipients, sms_body[:160])
        except qommon.errors.SMSError as e:
            get_logger().error(e)


register_item_class(SendSMSWorkflowStatusItem)
|
|
|
|
|
|
class DisplayMessageWorkflowStatusItem(WorkflowStatusItem):
    """Workflow item holding a message rendered as an ezt template."""
    description = N_('Display message')
    key = 'displaymsg'
    support_substitution_variables = True

    message = None

    def get_message(self, filled):
        """Return the message rendered for filled, '' if no message is set.

        The template gets the context variables of the publisher, date and
        number, plus receiver when the handling role has details.
        """
        if not self.message:
            return ''
        tmpl = ezt.Template()
        tmpl.parse(self.message)

        variables = {}
        variables.update(get_publisher().substitutions.get_context_variables())
        variables['date'] = misc.localstrftime(filled.receipt_time)
        variables['number'] = filled.id
        handling_role = filled.get_handling_role()
        if handling_role and handling_role.details:
            # line breaks of the details become <br /> tags
            variables['receiver'] = handling_role.details.replace('\n', '<br />')

        output = StringIO()
        tmpl.generate(output, variables)
        return output.getvalue()

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        """Add the message widget if it is requested."""
        if 'message' not in parameters:
            return
        form.add(TextWidget, '%smessage' % prefix, title=_('Message'),
                 value=self.message, cols=80, rows=10)

    def get_parameters(self):
        """Return the names of the parameters of this item."""
        return ('message',)


register_item_class(DisplayMessageWorkflowStatusItem)
|
|
|
|
|
|
class RedirectToStatusWorkflowStatusItem(WorkflowStatusItem):
    """Workflow item returning the URL of the formdata when performed."""
    description = N_('Redirect to Status Page')
    key = 'redirectstatus'

    backoffice = False

    def perform(self, formdata):
        """Return formdata.get_url(), called with the backoffice flag."""
        target_url = formdata.get_url(self.backoffice)
        return target_url

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        """Add the backoffice checkbox if it is requested."""
        if 'backoffice' not in parameters:
            return
        form.add(CheckboxWidget, '%sbackoffice' % prefix,
                 title=_('Redirect to backoffice page'),
                 value=self.backoffice)

    def get_parameters(self):
        """Return the names of the parameters of this item."""
        return ('backoffice',)
|
|
|
|
# RedirectToStatusWorkflowStatusItem is not registered, as the class is kept
# for backward compatibility only and should not be exposed to the user. (#3031)
|
|
|
|
|
|
class EditableWorkflowStatusItem(WorkflowStatusItem):
    """Workflow item adding a button leading to the 'wfedit' URL."""
    description = N_('Allow Edition')
    key = 'editable'
    endpoint = False
    waitpoint = True

    by = []
    status = None
    label = None

    def render_as_line(self):
        """Return the one-line summary of this item."""
        if not self.by:
            return _('Allow Edition (not completed)')
        return _('Allow Edition by %s') % self.render_list_of_roles(self.by)

    def fill_form(self, form, formdata, user):
        """Add the edit button, labelled 'Edit Form' when no label is set."""
        button_text = self.label or _('Edit Form')
        form.add_submit('button%s' % self.id, button_text)

    def submit_form(self, form, formdata, user, evo):
        """Return the 'wfedit' URL when the button of this item was used."""
        if form.get_submit() != 'button%s' % self.id:
            return
        return formdata.get_url() + 'wfedit'

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        """Add the role, status and label widgets that are requested."""
        if 'by' in parameters:
            role_options = [(None, '---')] + self.get_list_of_roles()
            form.add(WidgetList, '%sby' % prefix, title=_('By'),
                     element_type=SingleSelectWidget,
                     value=self.by,
                     add_element_label=_('Add Role'),
                     element_kwargs={'render_br': False,
                                     'options': role_options})
        if 'status' in parameters:
            # XXX: look into this one, as None is a perfectly valid value, and
            # it would put this question in the 'workflow options' part.
            status_options = [(None, '---')]
            status_options += [(x.id, x.name)
                               for x in self.parent.parent.possible_status]
            form.add(SingleSelectWidget, '%sstatus' % prefix,
                     title=_('Status After Edit'), value=self.status,
                     hint=_("Don't select any if you don't want status change processing"),
                     options=status_options)
        if 'label' in parameters:
            form.add(StringWidget, '%slabel' % prefix,
                     title=_('Button Label'), value=self.label)

    def get_parameters(self):
        """Return the names of the parameters of this item."""
        return ('by', 'status', 'label')


register_item_class(EditableWorkflowStatusItem)
|
|
|
|
class ExportToModelDirectory(Directory):
    """Directory serving the document generated from a model file."""
    _q_exports = ['']

    def __init__(self, formdata, wfstatusitem, wfstatus):
        # wfstatus is accepted but not kept
        self.formdata = formdata
        self.wfstatusitem = wfstatusitem

    def _q_index(self):
        """Return the model file with the template applied to the formdata.

        Raises TemplatingError when the workflow item has no model file.
        """
        model_file = self.wfstatusitem.model_file
        if not model_file:
            raise TemplatingError(_('No model defined for this action'))

        response = get_response()
        response.content_type = model_file.content_type
        response.set_header('location', '..')

        if response.content_type != 'text/html':
            # anything but HTML is sent as an attachment
            disposition = 'attachment; filename="%s"' % model_file.base_filename
            response.set_header('content-disposition', disposition)
        return self.wfstatusitem.apply_template_to_formdata(self.formdata)
|
|
|
|
def char2rtf(c):
    """Return c unchanged if its code is below 128, else as a \\uN? sequence."""
    code = ord(c)
    if code >= 128:
        return '\\u%d?' % code
    return c
|
|
|
|
def str2rtf(s):
    """Return s converted by char2rtf, wrapped in a {\\uc1{...}} group."""
    converted = ''.join([char2rtf(char) for char in s])
    return '{\\uc1{%s}}' % converted
|
|
|
|
def rtf_process(value):
    """Return value converted for inclusion in an RTF document.

    None is returned as is.  unicode values are used directly; any other
    value is converted with str() and decoded using the site charset.
    """
    if value is None:
        return None
    if isinstance(value, unicode):
        # str() on a unicode value encodes it to ASCII and raises
        # UnicodeEncodeError on non-ASCII characters, so it must be skipped
        text = value
    else:
        text = unicode(str(value), get_publisher().site_charset)
    return str2rtf(text)
|
|
|
|
class ExportToModel(WorkflowStatusItem):
    """Workflow item generating a document from an uploaded model file."""
    description = N_('Create Document')
    key = 'export_to_model'
    support_substitution_variables = True

    endpoint = False
    waitpoint = True

    label = None
    model_file = None
    attach_to_history = False
    directory_class = ExportToModelDirectory

    def render_as_line(self):
        """Return the one-line summary of this item."""
        if not self.label:
            return _('Create document (not completed)')
        if self.model_file:
            model = _('with model named %(file_name)s of %(size)s bytes') % {
                'file_name': self.model_file.base_filename,
                'size': self.model_file.size}
        else:
            model = _('no model set')
        return _('Create document, labeled %(label)s, %(model)s') % {
            'label': self.label,
            'model': model}

    def fill_form(self, form, formdata, user):
        """Add the button, labelled 'Create Document' when no label is set."""
        label = self.label
        if not label:
            label = _('Create Document')
        form.add_submit('button%s' % self.id, label)

    def submit_form(self, form, formdata, user, evo):
        """Handle a click on the button of this item.

        Returns the URL of the formdata after attaching the generated file to
        evo when attach_to_history is set, else the URL of the directory of
        this item.  Returns None when another button was used.
        """
        if form.get_submit() != 'button%s' % self.id:
            return
        if not evo.comment:
            evo.comment = _('Form exported in a model')
        if self.attach_to_history:
            from wcs.wf.attachment import AttachmentEvolutionPart
            evo.add_part(AttachmentEvolutionPart(
                self.model_file.base_filename,
                StringIO(self.apply_template_to_formdata(formdata)),
                content_type=self.model_file.content_type))
            return formdata.get_url()
        return formdata.get_url() + self.get_directory_name()

    def model_file_validation(self, upload):
        """Check that upload is an RTF file.

        The file is accepted on its content type, on its '.rtf' extension
        when the content type is missing or generic, or on its first bytes.
        Returns a (valid, error message) tuple.
        """
        content_type = upload.content_type
        if content_type and content_type == 'application/rtf':
            return True, ''
        if (content_type and content_type == 'application/octet-stream') or \
                content_type is None:
            if upload.base_filename and upload.base_filename.endswith('.rtf'):
                return True, ''
        upload.fp.seek(0)
        if upload.read(10).startswith('{\\rtf'):
            # rewind so the content can be read again
            upload.fp.seek(0)
            return True, ''
        return False, _('Only RTF files can be used')

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        """Add the history, label and model file widgets that are requested.

        When formdef is given the hint of the model file widget lists its
        variable names, and a model file found in its workflow options is
        used instead of the one of the item.
        """
        if 'attach_to_history' in parameters:
            form.add(CheckboxWidget, '%sattach_to_history' % prefix,
                     title=_('Attach generated file to the form history'),
                     value=self.attach_to_history)
        if 'label' in parameters:
            form.add(StringWidget, '%slabel' % prefix,
                     title=_('Button Label'), value=self.label)
        if 'model_file' in parameters:
            ids = (self.parent.parent.id, self.parent.id, self.id)
            if formdef:
                hint = htmltext('%s: <ul class="varnames">') % _('Available variables')
                for pair in get_varnames(formdef.fields):
                    # the original markup had a stray </span> closing tag
                    hint += htmltext('<li><tt class="varname">[%s]</tt> <label>%s</label></li>') % pair
                hint += htmltext('</ul>')
                ids = (formdef.id,) + ids
                filename = 'export_to_model-%s-%s-%s-%s.upload' % ids
            else:
                hint = _('You can use variables in your model using '
                         'the [variable] syntax, available variables depends on the form.')
                filename = 'export_to_model-%s-%s-%s.upload' % ids

            widget_name = '%smodel_file' % prefix
            if formdef and formdef.workflow_options and \
                    formdef.workflow_options.get(widget_name) is not None:
                value = formdef.workflow_options.get(widget_name)
            else:
                value = self.model_file
            if value:
                # link to the current file, put before the rest of the hint
                hint = htmltext('<div>%s: <a href="?file=%s">%s</a></div>') % \
                    (_('Current value'), widget_name, value.base_filename) + hint
            form.add(UploadWidget, widget_name, directory='models',
                     filename=filename, title=_('Model'), hint=hint,
                     validation=self.model_file_validation, value=value)

    def get_directory_name(self):
        """Return the directory name, made from the label of this item."""
        return qommon.misc.simplify(self.label or 'export_to_model', space='_')
    directory_name = property(get_directory_name)

    def apply_template_to_formdata(self, formdata):
        """Return the content of the model file rendered for formdata.

        Values are converted with rtf_process when the file name ends with
        '.rtf'.  ezt errors are logged and raised again as TemplatingError.
        """
        process = None
        if self.model_file.base_filename.endswith('.rtf'):
            process = rtf_process
        try:
            return template_on_formdata(formdata,
                                        self.model_file.get_file().read(),
                                        process=process)
        except ezt.UnknownReference as e:
            url = formdata.get_url()
            get_logger().error('error in template for export to model [%s], unknown reference %s' % (url, str(e)))
            raise TemplatingError(_('Error in the template, reference %s is unknown') % str(e))
        except ezt.EZTException as e:
            url = formdata.get_url()
            get_logger().error('error in template for export to model [%s], model could not be generated' % url)
            raise TemplatingError(_('Unknown error in the template: %s') % str(e))

    def get_parameters(self):
        """Return the names of the parameters of this item."""
        return ('label', 'model_file', 'attach_to_history')

    def model_file_export_to_xml(self, xml_item, charset, include_id=False):
        """Export the model file, content included, under xml_item."""
        if not self.model_file:
            return
        el = ET.SubElement(xml_item, 'model_file')
        ET.SubElement(el, 'base_filename').text = self.model_file.base_filename
        ET.SubElement(el, 'content_type').text = self.model_file.content_type
        ET.SubElement(el, 'content').text = self.model_file.get_file().read()

    def model_file_init_with_xml(self, elem, charset, include_id=False):
        """Create the model file from its XML element, if there is one."""
        if elem is None:
            return
        base_filename = elem.find('base_filename').text
        content_type = elem.find('content_type').text
        content = elem.find('content').text

        from quixote.http_request import Upload
        from wcs.qommon.form import UploadedFile

        ids = (self.parent.parent.id, self.parent.id, self.id)
        filename = 'export_to_model-%s-%s-%s.upload' % ids

        upload = Upload(base_filename, content_type)
        upload.fp = StringIO()
        upload.fp.write(content)
        upload.fp.seek(0)
        self.model_file = UploadedFile('models', filename, upload)


register_item_class(ExportToModel)
|
|
|
|
|
|
def load_extra():
    """Import the workflow item modules of the wf package.

    NOTE(review): presumably each module registers its item classes when
    imported -- confirm in the wf modules.
    """
    import wf.aggregation_email
    import wf.timeout_jump
    # import wf.anonymous_access
    import wf.jump
    import wf.attachment
    import wf.remove
    import wf.roles
    import wf.dispatch
    import wf.wscall
    import wf.form
    import wf.register_comment
    import wf.anonymise
|