# w.c.s. - web application for online forms # Copyright (C) 2005-2010 Entr'ouvert # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, see . from .qommon import ezt import collections import copy import datetime import xml.etree.ElementTree as ET import random import os import sys import time import uuid from django.utils import six from django.utils.encoding import force_text from quixote import get_request, get_response, redirect from .qommon import _, N_, force_str from .qommon.misc import C_, get_as_datetime, file_digest, get_foreground_colour, xml_node_text from .qommon.storage import StorableObject, atomic_write, NotEqual, Contains, Null, pickle_2to3_conversion from .qommon.form import * from .qommon.humantime import seconds2humanduration from .qommon import emails, get_cfg, get_logger from quixote.html import htmltext from .qommon import errors from .qommon.template import Template, TemplateError from .conditions import Condition from .roles import Role, logged_users_role, get_user_roles from .fields import FileField from .formdef import FormDef from .formdata import Evolution from .mail_templates import MailTemplate if not __name__.startswith('wcs.') and not __name__ == "__main__": raise ImportError('Import of workflows module must be absolute (import wcs.workflows)') def lax_int(s): try: return int(s) except (ValueError, TypeError): return -1 def perform_items(items, formdata, depth=20): if depth == 0: # prevents infinite loops return url = None old_status = formdata.status for item in items: if not item.check_condition(formdata): continue try: url = item.perform(formdata) or url except AbortActionException: break if formdata.status != old_status: break if formdata.status != old_status: if not formdata.evolution: formdata.evolution = [] evo = Evolution() evo.time = time.localtime() evo.status = formdata.status formdata.evolution.append(evo) formdata.store() # performs the items of the new status wf_status = formdata.get_status() url = perform_items(wf_status.items, formdata, depth=depth-1) or url return url class WorkflowImportError(Exception): def __init__(self, msg, msg_args=None): super(WorkflowImportError, self).__init__(msg) self.msg_args = msg_args or () class AbortActionException(Exception): pass class AttachmentSubstitutionProxy(object): def __init__(self, formdata, attachment_evolution_part): self.formdata = formdata self.attachment_evolution_part = attachment_evolution_part @property def filename(self): return self.attachment_evolution_part.orig_filename @property def content_type(self): return self.attachment_evolution_part.content_type @property def content(self): return self.attachment_evolution_part.get_file_pointer().read() @property def b64_content(self): return base64.b64encode(self.content) @property def url(self): return '%sattachment?f=%s' % (self.formdata.get_url(), os.path.basename(self.attachment_evolution_part.filename)) class NamedAttachmentsSubstitutionProxy(object): def __init__(self, formdata, parts): self.formdata = formdata self.parts = parts[:] self.parts.reverse() def __len__(self): return len(self.parts) def __getattr__(self, name): return getattr(self[0], name) def __getitem__(self, i): return AttachmentSubstitutionProxy(self.formdata, self.parts[i]) class AttachmentsSubstitutionProxy(object): def __init__(self, formdata): self.formdata = formdata def __getattr__(self, name): if name.startswith('__'): raise AttributeError(name) def has_varname_attachment(part): return isinstance(part, AttachmentEvolutionPart) and getattr(part, 'varname', None) == name parts = [part for part in self.formdata.iter_evolution_parts() if has_varname_attachment(part)] if parts: return NamedAttachmentsSubstitutionProxy(self.formdata, parts) raise AttributeError(name) class AttachmentEvolutionPart: #pylint: disable=C1001 orig_filename = None base_filename = None content_type = None charset = None varname = None def __init__(self, base_filename, fp, orig_filename=None, content_type=None, charset=None, varname=None): self.base_filename = base_filename self.orig_filename = orig_filename or base_filename self.content_type = content_type self.charset = charset self.fp = fp self.varname = varname @classmethod def from_upload(cls, upload, varname=None): return AttachmentEvolutionPart( upload.base_filename, upload.fp, upload.orig_filename, upload.content_type, upload.charset, varname=varname) def get_file_pointer(self): return open(self.filename, 'rb') def __getstate__(self): odict = self.__dict__.copy() if not 'fp' in odict: return odict del odict['fp'] dirname = os.path.join(get_publisher().app_dir, 'attachments') if not os.path.exists(dirname): os.mkdir(dirname) if not 'filename' in odict: filename = file_digest(self.fp) dirname = os.path.join(dirname, filename[:4]) if not os.path.exists(dirname): os.mkdir(dirname) odict['filename'] = os.path.join(dirname, filename) self.filename = odict['filename'] self.fp.seek(0) atomic_write(self.filename, self.fp) return odict def view(self): return htmltext('

%s' % ( os.path.basename(self.filename), self.orig_filename)) @classmethod def get_substitution_variables(cls, formdata): return {'attachments': AttachmentsSubstitutionProxy(formdata), 'form_attachments': AttachmentsSubstitutionProxy(formdata)} # mimic PicklableUpload methods: def can_thumbnail(self): return True def has_redirect_url(self): return False def get_redirect_url(self, upload, backoffice=False): # should never be called, has_redirect_url is False raise AssertionError('no get_redirect_url on AttachmentEvolutionPart object') class DuplicateGlobalActionNameError(Exception): pass class DuplicateStatusNameError(Exception): pass class WorkflowVariablesFieldsFormDef(FormDef): '''Class to handle workflow variables, it loads and saves from/to the workflow object 'variables_formdef' attribute.''' lightweight = False def __init__(self, workflow): self.id = None self.workflow = workflow if workflow.variables_formdef and workflow.variables_formdef.fields: self.fields = self.workflow.variables_formdef.fields self.max_field_id = max([lax_int(x.id) for x in self.fields or []]) else: self.fields = [] @property def name(self): return _('Options of workflow "%s"') % self.workflow.name def get_admin_url(self): base_url = get_publisher().get_backoffice_url() return '%s/workflows/%s/variables/fields/' % (base_url, self.workflow.id) def store(self): for field in self.fields: if hasattr(field, 'widget_class'): if not field.varname: field.varname = misc.simplify(field.label, space='_') self.workflow.variables_formdef = self self.workflow.store() class WorkflowBackofficeFieldsFormDef(FormDef): '''Class to handle workflow backoffice fields, it loads and saves from/to the workflow object 'backoffice_fields_formdef' attribute.''' lightweight = False field_prefix = 'bo' def __init__(self, workflow): self.id = None self.workflow = workflow if workflow.backoffice_fields_formdef and workflow.backoffice_fields_formdef.fields: self.fields = self.workflow.backoffice_fields_formdef.fields else: self.fields = [] @property def name(self): return _('Backoffice fields of workflow "%s"') % self.workflow.name def get_admin_url(self): base_url = get_publisher().get_backoffice_url() return '%s/workflows/%s/backoffice-fields/fields/' % (base_url, self.workflow.id) def get_new_field_id(self): return '%s%s' % (self.field_prefix, str(uuid.uuid4())) def store(self): self.workflow.backoffice_fields_formdef = self self.workflow.store() class Workflow(StorableObject): _names = 'workflows' name = None possible_status = None roles = None variables_formdef = None backoffice_fields_formdef = None global_actions = None criticality_levels = None last_modification_time = None last_modification_user_id = None def __init__(self, name = None): StorableObject.__init__(self) self.name = name self.possible_status = [] self.roles = {'_receiver': _('Recipient')} self.global_actions = [] self.criticality_levels = [] def migrate(self): changed = False if not 'roles' in self.__dict__ or self.roles is None: self.roles = {'_receiver': _('Recipient')} changed = True for status in self.possible_status: changed |= status.migrate() if self.backoffice_fields_formdef and self.backoffice_fields_formdef.fields: for field in self.backoffice_fields_formdef.fields: changed |= field.migrate() if not self.global_actions: self.global_actions = [] if changed: self.store() def store(self): must_update = False if self.id: old_self = self.get(self.id, ignore_errors=True, ignore_migration=True) if old_self: old_endpoints = set([x.id for x in old_self.get_endpoint_status()]) if old_endpoints != set([x.id for x in self.get_endpoint_status()]): must_update = True old_criticality_levels = len(old_self.criticality_levels or [0]) if old_criticality_levels != len(self.criticality_levels or [0]): must_update = True try: old_backoffice_fields = old_self.backoffice_fields_formdef.fields except AttributeError: old_backoffice_fields = [] try: new_backoffice_fields = self.backoffice_fields_formdef.fields except AttributeError: new_backoffice_fields = [] if len(old_backoffice_fields) != len(new_backoffice_fields): must_update = True elif self.backoffice_fields_formdef: must_update = True self.last_modification_time = time.localtime() if get_request() and get_request().user: self.last_modification_user_id = str(get_request().user.id) else: self.last_modification_user_id = None StorableObject.store(self) def update(job=None): # instruct all related formdefs to update. for form in self.formdefs(ignore_migration=True, order_by='id'): form.data_class().rebuild_security() if must_update: form.rebuild() if get_response(): get_response().add_after_job( N_('Reindexing forms after workflow change'), update) else: update() @classmethod def get(cls, id, ignore_errors=False, ignore_migration=False): if id == '_default': return cls.get_default_workflow() elif id == '_carddef_default': from wcs.carddef import CardDef return CardDef.get_default_workflow() return super(Workflow, cls).get (id, ignore_errors=ignore_errors, ignore_migration=ignore_migration) def add_status(self, name, id=None): if [x for x in self.possible_status if x.name == name]: raise DuplicateStatusNameError() status = WorkflowStatus(name) status.parent = self if id is None: if self.possible_status: status.id = str(max([lax_int(x.id) for x in self.possible_status]) + 1) else: status.id = '1' else: status.id = id self.possible_status.append(status) return status def get_status(self, id): if id.startswith('wf-'): id = id[3:] for status in self.possible_status: if status.id == id: return status raise KeyError() def get_backoffice_fields(self): if self.backoffice_fields_formdef: return self.backoffice_fields_formdef.fields or [] return [] def get_all_items(self): for status in self.possible_status or []: for item in status.items or []: yield item for action in self.global_actions or []: for item in action.items or []: yield item def add_global_action(self, name, id=None): if [x for x in self.global_actions if x.name == name]: raise DuplicateGlobalActionNameError() action = WorkflowGlobalAction(name) action.parent = self action.append_trigger('manual') if id is None: if self.global_actions: action.id = str(max([lax_int(x.id) for x in self.global_actions]) + 1) else: action.id = '1' else: action.id = id self.global_actions.append(action) return action def get_global_manual_actions(self): actions = [] for action in self.global_actions or []: roles = [] for trigger in action.triggers or []: if not isinstance(trigger, WorkflowGlobalActionManualTrigger): continue roles.extend(trigger.roles or []) functions = [x for x in roles if x in self.roles] roles = [x for x in roles if x not in self.roles] if functions or roles: actions.append({'action': action, 'roles': roles, 'functions': functions}) return actions def get_global_actions_for_user(self, formdata, user): if not user: return [] actions = [] for action in self.global_actions or []: for trigger in action.triggers or []: if isinstance(trigger, WorkflowGlobalActionManualTrigger): if '_submitter' in (trigger.roles or []) and formdata.is_submitter(user): actions.append(action) break roles = [get_role_translation(formdata, x) for x in (trigger.roles or []) if x != '_submitter'] if set(roles).intersection(user.get_roles()): actions.append(action) break return actions def get_subdirectories(self, formdata): wf_status = formdata.get_status() if not wf_status: # draft return [] directories = [] for action in self.global_actions: for trigger in action.triggers or []: directories.extend(trigger.get_subdirectories(formdata)) directories.extend(wf_status.get_subdirectories(formdata)) return directories def __setstate__(self, dict): self.__dict__.update(dict) pickle_2to3_conversion(self) for s in self.possible_status + (self.global_actions or []): s.parent = self triggers = getattr(s, 'triggers', None) or [] for i, item in enumerate(s.items + triggers): item.parent = s if not item.id: item.id = '%d' % (i+1) if self.variables_formdef: self.variables_formdef.workflow = self if self.backoffice_fields_formdef: self.backoffice_fields_formdef.workflow = self self.backoffice_fields_formdef.__class__ = WorkflowBackofficeFieldsFormDef def get_waitpoint_status(self): return [x for x in self.possible_status if x.is_waitpoint()] def get_endpoint_status(self): return [x for x in self.possible_status if x.is_endpoint()] def get_not_endpoint_status(self): return [x for x in self.possible_status if not x.is_endpoint()] def has_options(self): for status in self.possible_status: for item in status.items: for parameter in item.get_parameters(): if not getattr(item, parameter): return True return False def remove_self(self): for form in self.formdefs(): form.workflow_id = None form.store() StorableObject.remove_self(self) def export_to_xml(self, include_id=False): charset = get_publisher().site_charset root = ET.Element('workflow') if include_id and self.id and not str(self.id).startswith('_'): root.attrib['id'] = str(self.id) ET.SubElement(root, 'name').text = force_text(self.name, charset) roles_node = ET.SubElement(root, 'roles') if self.roles: for role_id, role_label in sorted(self.roles.items()): role_node = ET.SubElement(roles_node, 'role') role_node.attrib['id'] = role_id role_node.text = force_text(role_label, charset) if self.last_modification_time: elem = ET.SubElement(root, 'last_modification') elem.text = time.strftime('%Y-%m-%d %H:%M:%S', self.last_modification_time) if include_id: elem.attrib['user_id'] = str(self.last_modification_user_id) possible_status = ET.SubElement(root, 'possible_status') for status in self.possible_status: possible_status.append(status.export_to_xml(charset=charset, include_id=include_id)) if self.global_actions: global_actions = ET.SubElement(root, 'global_actions') for action in self.global_actions: global_actions.append(action.export_to_xml(charset=charset, include_id=include_id)) if self.criticality_levels: criticality_levels = ET.SubElement(root, 'criticality_levels') for level in self.criticality_levels: criticality_levels.append(level.export_to_xml(charset=charset)) if self.variables_formdef: variables = ET.SubElement(root, 'variables') formdef = ET.SubElement(variables, 'formdef') ET.SubElement(formdef, 'name').text = '-' # required by formdef xml import fields = ET.SubElement(formdef, 'fields') for field in self.variables_formdef.fields: fields.append(field.export_to_xml(charset=charset, include_id=include_id)) if self.backoffice_fields_formdef: variables = ET.SubElement(root, 'backoffice-fields') formdef = ET.SubElement(variables, 'formdef') ET.SubElement(formdef, 'name').text = '-' # required by formdef xml import fields = ET.SubElement(formdef, 'fields') for field in self.backoffice_fields_formdef.fields: fields.append(field.export_to_xml(charset=charset, include_id=include_id)) return root @classmethod def import_from_xml(cls, fd, include_id=False): try: tree = ET.parse(fd) except: raise ValueError() return cls.import_from_xml_tree(tree, include_id=include_id) @classmethod def import_from_xml_tree(cls, tree, include_id=False): charset = get_publisher().site_charset workflow = cls() if tree.find('name') is None or not tree.find('name').text: raise WorkflowImportError(N_('Missing name')) # if the tree we get is actually a ElementTree for real, we get its # root element and go on happily. if not ET.iselement(tree): tree = tree.getroot() if tree.tag != 'workflow': raise WorkflowImportError(N_('Not a workflow')) if include_id and tree.attrib.get('id'): workflow.id = tree.attrib.get('id') workflow.name = xml_node_text(tree.find('name')) if tree.find('roles') is not None: workflow.roles = {} for role_node in tree.findall('roles/role'): workflow.roles[role_node.attrib['id']] = xml_node_text(role_node) if tree.find('last_modification') is not None: node = tree.find('last_modification') workflow.last_modification_time = time.strptime(node.text, '%Y-%m-%d %H:%M:%S') if include_id and node.attrib.get('user_id'): workflow.last_modification_user_id = node.attrib.get('user_id') workflow.possible_status = [] for status in tree.find('possible_status'): status_o = WorkflowStatus() status_o.parent = workflow status_o.init_with_xml(status, charset, include_id=include_id) workflow.possible_status.append(status_o) workflow.global_actions = [] global_actions = tree.find('global_actions') if global_actions is not None: for action in global_actions: action_o = WorkflowGlobalAction() action_o.parent = workflow action_o.init_with_xml(action, charset, include_id=include_id) workflow.global_actions.append(action_o) workflow.criticality_levels = [] criticality_levels = tree.find('criticality_levels') if criticality_levels is not None: for level in criticality_levels: level_o = WorkflowCriticalityLevel() level_o.init_with_xml(level, charset) workflow.criticality_levels.append(level_o) variables = tree.find('variables') if variables is not None: formdef = variables.find('formdef') imported_formdef = FormDef.import_from_xml_tree(formdef, include_id=True) workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow) workflow.variables_formdef.fields = imported_formdef.fields variables = tree.find('backoffice-fields') if variables is not None: formdef = variables.find('formdef') imported_formdef = FormDef.import_from_xml_tree(formdef, include_id=True) workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow=workflow) workflow.backoffice_fields_formdef.fields = imported_formdef.fields return workflow def get_list_of_roles(self, include_logged_in_users=True): t = [] t.append(('_submitter', C_('role|User'), '_submitter')) for workflow_role in self.roles.items(): t.append(list(workflow_role) + [workflow_role[0]]) if include_logged_in_users: t.append((logged_users_role().id, logged_users_role().name, logged_users_role().id)) include_roles = not(get_publisher().has_site_option('workflow-functions-only')) if include_roles and get_user_roles(): t.append((None, '----', None)) t.extend(get_user_roles()) return t def render_list_of_roles(self, roles): return render_list_of_roles(self, roles) def get_json_export_dict(self, include_id=False): charset = get_publisher().site_charset root = {} root['name'] = force_text(self.name, charset) if include_id and self.id: root['id'] = str(self.id) if self.last_modification_time: root['last_modification_time'] = time.strftime('%Y-%m-%dT%H:%M:%S', self.last_modification_time) roles = root['functions'] = {} for role, label in self.roles.items(): roles[role] = force_text(label, charset) statuses = root['statuses'] = [] endpoint_status_ids = [s.id for s in self.get_endpoint_status()] waitpoint_status_ids = [s.id for s in self.get_waitpoint_status()] for status in self.possible_status: statuses.append({ 'id': status.id, 'name': force_text(status.name, charset), 'forced_endpoint': status.forced_endpoint, 'endpoint': status.id in endpoint_status_ids, 'waitpoint': status.id in waitpoint_status_ids, }) root['fields'] = [] for field in self.get_backoffice_fields(): root['fields'].append(field.export_to_json(include_id=include_id)) return root @classmethod def get_unknown_workflow(cls): workflow = Workflow(name=_('Unknown')) workflow.id = '_unknown' return workflow @classmethod def get_default_workflow(cls): from .qommon.admin.emails import EmailsDirectory workflow = Workflow(name=_('Default')) workflow.id = '_default' workflow.roles = {'_receiver': _('Recipient')} just_submitted_status = workflow.add_status(_('Just Submitted'), 'just_submitted') just_submitted_status.visibility = ['_receiver'] new_status = workflow.add_status(_('New'), 'new') new_status.colour = '66FF00' rejected_status = workflow.add_status(_('Rejected'), 'rejected') rejected_status.colour = 'FF3300' accepted_status = workflow.add_status(_('Accepted'), 'accepted') accepted_status.colour = '66CCFF' finished_status = workflow.add_status(_('Finished'), 'finished') finished_status.colour = 'CCCCCC' commentable = CommentableWorkflowStatusItem() commentable.id = '_commentable' commentable.by = ['_submitter', '_receiver'] from wcs.wf.jump import JumpWorkflowStatusItem jump_to_new = JumpWorkflowStatusItem() jump_to_new.id = '_jump_to_new' jump_to_new.status = new_status.id jump_to_new.parent = just_submitted_status notify_new_receiver_email = SendmailWorkflowStatusItem() notify_new_receiver_email.id = '_notify_new_receiver_email' notify_new_receiver_email.to = ['_receiver'] notify_new_receiver_email.subject = EmailsDirectory.get_subject('new_receiver') notify_new_receiver_email.body = EmailsDirectory.get_body('new_receiver') if not EmailsDirectory.is_enabled('new_receiver'): notify_new_receiver_email = None notify_new_user_email = SendmailWorkflowStatusItem() notify_new_user_email.id = '_notify_new_user_email' notify_new_user_email.to = ['_submitter'] notify_new_user_email.subject = EmailsDirectory.get_subject('new_user') notify_new_user_email.body = EmailsDirectory.get_body('new_user') if not EmailsDirectory.is_enabled('new_user'): notify_change_user_email = None notify_change_receiver_email = SendmailWorkflowStatusItem() notify_change_receiver_email.id = '_notify_change_receiver_email' notify_change_receiver_email.to = ['_receiver'] notify_change_receiver_email.subject = EmailsDirectory.get_subject('change_receiver') notify_change_receiver_email.body = EmailsDirectory.get_body('change_receiver') if not EmailsDirectory.is_enabled('change_receiver'): notify_change_receiver_email = None notify_change_user_email = SendmailWorkflowStatusItem() notify_change_user_email.id = '_notify_change_user_email' notify_change_user_email.to = ['_submitter'] notify_change_user_email.subject = EmailsDirectory.get_subject('change_user') notify_change_user_email.body = EmailsDirectory.get_body('change_user') if not EmailsDirectory.is_enabled('change_user'): notify_change_user_email = None if notify_new_receiver_email: notify_new_receiver_email.parent = just_submitted_status just_submitted_status.items.append(notify_new_receiver_email) if notify_new_user_email: notify_new_user_email.parent = just_submitted_status just_submitted_status.items.append(notify_new_user_email) just_submitted_status.items.append(jump_to_new) if notify_change_receiver_email: accepted_status.items.append(notify_change_receiver_email) notify_change_receiver_email.parent = accepted_status notify_change_receiver_email = copy.copy(notify_change_receiver_email) rejected_status.items.append(notify_change_receiver_email) notify_change_receiver_email.parent = rejected_status notify_change_receiver_email = copy.copy(notify_change_receiver_email) finished_status.items.append(notify_change_receiver_email) notify_change_receiver_email.parent = finished_status if notify_change_user_email: accepted_status.items.append(notify_change_user_email) notify_change_user_email.parent = accepted_status notify_change_user_email = copy.copy(notify_change_user_email) rejected_status.items.append(notify_change_user_email) notify_change_user_email.parent = rejected_status notify_change_user_email = copy.copy(notify_change_user_email) finished_status.items.append(notify_change_user_email) notify_change_user_email.parent = finished_status new_status.items.append(commentable) commentable.parent = new_status commentable = copy.copy(commentable) accepted_status.items.append(commentable) commentable.parent = accepted_status accept = ChoiceWorkflowStatusItem() accept.id = '_accept' accept.label = _('Accept') accept.by = ['_receiver'] accept.status = accepted_status.id accept.parent = new_status new_status.items.append(accept) reject = ChoiceWorkflowStatusItem() reject.id = '_reject' reject.label = _('Reject') reject.by = ['_receiver'] reject.status = rejected_status.id reject.parent = new_status new_status.items.append(reject) finish = ChoiceWorkflowStatusItem() finish.id = '_finish' finish.label = _('Finish') finish.by = ['_receiver'] finish.status = finished_status.id finish.parent = accepted_status accepted_status.items.append(finish) return workflow def formdefs(self, **kwargs): return list(FormDef.select(lambda x: x.workflow_id == self.id, **kwargs)) class XmlSerialisable(object): node_name = None key = None def export_to_xml(self, charset, include_id=False): node = ET.Element(self.node_name) if self.key: node.attrib['type'] = self.key if include_id and getattr(self, 'id', None): node.attrib['id'] = self.id for attribute in self.get_parameters(): if getattr(self, '%s_export_to_xml' % attribute, None): getattr(self, '%s_export_to_xml' % attribute)(node, charset, include_id=include_id) continue if hasattr(self, attribute) and getattr(self, attribute) is not None: el = ET.SubElement(node, attribute) val = getattr(self, attribute) if type(val) is dict: for k, v in val.items(): ET.SubElement(el, k).text = force_text(v, charset, errors='replace') elif type(val) is list: if attribute[-1] == 's': atname = attribute[:-1] else: atname = 'item' for v in val: ET.SubElement(el, atname).text = force_text(str(v), charset, errors='replace') elif isinstance(val, six.string_types): el.text = force_text(val, charset, errors='replace') else: el.text = str(val) return node def init_with_xml(self, elem, charset, include_id=False): if include_id and elem.attrib.get('id'): self.id = elem.attrib.get('id') for attribute in self.get_parameters(): el = elem.find(attribute) if getattr(self, '%s_init_with_xml' % attribute, None): getattr(self, '%s_init_with_xml' % attribute)(el, charset, include_id=include_id) continue if el is None: continue if list(el): if type(getattr(self, attribute)) is list: v = [xml_node_text(x) or '' for x in el] elif type(getattr(self, attribute)) is dict: v = {} for e in el: v[e.tag] = xml_node_text(e) else: # ??? raise AssertionError setattr(self, attribute, v) else: if el.text is None: setattr(self, attribute, None) elif el.text in ('False', 'True') and not isinstance(getattr(self, attribute), six.string_types): # booleans setattr(self, attribute, el.text == 'True') elif type(getattr(self, attribute)) is int: setattr(self, attribute, int(el.text)) else: setattr(self, attribute, xml_node_text(el)) def _roles_export_to_xml(self, attribute, item, charset, include_id=False): if not hasattr(self, attribute) or not getattr(self, attribute): return el = ET.SubElement(item, attribute) for role_id in getattr(self, attribute): if role_id is None: continue role_id = str(role_id) if role_id.startswith('_') or role_id == 'logged-users': role = force_text(role_id, charset) else: try: role = force_text(Role.get(role_id).name, charset) except KeyError: role = force_text(role_id, charset) sub = ET.SubElement(el, 'item') sub.attrib['role_id'] = role_id sub.text = role def _roles_init_with_xml(self, attribute, elem, charset, include_id=False): if elem is None: setattr(self, attribute, []) else: imported_roles = [] for child in elem: imported_roles.append(self._get_role_id_from_xml(child, charset, include_id=include_id)) setattr(self, attribute, imported_roles) def _role_export_to_xml(self, attribute, item, charset, include_id=False): if not hasattr(self, attribute) or not getattr(self, attribute): return role_id = str(getattr(self, attribute)) if role_id.startswith('_') or role_id == 'logged-users': role = force_text(role_id, charset) else: try: role = force_text(Role.get(role_id).name, charset) except KeyError: role_id = role = force_text(role_id, charset) sub = ET.SubElement(item, attribute) if include_id: sub.attrib['role_id'] = role_id sub.text = role def _get_role_id_from_xml(self, elem, charset, include_id=False): if elem is None: return None value = xml_node_text(elem) or '' # look for known static values if value.startswith('_') or value == 'logged-users': return value # if we import using id, look at the role_id attribute if include_id and 'role_id' in elem.attrib: role_id = force_str(elem.attrib['role_id']) if Role.has_key(role_id): return role_id if WorkflowStatusItem.get_expression(role_id)['type'] in ('python', 'template'): return role_id # if not using id, look up on the name for role in Role.select(ignore_errors=True): if role.name == value: return role.id # if a computed value is possible and value looks like # an expression, use it if WorkflowStatusItem.get_expression(value)['type'] in ('python', 'template'): return value # if the roles are managed by the idp, don't try further. if get_publisher() and get_cfg('sp', {}).get('idp-manage-roles') is True: raise WorkflowImportError(N_('Unknown referenced role (%s)'), (value,)) # and if there's no match, create a new role role = Role() role.name = value role.store() return role.id def _role_init_with_xml(self, attribute, elem, charset, include_id=False): setattr(self, attribute, self._get_role_id_from_xml(elem, charset, include_id=include_id)) class WorkflowGlobalActionTrigger(XmlSerialisable): node_name = 'trigger' def submit_admin_form(self, form): for f in self.get_parameters(): widget = form.get_widget(f) if widget: value = widget.parse() if hasattr(self, '%s_parse' % f): value = getattr(self, '%s_parse' % f)(value) setattr(self, f, value) def get_subdirectories(self, formdata): return [] class WorkflowGlobalActionManualTrigger(WorkflowGlobalActionTrigger): key = 'manual' roles = None def get_parameters(self): return ('roles',) def render_as_line(self): if self.roles: return _('Manual by %s') % render_list_of_roles( self.parent.parent, self.roles) else: return _('Manual (not assigned)') def form(self, workflow): form = Form(enctype='multipart/form-data') options = [(None, '---', None)] options += workflow.get_list_of_roles(include_logged_in_users=False) form.add(WidgetList, 'roles', title=_('Roles'), element_type=SingleSelectWidget, value=self.roles, add_element_label=_('Add Role'), element_kwargs={'render_br': False, 'options': options}) return form def roles_export_to_xml(self, item, charset, include_id=False): self._roles_export_to_xml('roles', item, charset, include_id=include_id) def roles_init_with_xml(self, elem, charset, include_id=False): self._roles_init_with_xml('roles', elem, charset, include_id=include_id) class WorkflowGlobalActionTimeoutTriggerMarker(object): def __init__(self, timeout_id): self.timeout_id = timeout_id class WorkflowGlobalActionTimeoutTrigger(WorkflowGlobalActionTrigger): key = 'timeout' anchor = None anchor_expression = '' anchor_status_first = None anchor_status_latest = None timeout = None def get_parameters(self): return ('anchor', 'anchor_expression', 'anchor_status_first', 'anchor_status_latest', 'timeout') def get_anchor_labels(self): return collections.OrderedDict([ ('creation', _('Creation')), ('1st-arrival', _('First arrival in status')), ('latest-arrival', _('Latest arrival in status')), ('finalized', _('Arrival in final status')), ('python', _('Python expression')), ]) def properly_configured(self): workflow = self.parent.parent if not (self.anchor and self.timeout): return False if self.anchor == '1st-arrival' and self.anchor_status_first: try: workflow.get_status(self.anchor_status_first) except KeyError: return False if self.anchor == 'latest-arrival' and self.anchor_status_latest: try: workflow.get_status(self.anchor_status_latest) except KeyError: return False return True def render_as_line(self): if self.properly_configured(): return _('Automatic, %(timeout)s, relative to: %(anchor)s') % { 'anchor': self.get_anchor_labels().get(self.anchor).lower(), 'timeout': _('%s days') % self.timeout} else: return _('Automatic (not configured)') def form(self, workflow): form = Form(enctype='multipart/form-data') options = list(self.get_anchor_labels().items()) form.add(SingleSelectWidget, 'anchor', title=_('Reference Date'), options=options, value=self.anchor, required=True, attrs={'data-dynamic-display-parent': 'true'}) form.add(StringWidget, 'anchor_expression', title=_('Expression'), size=80, value=self.anchor_expression, hint=_('This will only apply to open forms.'), attrs={'data-dynamic-display-child-of': 'anchor', 'data-dynamic-display-value': _('Python expression')}) possible_status = [(None, _('Current Status'), None)] possible_status.extend([('wf-%s' % x.id, x.name, x.id) for x in workflow.possible_status]) form.add(SingleSelectWidget, 'anchor_status_first', title=_('Status'), options=possible_status, value=self.anchor_status_first, attrs={'data-dynamic-display-child-of': 'anchor', 'data-dynamic-display-value': _('First arrival in status')} ) form.add(SingleSelectWidget, 'anchor_status_latest', title=_('Status'), options=possible_status, value=self.anchor_status_latest, attrs={'data-dynamic-display-child-of': 'anchor', 'data-dynamic-display-value': _('Latest arrival in status')} ) form.add(ValidatedStringWidget, 'timeout', title=_('Delay (in days)'), value=self.timeout, regex=r'^-?\d+$', required=True, hint=_(''' Number of days relative to the reference date. If the reference date is computed from an expression, a negative delay is accepted to trigger the action before the date.''')) return form def must_trigger(self, formdata, endpoint_status_ids): if formdata.status in endpoint_status_ids: if not ((self.anchor == '1st-arrival' and self.anchor_status_first in endpoint_status_ids) or ( self.anchor == 'latest-arrival' and self.anchor_status_latest in endpoint_status_ids) or ( self.anchor == 'finalized')): # don't trigger on finalized formdata (unless explicit anchor point) return False anchor_date = None if self.anchor == 'creation': anchor_date = formdata.receipt_time elif self.anchor == '1st-arrival': anchor_status = self.anchor_status_first or formdata.status for evolution in formdata.evolution: if evolution.status == anchor_status: anchor_date = evolution.last_jump_datetime or evolution.time break elif self.anchor == 'latest-arrival': anchor_status = self.anchor_status_latest or formdata.status latest_no_status_evolution = None for evolution in reversed(formdata.evolution): if evolution.status == anchor_status: if latest_no_status_evolution: evolution = latest_no_status_evolution anchor_date = evolution.last_jump_datetime or evolution.time break elif evolution.status: latest_no_status_evolution = None elif latest_no_status_evolution is None: latest_no_status_evolution = evolution elif self.anchor == 'finalized': if formdata.status in endpoint_status_ids: for evolution in reversed(formdata.evolution): if not evolution.status: continue if evolution.status in endpoint_status_ids: anchor_date = evolution.time else: break elif self.anchor == 'python': variables = get_publisher().substitutions.get_context_variables() try: anchor_date = eval(self.anchor_expression, get_publisher().get_global_eval_dict(), variables) except: # get the variables in the locals() namespace so they are # displayed within the trace. expression = self.anchor_expression global_variables = get_publisher().get_global_eval_dict() get_publisher().notify_of_exception(sys.exc_info(), context='[TIMEOUTS]') # convert anchor_date to datetime.datetime() if isinstance(anchor_date, datetime.datetime): pass elif isinstance(anchor_date, datetime.date): anchor_date = datetime.datetime(year=anchor_date.year, month=anchor_date.month, day=anchor_date.day) elif isinstance(anchor_date, time.struct_time): anchor_date = datetime.datetime.fromtimestamp(time.mktime(anchor_date)) elif isinstance(anchor_date, six.string_types) and anchor_date: try: anchor_date = get_as_datetime(anchor_date) except ValueError: get_publisher().notify_of_exception(sys.exc_info(), context='[TIMEOUTS]') anchor_date = None elif anchor_date: # timestamp try: anchor_date = datetime.datetime.fromtimestamp(anchor_date) except TypeError: get_publisher().notify_of_exception(sys.exc_info(), context='[TIMEOUTS]') anchor_date = None if not anchor_date: return False anchor_date = anchor_date + datetime.timedelta(days=int(self.timeout)) return bool(datetime.datetime.now() > anchor_date) @classmethod def apply(cls, workflow): triggers = [] for action in workflow.global_actions or []: triggers.extend([(action, x) for x in action.triggers or [] if isinstance(x, WorkflowGlobalActionTimeoutTrigger) and x.properly_configured()]) if not triggers: return not_endpoint_status = workflow.get_not_endpoint_status() not_endpoint_status_ids = ['wf-%s' % x.id for x in not_endpoint_status] endpoint_status = workflow.get_endpoint_status() endpoint_status_ids = ['wf-%s' % x.id for x in endpoint_status] # check if triggers are defined relative to terminal status run_on_finalized = False for action, trigger in triggers: if trigger.anchor == 'finalized': run_on_finalized = True elif (trigger.anchor == 'creation' and workflow.possible_status and workflow.possible_status[0] in endpoint_status): run_on_finalized = True elif (trigger.anchor == '1st-arrival' and trigger.anchor_status_first and workflow.get_status(trigger.anchor_status_first) in endpoint_status): run_on_finalized = True elif (trigger.anchor == 'latest-arrival' and trigger.anchor_status_latest and workflow.get_status(trigger.anchor_status_latest) in endpoint_status): run_on_finalized = True criterias = [NotEqual('status', 'draft'), Null('anonymised')] if not run_on_finalized: # limit to formdata that are not finalized criterias.append(Contains('status', not_endpoint_status_ids)) for formdef in workflow.formdefs(): open_formdata_ids = [] data_class = formdef.data_class() for formdata in data_class.select(criterias, iterator=True): get_publisher().substitutions.reset() get_publisher().substitutions.feed(get_publisher()) get_publisher().substitutions.feed(formdef) get_publisher().substitutions.feed(formdata) seen_triggers = [] for part in formdata.iter_evolution_parts(): if not isinstance(part, WorkflowGlobalActionTimeoutTriggerMarker): continue seen_triggers.append(part.timeout_id) for action, trigger in triggers: if trigger.id in seen_triggers: continue # already triggered if trigger.must_trigger(formdata, endpoint_status_ids): if not formdata.evolution: continue formdata.evolution[-1].add_part( WorkflowGlobalActionTimeoutTriggerMarker(trigger.id)) formdata.store() perform_items(action.items, formdata) break class WorkflowGlobalActionWebserviceTrigger(WorkflowGlobalActionManualTrigger): key = 'webservice' identifier = None roles = None def get_parameters(self): return ('identifier', 'roles') def render_as_line(self): if self.identifier: return _('Webservice (%s)') % self.identifier else: return _('Webservice (not configured)') def form(self, workflow): form = Form(enctype='multipart/form-data') form.add(StringWidget, 'identifier', title=_('Identifier'), required=True, value=self.identifier) options = [(None, '---', None)] options += workflow.get_list_of_roles(include_logged_in_users=True) form.add(WidgetList, 'roles', title=_('Roles'), element_type=SingleSelectWidget, value=self.roles, add_element_label=_('Add Role'), element_kwargs={'render_br': False, 'options': options}) return form def get_subdirectories(self, formdata): from wcs.forms.workflows import WorkflowGlobalActionWebserviceHooksDirectory return [('hooks', WorkflowGlobalActionWebserviceHooksDirectory(formdata))] class WorkflowGlobalAction(object): id = None name = None items = None triggers = None backoffice_info_text = None def __init__(self, name=None): self.name = name self.items = [] def append_item(self, type): for klass in item_classes: if klass.key == type: o = klass() if self.items: o.id = str(max([lax_int(x.id) for x in self.items]) + 1) else: o.id = '1' self.items.append(o) return o else: raise KeyError() def append_trigger(self, type): trigger_types = { 'manual': WorkflowGlobalActionManualTrigger, 'timeout': WorkflowGlobalActionTimeoutTrigger, 'webservice': WorkflowGlobalActionWebserviceTrigger, } o = trigger_types.get(type)() if not self.triggers: self.triggers = [] o.id = str(uuid.uuid4()) self.triggers.append(o) return o def export_to_xml(self, charset, include_id=False): status = ET.Element('action') ET.SubElement(status, 'id').text = force_text(self.id, charset) ET.SubElement(status, 'name').text = force_text(self.name, charset) if self.backoffice_info_text: ET.SubElement(status, 'backoffice_info_text').text = force_text( self.backoffice_info_text, charset) items = ET.SubElement(status, 'items') for item in self.items: items.append(item.export_to_xml(charset=charset, include_id=include_id)) triggers = ET.SubElement(status, 'triggers') for trigger in self.triggers or []: triggers.append(trigger.export_to_xml(charset=charset, include_id=include_id)) return status def init_with_xml(self, elem, charset, include_id=False): self.id = xml_node_text(elem.find('id')) self.name = xml_node_text(elem.find('name')) if elem.find('backoffice_info_text') is not None: self.backoffice_info_text = xml_node_text(elem.find('backoffice_info_text')) self.items = [] for item in elem.find('items'): item_type = item.attrib['type'] self.append_item(item_type) item_o = self.items[-1] item_o.parent = self item_o.init_with_xml(item, charset, include_id=include_id) self.triggers = [] for trigger in elem.find('triggers'): trigger_type = trigger.attrib['type'] self.append_trigger(trigger_type) trigger_o = self.triggers[-1] trigger_o.parent = self trigger_o.init_with_xml(trigger, charset, include_id=include_id) class WorkflowCriticalityLevel(object): id = None name = None colour = None def __init__(self, name=None, colour=None): self.name = name self.colour = colour self.id = str(random.randint(0, 100000)) def export_to_xml(self, charset, include_id=False): level = ET.Element('criticality-level') ET.SubElement(level, 'id').text = force_text(self.id, charset) if self.id else '' ET.SubElement(level, 'name').text = force_text(self.name, charset) if self.colour: ET.SubElement(level, 'colour').text = force_text(self.colour, charset) return level def init_with_xml(self, elem, charset, include_id=False): self.id = xml_node_text(elem.find('id')) self.name = xml_node_text(elem.find('name')) if elem.find('colour') is not None: self.colour = xml_node_text(elem.find('colour')) class WorkflowStatus(object): id = None name = None items = None visibility = None forced_endpoint = False colour = 'FFFFFF' backoffice_info_text = None extra_css_class = '' def __init__(self, name = None): self.name = name self.items = [] def __eq__(self, other): if other is None: return False # this assumes both status are from the same workflow if type(other) is str: other_id = other else: other_id = other.id return self.id == other_id def migrate(self): changed = False for item in self.items: changed |= item.migrate() return changed def append_item(self, type): for klass in item_classes: if klass.key == type: o = klass() if self.items: o.id = str(max([lax_int(x.id) for x in self.items]) + 1) else: o.id = '1' self.items.append(o) break else: raise KeyError() def get_item(self, id): for item in self.items: if item.id == id: return item raise KeyError() def get_action_form(self, filled, user, displayed_fields=None): form = Form(enctype='multipart/form-data', use_tokens=False) form.attrs['id'] = 'wf-actions' for item in self.items: if not item.check_auth(filled, user): continue if not item.check_condition(filled): continue item.fill_form(form, filled, user, displayed_fields=displayed_fields) for action in filled.formdef.workflow.get_global_actions_for_user(filled, user): form.add_submit('button-action-%s' % action.id, action.name) if form.get_widget('button-action-%s' % action.id): form.get_widget('button-action-%s' % action.id).backoffice_info_text = action.backoffice_info_text if form.widgets or form.submit_widgets: return form else: return None def get_active_items(self, form, filled, user): for item in self.items: if hasattr(item, 'by'): for role in item.by or []: if role == logged_users_role().id: break if role == '_submitter': if filled.is_submitter(user): break else: continue if user is None: continue role = get_role_translation(filled, role) if role in user.get_roles(): break else: continue if not item.check_condition(filled): continue yield item def evaluate_live_form(self, form, filled, user): for item in self.get_active_items(form, filled, user): item.evaluate_live_form(form, filled, user) def handle_form(self, form, filled, user): # check for global actions for action in filled.formdef.workflow.get_global_actions_for_user(filled, user): if 'button-action-%s' % action.id in get_request().form: url = perform_items(action.items, filled) if url: return url return evo = Evolution() evo.time = time.localtime() if user: if filled.is_submitter(user): evo.who = '_submitter' else: evo.who = user.id if not filled.evolution: filled.evolution = [] for item in self.get_active_items(form, filled, user): next_url = item.submit_form(form, filled, user, evo) if next_url is True: break if next_url: if not form.has_errors(): filled.evolution.append(evo) if evo.status: filled.status = evo.status filled.store() return next_url if form.has_errors(): return filled.evolution.append(evo) if evo.status: filled.status = evo.status filled.store() url = filled.perform_workflow() if url: return url def get_subdirectories(self, formdata): subdirectories = [] for item in self.items: if item.directory_name: subdirectories.append((item.directory_name, item.directory_class(formdata, item, self))) return subdirectories def get_visibility_restricted_roles(self): if not self.visibility: # no restriction -> visible return [] return self.visibility def is_visible(self, formdata, user): if not self.visibility: # no restriction -> visible return True if get_request() and get_request().is_in_frontoffice(): # always hide in front return False if user and user.is_admin: return True if user: user_roles = set(user.get_roles()) user_roles.add(logged_users_role().id) else: user_roles = set([]) visibility_roles = self.visibility[:] for item in self.items or []: if not hasattr(item, 'by') or not item.by: continue visibility_roles.extend(item.by) for role in visibility_roles: if role != '_submitter': role = get_role_translation(formdata, role) if role in user_roles: return True return False def is_endpoint(self): # an endpoint status is a status that marks the end of the workflow; it # can either be computed automatically (if there's no way out of the # status) or be set manually (to mark the expected end while still # allowing to go back and re-enter the workflow). if self.forced_endpoint: return True endpoint = True for item in self.items: endpoint = endpoint and item.endpoint if endpoint is False: break return endpoint def is_waitpoint(self): # a waitpoint status is a status waiting for an event (be it user # interaction or something else), but can also be an endpoint (where # the user would wait, infinitely). waitpoint = False endpoint = True if self.forced_endpoint: endpoint = True else: for item in self.items: endpoint = item.endpoint and endpoint waitpoint = item.waitpoint or waitpoint return bool(endpoint or waitpoint) def get_contrast_color(self): colour = self.colour or 'ffffff' return misc.get_foreground_colour(colour) def __getstate__(self): odict = self.__dict__.copy() if 'parent' in odict: del odict['parent'] return odict def export_to_xml(self, charset, include_id=False): status = ET.Element('status') ET.SubElement(status, 'id').text = force_text(self.id, charset) ET.SubElement(status, 'name').text = force_text(self.name, charset) ET.SubElement(status, 'colour').text = force_text(self.colour, charset) if self.extra_css_class: ET.SubElement(status, 'extra_css_class').text = force_text(self.extra_css_class, charset) if self.forced_endpoint: ET.SubElement(status, 'forced_endpoint').text = 'true' if self.backoffice_info_text: ET.SubElement(status, 'backoffice_info_text').text = force_text( self.backoffice_info_text, charset) visibility_node = ET.SubElement(status, 'visibility') for role in self.visibility or []: ET.SubElement(visibility_node, 'role').text = str(role) items = ET.SubElement(status, 'items') for item in self.items: items.append(item.export_to_xml(charset=charset, include_id=include_id)) return status def init_with_xml(self, elem, charset, include_id=False): self.id = xml_node_text(elem.find('id')) self.name = xml_node_text(elem.find('name')) if elem.find('colour') is not None: self.colour = xml_node_text(elem.find('colour')) if elem.find('extra_css_class') is not None: self.extra_css_class = xml_node_text(elem.find('extra_css_class')) if elem.find('forced_endpoint') is not None: self.forced_endpoint = (elem.find('forced_endpoint').text == 'true') if elem.find('backoffice_info_text') is not None: self.backoffice_info_text = xml_node_text(elem.find('backoffice_info_text')) self.visibility = [] for visibility_role in elem.findall('visibility/role'): self.visibility.append(visibility_role.text) self.items = [] for item in elem.find('items'): item_type = item.attrib['type'] self.append_item(item_type) item_o = self.items[-1] item_o.parent = self item_o.init_with_xml(item, charset, include_id=include_id) def __repr__(self): return '<%s %s %r>' % (self.__class__.__name__, self.id, self.name) class WorkflowStatusItem(XmlSerialisable): node_name = 'item' description = 'XX' category = None # (key, label) id = None condition = None endpoint = True # means it's not possible to interact, and/or cause a status change waitpoint = False # means it's possible to wait (user interaction, or other event) ok_in_global_action = True # means it can be used in a global action directory_name = None directory_class = None support_substitution_variables = False @classmethod def init(cls): pass @classmethod def is_available(cls, workflow=None): return True def migrate(self): changed = False for roles_attribute in ('to', 'by'): attribute_value = getattr(self, roles_attribute, []) or [] if any((x for x in attribute_value if type(x) is int)): setattr(self, roles_attribute, [str(x) for x in attribute_value]) changed = True return changed def render_as_line(self): label = _(self.description) details = self.get_line_details() if details: label += ' (%s)' % details if self.condition and self.condition.get('value'): label += ' (%s)' % _('conditional') return label def get_line_details(self): return None def render_list_of_roles(self, roles): return self.parent.parent.render_list_of_roles(roles) def get_list_of_roles(self, include_logged_in_users=True): return self.parent.parent.get_list_of_roles(include_logged_in_users=include_logged_in_users) def perform(self, formdata): pass def fill_form(self, form, formdata, user, **kwargs): pass def evaluate_live_form(self, form, formdata, user): pass def submit_form(self, form, formdata, user, evo): pass def check_auth(self, formdata, user): if not hasattr(self, 'by'): return True for role in self.by or []: if user and role == logged_users_role().id: return True if role == '_submitter': t = formdata.is_submitter(user) if t is True: return True continue if not user: continue role = get_role_translation(formdata, role) if role in user.get_roles(): return True return False def check_condition(self, formdata): context = {'formdata': formdata, 'status_item': self} try: return Condition(self.condition, context).evaluate() except RuntimeError: return False def add_parameters_widgets(self, form, parameters, prefix='', formdef=None): if 'condition' in parameters: form.add(ConditionWidget, '%scondition' % prefix, title=_('Condition of execution of the action'), value=self.condition, size=40, advanced=not(self.condition)) if 'attachments' in parameters: attachments_options, attachments = self.get_attachments_options() if len(attachments_options) > 1: form.add(WidgetList, '%sattachments' % prefix, title=_('Attachments'), element_type=SingleSelectWidgetWithOther, value=attachments, add_element_label=_('Add attachment'), element_kwargs={'render_br': False, 'options': attachments_options}) else: form.add(WidgetList, '%sattachments' % prefix, title=_('Attachments (Python expressions)'), element_type=StringWidget, value=attachments, add_element_label=_('Add attachment'), element_kwargs={'render_br': False, 'size': 50}, advanced=not(bool(attachments))) def get_parameters(self): return ('condition',) def get_parameters_view(self): r = TemplateIO(html=True) form = Form() parameters = [x for x in self.get_parameters() if getattr(self, x, None) is not None] for parameter in parameters: self.add_parameters_widgets(form, [parameter]) r += htmltext('

') return r.getvalue() def get_backoffice_info_text_parameter_view_value(self): return htmltext('
%s
') % self.backoffice_info_text def get_by_parameter_view_value(self): return self.render_list_of_roles(self.by) def get_timeout_parameter_view_value(self): try: return seconds2humanduration(int(self.timeout or 0)) except ValueError: return self.timeout # probably an expression def get_status_parameter_view_value(self): for status in self.parent.parent.possible_status: if status.id == self.status: return htmltext('%s') % (status.id, status.name) return _('Unknown (%s)') % self.status def get_parameter_view_value(self, widget, parameter): if hasattr(self, 'get_%s_parameter_view_value' % parameter): return getattr(self, 'get_%s_parameter_view_value' % parameter)() value = getattr(self, parameter) if type(value) is bool: return _('Yes') if value else _('No') elif hasattr(widget, 'options') and value: for option in widget.options: if isinstance(option, tuple): if option[0] == value: return option[1] else: if option == value: return option return '-' else: return str(value) def fill_admin_form(self, form): for parameter in self.get_parameters(): self.add_parameters_widgets(form, [parameter]) def submit_admin_form(self, form): for f in self.get_parameters(): widget = form.get_widget(f) if widget: value = widget.parse() if hasattr(self, '%s_parse' % f): value = getattr(self, '%s_parse' % f)(value) setattr(self, f, value) @classmethod def get_expression(cls, var): if not var: expression_type = 'text' expression_value = '' elif var.startswith('='): expression_type = 'python' expression_value = var[1:] elif '{{' in var or '{%' in var or '[' in var: expression_type = 'template' expression_value = var else: expression_type = 'text' expression_value = var return {'type': expression_type, 'value': expression_value} @classmethod def compute(cls, var, render=True, raises=False, context=None, formdata=None, status_item=None): if not isinstance(var, six.string_types): return var expression = cls.get_expression(var) if expression['type'] != 'python' and not render: return var if expression['type'] == 'text': return expression['value'] vars = get_publisher().substitutions.get_context_variables( 'lazy' if expression['type'] == 'template' else None) vars.update(context or {}) def log_exception(exception): from wcs.logged_errors import LoggedError if expression['type'] == 'template': summary = _('Failed to compute template') else: summary = _('Failed to compute Python expression') LoggedError.record(summary, formdata=formdata, status_item=status_item, expression=expression['value'], expression_type=expression['type'], exception=exception) if expression['type'] == 'template': try: return Template(expression['value'], raises=raises, autoescape=False).render(vars) except TemplateError as e: log_exception(e) if raises: raise return var try: return eval(expression['value'], get_publisher().get_global_eval_dict(), vars) except Exception as e: log_exception(e) if raises: raise return var def get_computed_role_id(self, role_id): new_role_id = self.compute(str(role_id)) if Role.has_key(new_role_id): return new_role_id # computed value, not an id, try to get role by slug new_role = Role.get_on_index(new_role_id, 'slug', ignore_errors=True) if new_role: return new_role.id # fallback to role label for role in Role.select(): if role.name == new_role_id: return role.id return None def get_substitution_variables(self, formdata): return {} def get_target_status(self, formdata=None): """Returns a list of status this item can lead to.""" if not getattr(self, 'status', None): return [] if self.status == '_previous': if formdata is None: # must be in a formdata to compute destination, just give a # fake status for presentation purpose return [WorkflowStatus(_('Previously Marked Status'))] previous_status = formdata.pop_previous_marked_status() if previous_status: return [previous_status] return [] targets = [x for x in self.parent.parent.possible_status if x.id == self.status] if not targets and formdata: # do not log in presentation context: formdata is needed from wcs.logged_errors import LoggedError message = _('reference to invalid status %(target)s in status %(status)s, ' 'action %(status_item)s') % { 'target': self.status, 'status': self.parent.name, 'status_item': _(self.description) } LoggedError.record(message, formdata=formdata, status_item=self) return targets def get_jump_label(self, target_id): '''Return the label to use on a workflow graph arrow''' if getattr(self, 'label', None): label = self.label if getattr(self, 'by', None): roles = self.parent.parent.render_list_of_roles(self.by) label += ' %s %s' % (_('by'), roles) if getattr(self, 'status', None) == '_previous': label += ' ' + _('(to last marker)') if getattr(self, 'set_marker_on_status', False): label += ' ' + _('(and set marker)') else: label = self.render_as_line() return label def get_backoffice_filefield_options(self): options = [] for field in self.parent.parent.get_backoffice_fields(): if field.key == 'file': options.append((field.id, field.label, field.id)) return options def store_in_backoffice_filefield(self, formdata, backoffice_filefield_id, filename, content_type, content): filefield = [x for x in self.parent.parent.get_backoffice_fields() if x.id == backoffice_filefield_id and x.key == 'file'] if filefield: upload = PicklableUpload(filename, content_type) upload.receive([content]) formdata.data[backoffice_filefield_id] = upload formdata.store() def by_export_to_xml(self, item, charset, include_id=False): self._roles_export_to_xml('by', item, charset, include_id=include_id) def by_init_with_xml(self, elem, charset, include_id=False): self._roles_init_with_xml('by', elem, charset, include_id=include_id) def to_export_to_xml(self, item, charset, include_id=False): self._roles_export_to_xml('to', item, charset, include_id=include_id) def to_init_with_xml(self, elem, charset, include_id=False): self._roles_init_with_xml('to', elem, charset, include_id) def condition_init_with_xml(self, node, charset, include_id=False): self.condition = None if node is None: return if node.findall('type'): self.condition = { 'type': xml_node_text(node.find('type')), 'value': xml_node_text(node.find('value')), } elif node.text: # backward compatibility self.condition = {'type': 'python', 'value': xml_node_text(node)} def q_admin_lookup(self, workflow, status, component, html_top): return None def __getstate__(self): odict = self.__dict__.copy() if 'parent' in odict: del odict['parent'] return odict def attachments_init_with_xml(self, elem, charset, include_id=False): if elem is None: self.attachments = None else: self.attachments = [xml_node_text(item) for item in elem.findall('attachment')] def get_attachments_options(self): attachments_options = [(None, '---', None)] varnameless = [] for field in self.parent.parent.get_backoffice_fields(): if field.key != 'file': continue if field.varname: codename = 'form_var_%s_raw' % field.varname else: codename = 'form_f%s' % field.id.replace('-', '_') # = form_fbo<...> varnameless.append(codename) attachments_options.append((codename, field.label, codename)) # filter: do not consider removed fields without varname attachments = [attachment for attachment in self.attachments or [] if ((not attachment.startswith('form_fbo')) or (attachment in varnameless))] return attachments_options, attachments def convert_attachments_to_uploads(self): uploads = [] if self.attachments: global_eval_dict = get_publisher().get_global_eval_dict() local_eval_dict = get_publisher().substitutions.get_context_variables() for attachment in self.attachments: if attachment.startswith('form_fbo') and '-' in attachment: # detect varname-less backoffice fields that were set # before #33366 was fixed, and fix them. attachment = attachment.replace('-', '_') try: # execute any Python expression # and magically convert string like 'form_var_*_raw' to a PicklableUpload picklableupload = eval(attachment, global_eval_dict, local_eval_dict) except: get_publisher().notify_of_exception(sys.exc_info(), context='[workflow/attachments]') continue if not picklableupload: continue try: # convert any value to a PicklableUpload; it will ususally # be a dict like one provided by qommon/evalutils:attachment() picklableupload = FileField.convert_value_from_anything(picklableupload) except ValueError: get_publisher().notify_of_exception(sys.exc_info(), context='[workflow/attachments]') continue uploads.append(picklableupload) return uploads def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.id) class WorkflowStatusJumpItem(WorkflowStatusItem): status = None endpoint = False set_marker_on_status = False category = 'status-change' def add_parameters_widgets(self, form, parameters, prefix='', formdef=None): super(WorkflowStatusJumpItem, self).add_parameters_widgets( form, parameters, prefix=prefix, formdef=formdef) if 'status' in parameters: destinations = [(x.id, x.name) for x in self.parent.parent.possible_status] # look for existing jumps that are dropping a mark workflow = self.parent.parent statuses = getattr(workflow, 'possible_status') or [] global_actions = getattr(workflow, 'global_actions') or [] for status in statuses + global_actions: for item in status.items: if getattr(item, 'set_marker_on_status', False): destinations.append(('_previous', _('Previously Marked Status'))) break else: continue break form.add(SingleSelectWidget, '%sstatus' % prefix, title = _('Status'), value = self.status, options = [(None, '---')] + destinations) if 'set_marker_on_status' in parameters: form.add(CheckboxWidget, '%sset_marker_on_status' % prefix, title=_('Set marker to jump back to current status'), value=self.set_marker_on_status, advanced=not(self.set_marker_on_status)) def handle_markers_stack(self, formdata): if self.set_marker_on_status: if formdata.workflow_data and '_markers_stack' in formdata.workflow_data: markers_stack = formdata.workflow_data.get('_markers_stack') else: markers_stack = [] markers_stack.append({'status_id': formdata.status[3:]}) formdata.update_workflow_data({'_markers_stack': markers_stack}) def get_parameters(self): return ('status', 'set_marker_on_status', 'condition') def get_role_translation(formdata, role_name): if role_name == '_submitter': raise Exception('_submitter is not a valid role') elif str(role_name).startswith('_'): role_id = None if formdata.workflow_roles: role_id = formdata.workflow_roles.get(role_name) if not role_id and formdata.formdef.workflow_roles: role_id = formdata.formdef.workflow_roles.get(role_name) if role_id is None: return role_id return str(role_id) else: return str(role_name) def get_role_translation_label(workflow, role_id): if role_id == logged_users_role().id: return logged_users_role().name if role_id == '_submitter': return C_('role|User') if str(role_id).startswith('_'): return workflow.roles.get(role_id) else: try: return Role.get(role_id).name except KeyError: return def render_list_of_roles(workflow, roles): t = [] for r in roles: role_label = get_role_translation_label(workflow, r) if role_label: t.append(role_label) return ', '.join(t) item_classes = [] def register_item_class(klass): if not klass.key in [x.key for x in item_classes]: item_classes.append(klass) klass.init() class CommentableWorkflowStatusItem(WorkflowStatusItem): description = N_('Comment') key = 'commentable' category = 'interaction' endpoint = False waitpoint = True ok_in_global_action = False required = False varname = None label = None button_label = 0 # hack to handle legacy commentable items hint = None by = [] backoffice_info_text = None def get_line_details(self): if self.by: return _('by %s') % self.render_list_of_roles(self.by) else: return _('not completed') def fill_form(self, form, formdata, user, **kwargs): if not 'comment' in [x.name for x in form.widgets]: if self.label is None: title = _('Comment') else: title = self.label form.add(TextWidget, 'comment', title=title, required=self.required, cols=40, rows=10, hint=self.hint) form.get_widget('comment').attrs['class'] = 'comment' if self.button_label == 0: form.add_submit('button%s' % self.id, _('Add Comment')) elif self.button_label: form.add_submit('button%s' % self.id, self.button_label) if form.get_widget('button%s' % self.id): form.get_widget('button%s' % self.id).backoffice_info_text = self.backoffice_info_text def submit_form(self, form, formdata, user, evo): if form.get_widget('comment'): evo.comment = form.get_widget('comment').parse() if self.varname: workflow_data = {'comment_%s' % self.varname: evo.comment} formdata.update_workflow_data(workflow_data) def submit_admin_form(self, form): for f in self.get_parameters(): widget = form.get_widget(f) if widget: setattr(self, f, widget.parse()) def fill_admin_form(self, form): if self.by and not type(self.by) is list: self.by = None return super(CommentableWorkflowStatusItem, self).fill_admin_form(form) def get_parameters(self): return ('label', 'button_label', 'hint', 'by', 'varname', 'backoffice_info_text', 'required', 'condition') def add_parameters_widgets(self, form, parameters, prefix='', formdef=None): super(CommentableWorkflowStatusItem, self).add_parameters_widgets( form, parameters, prefix=prefix, formdef=formdef) if 'label' in parameters: if self.label is None: self.label = _('Comment') form.add(StringWidget, '%slabel' % prefix, size=40, title=_('Label'), value=self.label) if 'button_label' in parameters: if self.button_label == 0: self.button_label = _('Add Comment') form.add(StringWidget, '%sbutton_label' % prefix, title=_('Button Label'), hint=_('(empty to disable the button)'), value=self.button_label) if 'hint' in parameters: form.add(StringWidget, '%shint' % prefix, size=40, title=_('Hint'), value=self.hint) if 'by' in parameters: form.add(WidgetList, '%sby' % prefix, title = _('By'), element_type=SingleSelectWidget, value=self.by, add_element_label=_('Add Role'), element_kwargs={'render_br': False, 'options': [(None, '---', None)] + self.get_list_of_roles()}) if 'varname' in parameters: form.add(VarnameWidget, '%svarname' % prefix, title=_('Identifier'), value=self.varname, hint=_('This will make the comment available in a variable named comment_ + identifier.')) if 'required' in parameters: form.add(CheckboxWidget, '%srequired' % prefix, title=_('Required'), value=self.required) if 'backoffice_info_text' in parameters: form.add(WysiwygTextWidget, '%sbackoffice_info_text' % prefix, title=_('Information Text for Backoffice'), value=self.backoffice_info_text) def button_label_export_to_xml(self, xml_item, charset, include_id=False): if self.button_label == 0: pass elif self.button_label is None: # button_label being None is a special case meaning "no button", it # should be handled differently than the "not filled" case el = ET.SubElement(xml_item, 'button_label') else: el = ET.SubElement(xml_item, 'button_label') el.text = self.button_label def button_label_init_with_xml(self, element, charset, include_id=False): if element is None: return # this can be None if element is self-closing, , which # then maps to None, meaning "no button". self.button_label = xml_node_text(element) register_item_class(CommentableWorkflowStatusItem) class ChoiceWorkflowStatusItem(WorkflowStatusJumpItem): description = N_('Manual Jump') key = 'choice' endpoint = False waitpoint = True ok_in_global_action = False label = None identifier = None by = [] backoffice_info_text = None require_confirmation = False ignore_form_errors = False def get_label(self): expression = self.get_expression(self.label) if expression['type'] == 'text': return expression['value'] return _('computed label') def get_line_details(self): if self.label: more = '' if self.set_marker_on_status: more += ' ' + _('(and set marker)') if self.by: return _('"%(label)s" by %(by)s%(more)s') % { 'label' : self.get_label(), 'by' : self.render_list_of_roles(self.by), 'more': more } else: return _('"%(label)s"%(more)s') % { 'label': self.get_label(), 'more': more } else: return _('not completed') def fill_form(self, form, formdata, user, **kwargs): label = self.compute(self.label) if not label: return widget = form.add_submit('button%s' % self.id, label) if self.identifier: widget.extra_css_class = 'button-%s' % self.identifier if self.require_confirmation: get_response().add_javascript(['jquery.js', '../../i18n.js', 'qommon.js']) widget.attrs = {'data-ask-for-confirmation': 'true'} widget.backoffice_info_text = self.backoffice_info_text widget.ignore_form_errors = self.ignore_form_errors if self.ignore_form_errors: widget.attrs['formnovalidate'] = 'formnovalidate' def submit_form(self, form, formdata, user, evo): if form.get_submit() == 'button%s' % self.id: wf_status = self.get_target_status(formdata) if wf_status: evo.status = 'wf-%s' % wf_status[0].id self.handle_markers_stack(formdata) form.clear_errors() return True # get out of processing loop def add_parameters_widgets(self, form, parameters, prefix='', formdef=None): super(ChoiceWorkflowStatusItem, self).add_parameters_widgets( form, parameters, prefix=prefix, formdef=formdef) if 'label' in parameters: form.add(ComputedExpressionWidget, '%slabel' % prefix, title=_('Label'), value=self.label) if 'by' in parameters: form.add(WidgetList, '%sby' % prefix, title = _('By'), element_type = SingleSelectWidget, value = self.by, add_element_label = _('Add Role'), element_kwargs={'render_br': False, 'options': [(None, '---', None)] + self.get_list_of_roles()}) if 'require_confirmation' in parameters: form.add(CheckboxWidget, '%srequire_confirmation' % prefix, title=_('Require confirmation'), value=self.require_confirmation) if 'backoffice_info_text' in parameters: form.add(WysiwygTextWidget, '%sbackoffice_info_text' % prefix, title=_('Information Text for Backoffice'), value=self.backoffice_info_text) if 'identifier' in parameters: form.add(VarnameWidget, '%sidentifier' % prefix, title=_('Identifier'), value=self.identifier, advanced=True) if 'ignore_form_errors' in parameters: form.add(CheckboxWidget, '%signore_form_errors' % prefix, title=_('Ignore form errors'), value=self.ignore_form_errors, advanced=True) def get_parameters(self): return ('label', 'by', 'status', 'require_confirmation', 'backoffice_info_text', 'ignore_form_errors', 'set_marker_on_status', 'condition', 'identifier',) register_item_class(ChoiceWorkflowStatusItem) class JumpOnSubmitWorkflowStatusItem(WorkflowStatusJumpItem): description = N_('On Submit Jump') key = 'jumponsubmit' ok_in_global_action = False def get_line_details(self): if self.status: if self.get_target_status(): return _('to %s') % self.get_target_status()[0].name else: return _('broken') else: return _('not completed') def submit_form(self, form, formdata, user, evo): if form.is_submitted() and not form.has_errors(): wf_status = self.get_target_status(formdata) if wf_status: evo.status = 'wf-%s' % wf_status[0].id self.handle_markers_stack(formdata) def get_parameters(self): return ('status', 'set_marker_on_status', 'condition') register_item_class(JumpOnSubmitWorkflowStatusItem) class SendmailWorkflowStatusItem(WorkflowStatusItem): description = N_('Email') key = 'sendmail' category = 'interaction' support_substitution_variables = True to = [] subject = None mail_template = None body = None custom_from = None attachments = None comment = None def _get_role_id_from_xml(self, elem, charset, include_id=False): # override to allow for destination set with computed values. if elem is None: return None value = xml_node_text(elem) if self.get_expression(value)['type'] != 'text' or '@' in value: return value return super(SendmailWorkflowStatusItem, self)._get_role_id_from_xml( elem, charset, include_id=include_id) def render_list_of_roles_or_emails(self, roles): t = [] for r in roles: expression = self.get_expression(r) if expression['type'] in ('python', 'template'): t.append(_('computed value')) elif '@' in expression['value']: t.append(expression['value']) else: role_label = get_role_translation_label(self.parent.parent, r) if role_label: t.append(role_label) return ', '.join(t) def get_to_parameter_view_value(self): return self.render_list_of_roles_or_emails(self.to) def get_line_details(self): if self.to: return _('to %s') % self.render_list_of_roles_or_emails(self.to) else: return _('not completed') def get_parameters(self): return ('to', 'mail_template', 'subject', 'body', 'attachments', 'custom_from', 'condition') def add_parameters_widgets(self, form, parameters, prefix='', formdef=None): super(SendmailWorkflowStatusItem, self).add_parameters_widgets( form, parameters, prefix=prefix, formdef=formdef) subject_body_attrs = {} if 'subject' in parameters or 'body' in parameters: if get_publisher().has_site_option('mail-templates') and MailTemplate.count(): subject_body_attrs = { 'data-dynamic-display-value': '', 'data-dynamic-display-child-of': '%smail_template' % prefix, } if 'to' in parameters: form.add(WidgetList, '%sto' % prefix, title=_('To'), element_type=SingleSelectWidgetWithOther, value=self.to, add_element_label=_('Add Role'), element_kwargs={'render_br': False, 'options': [(None, '---', None)] + self.get_list_of_roles(include_logged_in_users=False)}) if 'subject' in parameters: form.add(StringWidget, '%ssubject' % prefix, title=_('Subject'), validation_function=ComputedExpressionWidget.validate_template, value=self.subject, size=40, attrs=subject_body_attrs) if 'mail_template' in parameters and get_publisher().has_site_option('mail-templates') and MailTemplate.count(): form.add(SingleSelectWidget, '%smail_template' % prefix, title=_('Mail Template'), value=self.mail_template, options=[(None, '', '')] + MailTemplate.get_as_options_list(), attrs={'data-dynamic-display-parent': 'true'} ) if 'body' in parameters: form.add(TextWidget, '%sbody' % prefix, title=_('Body'), value=self.body, cols=80, rows=10, validation_function=ComputedExpressionWidget.validate_template, attrs=subject_body_attrs) if 'custom_from' in parameters: form.add(ComputedExpressionWidget, '%scustom_from' % prefix, title=_('Custom From Address'), value=self.custom_from, advanced=not(bool(self.custom_from))) def get_body_parameter_view_value(self): return htmltext('
%s
') % self.body def perform(self, formdata): if not self.to: return if not (self.subject and self.body) and not self.mail_template: return url = formdata.get_url() body = self.body subject = self.subject if self.mail_template: mail_template = MailTemplate.get_by_slug(self.mail_template) if mail_template: body = mail_template.body subject = mail_template.subject else: from wcs.logged_errors import LoggedError message = _('reference to invalid mail template %(mail_template)s in status %(status)s') % { 'status': self.parent.name, 'mail_template': self.mail_template, } LoggedError.record(message, formdata=formdata, status_item=self) return try: mail_body = template_on_formdata(formdata, self.compute(body, render=False), autoescape=body.startswith('<')) except TemplateError as e: get_logger().error('error in template for email body [%s], ' 'mail could not be generated: %s' % (url, str(e))) return try: mail_subject = template_on_formdata(formdata, self.compute(subject, render=False), autoescape=False) except TemplateError as e: get_logger().error('error in template for email subject [%s], ' 'mail could not be generated: %s' % (url, str(e))) return users_cfg = get_cfg('users', {}) # this works around the fact that parametric workflows only support # string values, so if we get set a string, we convert it here to an # array. if isinstance(self.to, six.string_types): self.to = [self.to] addresses = [] for dest in self.to: try: dest = self.compute(dest, raises=True) except: continue if not dest: continue if isinstance(dest, list): addresses.extend(dest) continue elif isinstance(dest, six.string_types) and ',' in dest: # if the email contains a comma consider it as a serie of # emails addresses.extend([x.strip() for x in dest.split(',')]) continue if '@' in str(dest): addresses.append(dest) continue if dest == '_submitter': submitter_email = formdata.formdef.get_submitter_email(formdata) if submitter_email: addresses.append(submitter_email) continue dest = get_role_translation(formdata, dest) try: role = Role.get(dest) except KeyError: continue addresses.extend(role.get_emails()) if not addresses: return email_from = None if self.custom_from: email_from = self.compute(self.custom_from) attachments = self.convert_attachments_to_uploads() if len(addresses) > 1: emails.email(mail_subject, mail_body, email_rcpt=None, bcc=addresses, email_from=email_from, exclude_current_user=False, attachments=attachments, fire_and_forget=True) else: emails.email(mail_subject, mail_body, email_rcpt=addresses, email_from=email_from, exclude_current_user=False, attachments=attachments, fire_and_forget=True) register_item_class(SendmailWorkflowStatusItem) def get_formdata_template_context(formdata=None): ctx = get_publisher().substitutions.get_context_variables('lazy') if formdata: ctx['url'] = formdata.get_url() ctx['url_status'] = '%sstatus' % formdata.get_url() ctx['details'] = formdata.formdef.get_detailed_email_form(formdata, ctx['url']) ctx['name'] = formdata.formdef.name ctx['number'] = formdata.id if formdata.evolution and formdata.evolution[-1].comment: ctx['comment'] = formdata.evolution[-1].comment else: ctx['comment'] = '' ctx.update(formdata.get_as_dict()) # compatibility vars ctx['before'] = ctx.get('form_previous_status') ctx['after'] = ctx.get('form_status') ctx['evolution'] = ctx.get('form_evolution') return ctx def template_on_html_string(template): return template_on_formdata(None, template, ezt_format=ezt.FORMAT_HTML) def template_on_formdata(formdata=None, template=None, **kwargs): assert template is not None if not Template.is_template_string(template): # no tags, no variables: don't even process formdata return template context = get_formdata_template_context(formdata) return template_on_context(context, template, **kwargs) def template_on_context(context=None, template=None, **kwargs): assert template is not None if not Template.is_template_string(template): return template return Template(template, **kwargs).render(context) class SendSMSWorkflowStatusItem(WorkflowStatusItem): description = N_('SMS') key = 'sendsms' category = 'interaction' support_substitution_variables = True to = [] body = None # don't use roles (de)serializer for "to" field to_export_to_xml = None to_init_with_xml = None @classmethod def is_available(cls, workflow=None): sms_mode = get_cfg('sms', {}).get('mode') or 'none' return sms_mode != 'none' def get_parameters(self): return ('to', 'body', 'condition') def add_parameters_widgets(self, form, parameters, prefix='', formdef=None): super(SendSMSWorkflowStatusItem, self).add_parameters_widgets( form, parameters, prefix=prefix, formdef=formdef) if 'to' in parameters: form.add(WidgetList, '%sto' % prefix, title=_('To'), element_type=ComputedExpressionWidget, value=self.to, add_element_label=_('Add Number'), element_kwargs = {'render_br': False}) if 'body' in parameters: form.add(TextWidget, '%sbody' % prefix, title=_('Body'), value=self.body, cols=80, rows=10) def perform(self, formdata): if not self.is_available(): return if not self.to: return if not self.body: return destinations = [self.compute(x) for x in self.to] destinations = [x for x in destinations if x] # ignore empty elements if not destinations: return try: sms_body = template_on_formdata(formdata, self.compute(self.body, render=False)) except TemplateError as e: url = formdata.get_url() get_logger().error('error in template for sms [%s], ' 'sms could not be generated' % (url, str(e))) return from .qommon import sms sms_cfg = get_cfg('sms', {}) sender = sms_cfg.get('sender', 'AuQuotidien')[:11] mode = sms_cfg.get('mode', 'none') try: sms.SMS.get_sms_class(mode).send(sender, destinations, sms_body) except errors.SMSError as e: get_logger().error(e) register_item_class(SendSMSWorkflowStatusItem) class DisplayMessageWorkflowStatusItem(WorkflowStatusItem): description = N_('Alert') key = 'displaymsg' category = 'interaction' support_substitution_variables = True ok_in_global_action = False to = None position = 'top' level = None message = None def get_line_details(self): parts = [] if self.position == 'top': parts.append(_('top of page')) elif self.position == 'bottom': parts.append(_('bottom of page')) elif self.position == 'actions': parts.append(_('with actions')) if self.to: parts.append(_('for %s') % self.render_list_of_roles(self.to)) return ', '.join(parts) def is_for_current_user(self, filled): if not self.to: return True if not get_request(): return False user = get_request().user for role in self.to or []: if role == '_submitter': if filled.is_submitter(user): return True elif user: role = get_role_translation(filled, role) if role in user.get_roles(): return True return False def get_message(self, filled, position='top'): if not (self.message and self.position == position and self.is_for_current_user(filled)): return '' dict = copy.copy(get_publisher().substitutions.get_context_variables('lazy')) dict['date'] = misc.localstrftime(filled.receipt_time) dict['number'] = filled.id handling_role = filled.get_handling_role() if handling_role and handling_role.details: dict['receiver'] = handling_role.details.replace('\n', '
') message = self.message if self.level: message = '
%s
' % (self.level, message) return Template(message, ezt_format=ezt.FORMAT_HTML).render(dict) def add_parameters_widgets(self, form, parameters, prefix='', formdef=None): super(DisplayMessageWorkflowStatusItem, self).add_parameters_widgets( form, parameters, prefix=prefix, formdef=formdef) if 'message' in parameters: form.add(TextWidget, '%smessage' % prefix, title = _('Message'), value=self.message, cols=80, rows=10, validation_function=ComputedExpressionWidget.validate_template) if 'level' in parameters: form.add(SingleSelectWidget, '%slevel' % prefix, title=_('Level'), value=self.level, options=[(None, ''), ('success', _('Success')), ('info', _('Information')), ('warning', _('Warning')), ('error', _('Error'))]) if 'position' in parameters: form.add(SingleSelectWidget, '%sposition' % prefix, title=_('Position'), value=self.position, options=[('top', _('Top of page')), ('bottom', _('Bottom of page')), #('actions', _('With actions')) "too complicated" ]) if 'to' in parameters: form.add(WidgetList, '%sto' % prefix, title=_('To'), element_type=SingleSelectWidget, value=self.to or [], add_element_label=_('Add Role'), element_kwargs={'render_br': False, 'options': [(None, '---', None)] + self.get_list_of_roles(include_logged_in_users=False)}) def get_parameters(self): return ('message', 'level', 'position', 'to', 'condition') def get_message_parameter_view_value(self): if self.message.startswith('<'): return htmltext(self.message) return htmltext('
%s
') % self.message register_item_class(DisplayMessageWorkflowStatusItem) class RedirectToStatusWorkflowStatusItem(WorkflowStatusItem): description = N_('Status Page Redirection') key = 'redirectstatus' ok_in_global_action = False backoffice = False def perform(self, formdata): return formdata.get_url(self.backoffice) def add_parameters_widgets(self, form, parameters, prefix='', formdef=None): super(RedirectToStatusWorkflowStatusItem, self).add_parameters_widgets( form, parameters, prefix=prefix, formdef=formdef) if 'backoffice' in parameters: form.add(CheckboxWidget, '%sbackoffice' % prefix, title = _('Redirect to backoffice page'), value = self.backoffice) def get_parameters(self): return ('backoffice',) # RedirectToStatusWorkflowStatusItem is not registered as the class kept for # backward compatibility only and should not be exposed to the user. (#3031) class EditableWorkflowStatusItem(WorkflowStatusItem): description = N_('Edition') key = 'editable' category = 'formdata-action' endpoint = False waitpoint = True ok_in_global_action = False by = [] status = None label = None backoffice_info_text = None def get_line_details(self): if self.by: return _('by %s') % self.render_list_of_roles(self.by) else: return _('not completed') def fill_form(self, form, formdata, user, **kwargs): label = self.label if not label: label = _('Edit Form') form.add_submit('button%s' % self.id, label) form.get_widget('button%s' % self.id).backoffice_info_text = self.backoffice_info_text def submit_form(self, form, formdata, user, evo): if form.get_submit() == 'button%s' % self.id: return formdata.get_url(backoffice=get_request().is_in_backoffice()) + 'wfedit-%s' % self.id def add_parameters_widgets(self, form, parameters, prefix='', formdef=None): super(EditableWorkflowStatusItem, self).add_parameters_widgets( form, parameters, prefix=prefix, formdef=formdef) if 'by' in parameters: form.add(WidgetList, '%sby' % prefix, title = _('By'), element_type = SingleSelectWidget, value = self.by, add_element_label = _('Add Role'), element_kwargs={'render_br': False, 'options': [(None, '---', None)] + self.get_list_of_roles()}) if 'status' in parameters: form.add(SingleSelectWidget, '%sstatus' % prefix, title = _('Status After Edit'), value = self.status, hint = _("Don't select any if you don't want status change processing"), options = [(None, '---')] + [(x.id, x.name) for x in self.parent.parent.possible_status]) if 'label' in parameters: form.add(StringWidget, '%slabel' % prefix, title = _('Button Label'), value = self.label) if 'backoffice_info_text' in parameters: form.add(WysiwygTextWidget, '%sbackoffice_info_text' % prefix, title=_('Information Text for Backoffice'), value=self.backoffice_info_text) def get_parameters(self): return ('by', 'status', 'label', 'backoffice_info_text', 'condition') register_item_class(EditableWorkflowStatusItem) def load_extra(): from .wf import aggregation_email from .wf import timeout_jump from .wf import jump from .wf import attachment from .wf import remove from .wf import roles from .wf import dispatch from .wf import geolocate from .wf import wscall from .wf import form from .wf import register_comment from .wf import anonymise from .wf import export_to_model from .wf import resubmit from .wf import criticality from .wf import profile from .wf import backoffice_fields from .wf import redirect_to_url from .wf import notification from .wf import create_formdata from .wf import create_carddata from .wf.export_to_model import ExportToModel