wcs/wcs/workflows.py

3515 lines
130 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import base64
import collections
import copy
import datetime
import itertools
import os
import random
import time
import uuid
import xml.etree.ElementTree as ET
from django.utils.encoding import force_text
from quixote import get_publisher, get_request, get_response
from quixote.html import TemplateIO, htmltext
from .carddef import CardDef
from .categories import WorkflowCategory
from .conditions import Condition
from .fields import FileField
from .formdata import Evolution
from .formdef import FormDef, FormdefImportError
from .mail_templates import MailTemplate
from .qommon import _, emails, errors, ezt, force_str, get_cfg, misc
from .qommon.form import (
CheckboxWidget,
ComputedExpressionWidget,
ConditionWidget,
Form,
SingleSelectWidget,
SingleSelectWidgetWithOther,
StringWidget,
TextWidget,
ValidatedStringWidget,
VarnameWidget,
WidgetList,
WidgetListOfRoles,
WysiwygTextWidget,
)
from .qommon.humantime import seconds2humanduration
from .qommon.misc import C_, file_digest, get_as_datetime, xml_node_text
from .qommon.storage import Contains, NotEqual, Null, StorableObject, atomic_write, pickle_2to3_conversion
from .qommon.template import Template, TemplateError
from .qommon.upload_storage import PicklableUpload, get_storage_object
from .roles import get_user_roles, logged_users_role
if not __name__.startswith('wcs.') and __name__ != "__main__":
raise ImportError('Import of workflows module must be absolute (import wcs.workflows)')
def lax_int(s):
try:
return int(s)
except (ValueError, TypeError):
return -1
def perform_items(items, formdata, depth=20, event=None, global_action=False):
if depth == 0: # prevents infinite loops
return
url = None
old_status = formdata.status
performed_actions = []
for item in items:
if getattr(item.perform, 'noop', False):
continue
if not item.check_condition(formdata):
continue
performed_actions.append((datetime.datetime.now(), item.key, item.id))
try:
url = item.perform(formdata) or url
except AbortActionException as e:
url = e.url or url
break
if formdata.status != old_status:
break
if performed_actions:
latest_evolution = formdata.evolution[-1] if formdata.evolution else None
if latest_evolution:
latest_evolution.add_part(ActionsTracingEvolutionPart(event, performed_actions))
if formdata.id:
# don't save formdata it has been removed
formdata.store()
if formdata.status != old_status or (global_action and 'jump' in [x[1] for x in performed_actions]):
if not formdata.evolution:
formdata.evolution = []
evo = Evolution()
evo.time = time.localtime()
evo.status = formdata.status
formdata.evolution.append(evo)
formdata.store()
# performs the items of the new status
wf_status = formdata.get_status()
url = perform_items(wf_status.items, formdata, depth=depth - 1, event='continuation') or url
if url:
# hack around webtest as it checks type(url) is str and
# this won't work on django safe strings (isinstance would work);
# adding '' makes sure we get a "real" str object.
url = url + ''
return url
class WorkflowImportError(Exception):
def __init__(self, msg, msg_args=None, details=None):
self.msg = msg
self.msg_args = msg_args or ()
self.details = details
class AbortActionException(Exception):
def __init__(self, url=None):
self.url = url
class RedisplayFormException(Exception):
pass
class AttachmentSubstitutionProxy:
def __init__(self, formdata, attachment_evolution_part):
self.formdata = formdata
self.attachment_evolution_part = attachment_evolution_part
@property
def filename(self):
return self.attachment_evolution_part.orig_filename
@property
def content_type(self):
return self.attachment_evolution_part.content_type
@property
def content(self):
fp = self.attachment_evolution_part.get_file_pointer()
if fp:
return fp.read()
return b''
@property
def b64_content(self):
return base64.b64encode(self.content)
@property
def url(self):
return '%sattachment?f=%s' % (
self.formdata.get_url(),
os.path.basename(self.attachment_evolution_part.filename),
)
class NamedAttachmentsSubstitutionProxy:
def __init__(self, formdata, parts):
self.formdata = formdata
self.parts = parts[:]
self.parts.reverse()
def __len__(self):
return len(self.parts)
def __getattr__(self, name):
return getattr(self[0], name)
def __getitem__(self, i):
return AttachmentSubstitutionProxy(self.formdata, self.parts[i])
class AttachmentsSubstitutionProxy:
def __init__(self, formdata, deprecated_usage=False):
self.formdata = formdata
self.deprecated_usage = deprecated_usage
def __getattr__(self, name):
if name.startswith('__'):
raise AttributeError(name)
def has_varname_attachment(part):
return isinstance(part, AttachmentEvolutionPart) and getattr(part, 'varname', None) == name
parts = [part for part in self.formdata.iter_evolution_parts() if has_varname_attachment(part)]
if parts:
if self.deprecated_usage:
error_summary = _('Usage of "attachments" detected in "attachments_%s" expression') % name
get_publisher().record_deprecated_usage(error_summary, formdata=self.formdata)
return NamedAttachmentsSubstitutionProxy(self.formdata, parts)
raise AttributeError(name)
class EvolutionPart:
to = None
is_hidden = None
view = None
def render_for_fts(self):
if not self.view or self.to:
# don't include parts with no content or restricted visibility
return ''
return misc.html2text(self.view() or '')
class AttachmentEvolutionPart(EvolutionPart):
orig_filename = None
base_filename = None
content_type = None
charset = None
varname = None
render_for_fts = None
storage = None
storage_attrs = None
def __init__(
self,
base_filename,
fp,
orig_filename=None,
content_type=None,
charset=None,
varname=None,
storage=None,
storage_attrs=None,
to=None,
):
self.base_filename = base_filename
self.orig_filename = orig_filename or base_filename
self.content_type = content_type
self.charset = charset
self.fp = fp
self.varname = varname
self.storage = storage
self.storage_attrs = storage_attrs
self.to = to
@classmethod
def from_upload(cls, upload, varname=None, to=None):
return AttachmentEvolutionPart(
upload.base_filename,
getattr(upload, 'fp', None),
upload.orig_filename,
upload.content_type,
upload.charset,
varname=varname,
storage=getattr(upload, 'storage', None),
storage_attrs=getattr(upload, 'storage_attrs', None),
to=to,
)
def get_file_pointer(self):
if self.filename.startswith('uuid-'):
return None
return open(self.filename, 'rb') # pylint: disable=consider-using-with
def __getstate__(self):
odict = self.__dict__.copy()
if not odict.get('fp'):
if 'filename' not in odict:
# we need a filename as an identifier: create one from nothing
# instead of file_digest(self.fp) (see below)
odict['filename'] = 'uuid-%s' % uuid.uuid4()
self.filename = odict['filename']
return odict
del odict['fp']
dirname = os.path.join(get_publisher().app_dir, 'attachments')
if not os.path.exists(dirname):
os.mkdir(dirname)
# there is not filename, or it was a temporary one: create it
if 'filename' not in odict or odict['filename'].startswith('uuid-'):
filename = file_digest(self.fp)
dirname = os.path.join(dirname, filename[:4])
if not os.path.exists(dirname):
os.mkdir(dirname)
odict['filename'] = os.path.join(dirname, filename)
self.filename = odict['filename']
self.fp.seek(0)
atomic_write(self.filename, self.fp)
return odict
def view(self):
show_link = True
if self.has_redirect_url():
is_in_backoffice = bool(get_request() and get_request().is_in_backoffice())
show_link = bool(self.get_redirect_url(backoffice=is_in_backoffice))
if show_link:
return htmltext(
'<p class="wf-attachment"><a href="attachment?f=%s">%s</a></p>'
% (os.path.basename(self.filename), self.orig_filename)
)
else:
return htmltext('<p class="wf-attachment">%s</p>' % self.orig_filename)
def get_json_export_dict(self, anonymise=False, include_files=True):
if not include_files or anonymise:
return None
d = {
'type': 'workflow-attachment',
'content_type': self.content_type,
'filename': self.base_filename,
'to': self.to,
}
fd = self.get_file_pointer()
if fd:
d['content'] = base64.encodebytes(fd.read())
fd.close()
return d
@classmethod
def get_substitution_variables(cls, formdata):
return {
'attachments': AttachmentsSubstitutionProxy(formdata, deprecated_usage=True),
'form_attachments': AttachmentsSubstitutionProxy(formdata),
}
# mimic PicklableUpload methods:
def can_thumbnail(self):
return get_storage_object(getattr(self, 'storage', None)).can_thumbnail(self)
def has_redirect_url(self):
return get_storage_object(getattr(self, 'storage', None)).has_redirect_url(self)
def get_redirect_url(self, backoffice=False):
return get_storage_object(getattr(self, 'storage', None)).get_redirect_url(
self, backoffice=backoffice
)
class ActionsTracingEvolutionPart(EvolutionPart):
def __init__(self, event, actions):
if isinstance(event, tuple):
self.event = event[0]
self.event_args = event[1:]
else:
self.event = event
self.event_args = None
self.actions = actions
def get_event_label(self):
return {
'api-created': _('Created (by API)'),
'api-post-edit-action': _('Actions after edit action (by API)'),
'api-trigger': _('API Trigger'),
'backoffice-created': _('Created (backoffice submission)'),
'continuation': _('Continuation'),
'csv-import-created': _('Created (by CSV import)'),
'edit-action': _('Actions after edit action'),
'frontoffice-created': _('Created (frontoffice submission)'),
'global-action-button': _('Click on a global action button'),
'global-action': _('Global action'),
'global-action-timeout': _('Global action timeout'),
'timeout-jump': _('Timeout jump'),
'workflow-created': _('Created (by workflow action)'),
'workflow-form-submit': _('Action in workflow form'),
}.get(self.event, self.event)
def is_global_event(self):
return bool(self.event and self.event.startswith('global-'))
def get_base_url(self, workflow, status_id):
if self.is_global_event():
return '%sglobal-actions/%s/' % (workflow.get_admin_url(), self.event_args[0])
status = workflow.get_status(status_id)
return status.get_admin_url()
class DuplicateGlobalActionNameError(Exception):
pass
class DuplicateStatusNameError(Exception):
pass
class WorkflowVariablesFieldsFormDef(FormDef):
"""Class to handle workflow variables, it loads and saves from/to
the workflow object 'variables_formdef' attribute."""
lightweight = False
def __init__(self, workflow):
self.id = None
self.workflow = workflow
if self.workflow.is_readonly():
self.readonly = True
if workflow.variables_formdef and workflow.variables_formdef.fields:
self.fields = self.workflow.variables_formdef.fields
self.max_field_id = max(lax_int(x.id) for x in self.fields or [])
else:
self.fields = []
@property
def name(self):
return _('Options of workflow "%s"') % self.workflow.name
def get_admin_url(self):
base_url = get_publisher().get_backoffice_url()
return '%s/workflows/%s/variables/fields/' % (base_url, self.workflow.id)
def store(self, comment=None, *args, **kwargs):
for field in self.fields:
if hasattr(field, 'widget_class'):
if not field.varname:
field.varname = misc.simplify(field.label, space='_')
self.workflow.variables_formdef = self
self.workflow.store(comment=comment, *args, **kwargs)
class WorkflowBackofficeFieldsFormDef(FormDef):
"""Class to handle workflow backoffice fields, it loads and saves from/to
the workflow object 'backoffice_fields_formdef' attribute."""
lightweight = False
field_prefix = 'bo'
def __init__(self, workflow):
self.id = None
self.workflow = workflow
if workflow.backoffice_fields_formdef and workflow.backoffice_fields_formdef.fields:
self.fields = self.workflow.backoffice_fields_formdef.fields
else:
self.fields = []
@property
def name(self):
return _('Backoffice fields of workflow "%s"') % self.workflow.name
def get_admin_url(self):
base_url = get_publisher().get_backoffice_url()
return '%s/workflows/%s/backoffice-fields/fields/' % (base_url, self.workflow.id)
def get_new_field_id(self):
return '%s%s' % (self.field_prefix, str(uuid.uuid4()))
def store(self, comment=None):
self.workflow.backoffice_fields_formdef = self
self.workflow.store(comment=comment)
class Workflow(StorableObject):
_names = 'workflows'
xml_root_node = 'workflow'
name = None
possible_status = None
roles = None
variables_formdef = None
backoffice_fields_formdef = None
global_actions = None
criticality_levels = None
category_id = None
def __init__(self, name=None):
StorableObject.__init__(self)
self.name = name
self.possible_status = []
self.roles = {'_receiver': force_text(_('Recipient'))}
self.global_actions = []
self.criticality_levels = []
def migrate(self):
changed = False
if 'roles' not in self.__dict__ or self.roles is None:
self.roles = {'_receiver': force_text(_('Recipient'))}
changed = True
for status in self.possible_status:
changed |= status.migrate()
if self.backoffice_fields_formdef and self.backoffice_fields_formdef.fields:
for field in self.backoffice_fields_formdef.fields:
changed |= field.migrate()
if not self.global_actions:
self.global_actions = []
if changed:
self.store()
@property
def category(self):
return WorkflowCategory.get(self.category_id, ignore_errors=True)
@category.setter
def category(self, category):
if category:
self.category_id = category.id
elif self.category_id:
self.category_id = None
def store(self, comment=None, *args, **kwargs):
assert not self.is_readonly()
must_update = False
if self.id:
old_self = self.get(self.id, ignore_errors=True, ignore_migration=True)
if old_self:
old_endpoints = {x.id for x in old_self.get_endpoint_status()}
if old_endpoints != {x.id for x in self.get_endpoint_status()}:
must_update = True
old_criticality_levels = len(old_self.criticality_levels or [0])
if old_criticality_levels != len(self.criticality_levels or [0]):
must_update = True
try:
old_backoffice_fields = old_self.backoffice_fields_formdef.fields
except AttributeError:
old_backoffice_fields = []
try:
new_backoffice_fields = self.backoffice_fields_formdef.fields
except AttributeError:
new_backoffice_fields = []
if len(old_backoffice_fields) != len(new_backoffice_fields):
must_update = True
elif self.backoffice_fields_formdef:
must_update = True
StorableObject.store(self, *args, **kwargs)
if get_publisher().snapshot_class:
get_publisher().snapshot_class.snap(instance=self, comment=comment)
def update(job=None):
# instruct all related carddefs/formdefs to update.
for form in itertools.chain(
self.formdefs(ignore_migration=True, order_by='id'),
self.carddefs(ignore_migration=True, order_by='id'),
):
if must_update:
form.update_storage()
form.data_class().rebuild_security()
if get_response():
get_response().add_after_job(_('Reindexing cards and forms after workflow change'), update)
else:
update()
def get_admin_url(self):
base_url = get_publisher().get_backoffice_url()
return '%s/workflows/%s/' % (base_url, self.id)
@classmethod
def get(cls, id, ignore_errors=False, ignore_migration=False):
if id == '_default':
return cls.get_default_workflow()
elif id == '_carddef_default':
from wcs.carddef import CardDef
return CardDef.get_default_workflow()
return super().get(id, ignore_errors=ignore_errors, ignore_migration=ignore_migration)
def add_status(self, name, id=None):
if [x for x in self.possible_status if x.name == name]:
raise DuplicateStatusNameError()
status = WorkflowStatus(name)
status.parent = self
if id is None:
if self.possible_status:
status.id = str(max(lax_int(x.id) for x in self.possible_status) + 1)
else:
status.id = '1'
else:
status.id = id
self.possible_status.append(status)
return status
def get_status(self, id):
if id.startswith('wf-'):
id = id[3:]
for status in self.possible_status:
if status.id == id:
return status
raise KeyError()
def get_backoffice_fields(self):
if self.backoffice_fields_formdef:
return self.backoffice_fields_formdef.fields or []
return []
def get_all_items(self):
for status in self.possible_status or []:
yield from status.items or []
for action in self.global_actions or []:
yield from action.items or []
def add_global_action(self, name, id=None):
if [x for x in self.global_actions if x.name == name]:
raise DuplicateGlobalActionNameError()
action = WorkflowGlobalAction(name)
action.parent = self
action.append_trigger('manual')
if id is None:
if self.global_actions:
action.id = str(max(lax_int(x.id) for x in self.global_actions) + 1)
else:
action.id = '1'
else:
action.id = id
self.global_actions.append(action)
return action
def get_global_manual_actions(self):
actions = []
for action in self.global_actions or []:
roles = []
for trigger in action.triggers or []:
if not isinstance(trigger, WorkflowGlobalActionManualTrigger):
continue
roles.extend(trigger.roles or [])
functions = [x for x in roles if x in self.roles]
roles = [x for x in roles if x not in self.roles]
if functions or roles:
actions.append({'action': action, 'roles': roles, 'functions': functions})
return actions
def get_global_actions_for_user(self, formdata, user):
actions = []
for action in self.global_actions or []:
for trigger in action.triggers or []:
if isinstance(trigger, WorkflowGlobalActionManualTrigger):
if '_submitter' in (trigger.roles or []) and formdata.is_submitter(user):
actions.append(action)
break
if not user:
continue
roles = set()
for role_id in trigger.roles or []:
if role_id == '_submitter':
continue
roles |= formdata.get_function_roles(role_id)
if roles.intersection(user.get_roles()):
actions.append(action)
break
return actions
def get_subdirectories(self, formdata):
wf_status = formdata.get_status()
if not wf_status: # draft
return []
directories = []
for action in self.global_actions:
for trigger in action.triggers or []:
directories.extend(trigger.get_subdirectories(formdata))
directories.extend(wf_status.get_subdirectories(formdata))
return directories
def __setstate__(self, dict):
self.__dict__.update(dict)
pickle_2to3_conversion(self)
for s in self.possible_status + (self.global_actions or []):
s.parent = self
triggers = getattr(s, 'triggers', None) or []
for i, item in enumerate(s.items + triggers):
item.parent = s
if not item.id:
item.id = '%d' % (i + 1)
if self.variables_formdef:
self.variables_formdef.workflow = self
if self.backoffice_fields_formdef:
self.backoffice_fields_formdef.workflow = self
self.backoffice_fields_formdef.__class__ = WorkflowBackofficeFieldsFormDef
def get_waitpoint_status(self):
return [x for x in self.possible_status if x.is_waitpoint()]
def get_endpoint_status(self):
return [x for x in self.possible_status if x.is_endpoint()]
def get_not_endpoint_status(self):
return [x for x in self.possible_status if not x.is_endpoint()]
def has_options(self):
for status in self.possible_status:
for item in status.items:
for parameter in item.get_parameters():
if not getattr(item, parameter):
return True
return False
def remove_self(self):
for form in self.formdefs():
form.workflow_id = None
form.store()
StorableObject.remove_self(self)
def export_to_xml(self, include_id=False):
charset = get_publisher().site_charset
root = ET.Element('workflow')
if include_id and self.id and not str(self.id).startswith('_'):
root.attrib['id'] = str(self.id)
ET.SubElement(root, 'name').text = force_text(self.name, charset)
if self.category:
elem = ET.SubElement(root, 'category')
elem.text = force_text(self.category.name, charset)
if include_id:
elem.attrib['category_id'] = str(self.category.id)
roles_node = ET.SubElement(root, 'roles')
if self.roles:
for role_id, role_label in sorted(self.roles.items()):
role_node = ET.SubElement(roles_node, 'role')
role_node.attrib['id'] = role_id
role_node.text = force_text(role_label, charset)
possible_status = ET.SubElement(root, 'possible_status')
for status in self.possible_status:
possible_status.append(status.export_to_xml(charset=charset, include_id=include_id))
if self.global_actions:
global_actions = ET.SubElement(root, 'global_actions')
for action in self.global_actions:
global_actions.append(action.export_to_xml(charset=charset, include_id=include_id))
if self.criticality_levels:
criticality_levels = ET.SubElement(root, 'criticality_levels')
for level in self.criticality_levels:
criticality_levels.append(level.export_to_xml(charset=charset))
if self.variables_formdef:
variables = ET.SubElement(root, 'variables')
formdef = ET.SubElement(variables, 'formdef')
ET.SubElement(formdef, 'name').text = '-' # required by formdef xml import
fields = ET.SubElement(formdef, 'fields')
for field in self.variables_formdef.fields:
fields.append(field.export_to_xml(charset=charset, include_id=include_id))
if self.backoffice_fields_formdef:
variables = ET.SubElement(root, 'backoffice-fields')
formdef = ET.SubElement(variables, 'formdef')
ET.SubElement(formdef, 'name').text = '-' # required by formdef xml import
fields = ET.SubElement(formdef, 'fields')
for field in self.backoffice_fields_formdef.fields:
fields.append(field.export_to_xml(charset=charset, include_id=include_id))
return root
@classmethod
def import_from_xml(cls, fd, include_id=False, check_datasources=True):
try:
tree = ET.parse(fd)
except Exception:
raise ValueError()
return cls.import_from_xml_tree(tree, include_id=include_id, check_datasources=check_datasources)
@classmethod
def import_from_xml_tree(cls, tree, include_id=False, snapshot=False, check_datasources=True):
charset = get_publisher().site_charset
workflow = cls()
if tree.find('name') is None or not tree.find('name').text:
raise WorkflowImportError(_('Missing name'))
# if the tree we get is actually a ElementTree for real, we get its
# root element and go on happily.
if not ET.iselement(tree):
tree = tree.getroot()
if tree.tag != 'workflow':
raise WorkflowImportError(_('Not a workflow'))
if include_id and tree.attrib.get('id'):
workflow.id = tree.attrib.get('id')
workflow.name = xml_node_text(tree.find('name'))
if tree.find('category') is not None:
category_node = tree.find('category')
if include_id and category_node.attrib.get('category_id'):
category_id = str(category_node.attrib.get('category_id'))
if WorkflowCategory.has_key(category_id):
workflow.category_id = category_id
else:
category = xml_node_text(category_node)
for c in WorkflowCategory.select():
if c.name == category:
workflow.category_id = c.id
break
if tree.find('roles') is not None:
workflow.roles = {}
for role_node in tree.findall('roles/role'):
workflow.roles[role_node.attrib['id']] = xml_node_text(role_node)
workflow.possible_status = []
for status in tree.find('possible_status'):
status_o = WorkflowStatus()
status_o.parent = workflow
try:
status_o.init_with_xml(
status,
charset,
include_id=include_id,
snapshot=snapshot,
check_datasources=check_datasources,
)
except FormdefImportError as e:
raise WorkflowImportError(e.msg, details=e.details)
workflow.possible_status.append(status_o)
workflow.global_actions = []
global_actions = tree.find('global_actions')
if global_actions is not None:
for action in global_actions:
action_o = WorkflowGlobalAction()
action_o.parent = workflow
action_o.init_with_xml(action, charset, include_id=include_id, snapshot=snapshot)
workflow.global_actions.append(action_o)
workflow.criticality_levels = []
criticality_levels = tree.find('criticality_levels')
if criticality_levels is not None:
for level in criticality_levels:
level_o = WorkflowCriticalityLevel()
level_o.init_with_xml(level, charset)
workflow.criticality_levels.append(level_o)
variables = tree.find('variables')
if variables is not None:
formdef = variables.find('formdef')
try:
imported_formdef = FormDef.import_from_xml_tree(
formdef, include_id=True, snapshot=snapshot, check_datasources=check_datasources
)
except FormdefImportError as e:
raise WorkflowImportError(e.msg, details=e.details)
workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
workflow.variables_formdef.fields = imported_formdef.fields
variables = tree.find('backoffice-fields')
if variables is not None:
formdef = variables.find('formdef')
try:
imported_formdef = FormDef.import_from_xml_tree(
formdef, include_id=True, snapshot=snapshot, check_datasources=check_datasources
)
except FormdefImportError as e:
raise WorkflowImportError(e.msg, details=e.details)
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow=workflow)
workflow.backoffice_fields_formdef.fields = imported_formdef.fields
return workflow
def get_list_of_roles(self, include_logged_in_users=True):
t = []
t.append(('_submitter', C_('role|User'), '_submitter'))
for workflow_role in self.roles.items():
t.append(list(workflow_role) + [workflow_role[0]])
if include_logged_in_users:
t.append((logged_users_role().id, logged_users_role().name, logged_users_role().id))
include_roles = not (get_publisher().has_site_option('workflow-functions-only'))
if include_roles and get_user_roles():
t.append((None, '----', None))
t.extend(get_user_roles())
return t
def get_add_role_label(self):
if get_publisher().has_site_option('workflow-functions-only'):
return _('Add Function')
return _('Add Function or Role')
def render_list_of_roles(self, roles):
return render_list_of_roles(self, roles)
def get_json_export_dict(self, include_id=False):
charset = get_publisher().site_charset
root = {}
root['name'] = force_text(self.name, charset)
if include_id and self.id:
root['id'] = str(self.id)
roles = root['functions'] = {}
for role, label in self.roles.items():
roles[role] = force_text(label, charset)
statuses = root['statuses'] = []
endpoint_status_ids = [s.id for s in self.get_endpoint_status()]
waitpoint_status_ids = [s.id for s in self.get_waitpoint_status()]
for status in self.possible_status:
statuses.append(
{
'id': status.id,
'name': force_text(status.name, charset),
'forced_endpoint': status.forced_endpoint,
'endpoint': status.id in endpoint_status_ids,
'waitpoint': status.id in waitpoint_status_ids,
}
)
root['fields'] = []
for field in self.get_backoffice_fields():
root['fields'].append(field.export_to_json(include_id=include_id))
return root
@classmethod
def get_unknown_workflow(cls):
workflow = Workflow(name=_('Unknown'))
workflow.id = '_unknown'
return workflow
@classmethod
def get_default_workflow(cls):
from .qommon.admin.emails import EmailsDirectory
# force_text() is used on lazy gettext calls as the default workflow is used
# in tests as the basis for other ones and lazy gettext would fail pickling.
workflow = Workflow(name=force_text(_('Default')))
workflow.id = '_default'
workflow.roles = {'_receiver': force_text(_('Recipient'))}
just_submitted_status = workflow.add_status(force_text(_('Just Submitted')), 'just_submitted')
just_submitted_status.visibility = ['_receiver']
new_status = workflow.add_status(force_text(_('New')), 'new')
new_status.colour = '66FF00'
rejected_status = workflow.add_status(force_text(_('Rejected')), 'rejected')
rejected_status.colour = 'FF3300'
accepted_status = workflow.add_status(force_text(_('Accepted')), 'accepted')
accepted_status.colour = '66CCFF'
finished_status = workflow.add_status(force_text(_('Finished')), 'finished')
finished_status.colour = 'CCCCCC'
commentable = CommentableWorkflowStatusItem()
commentable.id = '_commentable'
commentable.by = ['_submitter', '_receiver']
from wcs.wf.jump import JumpWorkflowStatusItem
jump_to_new = JumpWorkflowStatusItem()
jump_to_new.id = '_jump_to_new'
jump_to_new.status = new_status.id
jump_to_new.parent = just_submitted_status
notify_new_receiver_email = SendmailWorkflowStatusItem()
notify_new_receiver_email.id = '_notify_new_receiver_email'
notify_new_receiver_email.to = ['_receiver']
notify_new_receiver_email.subject = EmailsDirectory.get_subject('new_receiver')
notify_new_receiver_email.body = EmailsDirectory.get_body('new_receiver')
if not EmailsDirectory.is_enabled('new_receiver'):
notify_new_receiver_email = None
notify_new_user_email = SendmailWorkflowStatusItem()
notify_new_user_email.id = '_notify_new_user_email'
notify_new_user_email.to = ['_submitter']
notify_new_user_email.subject = EmailsDirectory.get_subject('new_user')
notify_new_user_email.body = EmailsDirectory.get_body('new_user')
if not EmailsDirectory.is_enabled('new_user'):
notify_change_user_email = None
notify_change_receiver_email = SendmailWorkflowStatusItem()
notify_change_receiver_email.id = '_notify_change_receiver_email'
notify_change_receiver_email.to = ['_receiver']
notify_change_receiver_email.subject = EmailsDirectory.get_subject('change_receiver')
notify_change_receiver_email.body = EmailsDirectory.get_body('change_receiver')
if not EmailsDirectory.is_enabled('change_receiver'):
notify_change_receiver_email = None
notify_change_user_email = SendmailWorkflowStatusItem()
notify_change_user_email.id = '_notify_change_user_email'
notify_change_user_email.to = ['_submitter']
notify_change_user_email.subject = EmailsDirectory.get_subject('change_user')
notify_change_user_email.body = EmailsDirectory.get_body('change_user')
if not EmailsDirectory.is_enabled('change_user'):
notify_change_user_email = None
if notify_new_receiver_email:
notify_new_receiver_email.parent = just_submitted_status
just_submitted_status.items.append(notify_new_receiver_email)
if notify_new_user_email:
notify_new_user_email.parent = just_submitted_status
just_submitted_status.items.append(notify_new_user_email)
just_submitted_status.items.append(jump_to_new)
if notify_change_receiver_email:
accepted_status.items.append(notify_change_receiver_email)
notify_change_receiver_email.parent = accepted_status
notify_change_receiver_email = copy.copy(notify_change_receiver_email)
rejected_status.items.append(notify_change_receiver_email)
notify_change_receiver_email.parent = rejected_status
notify_change_receiver_email = copy.copy(notify_change_receiver_email)
finished_status.items.append(notify_change_receiver_email)
notify_change_receiver_email.parent = finished_status
if notify_change_user_email:
accepted_status.items.append(notify_change_user_email)
notify_change_user_email.parent = accepted_status
notify_change_user_email = copy.copy(notify_change_user_email)
rejected_status.items.append(notify_change_user_email)
notify_change_user_email.parent = rejected_status
notify_change_user_email = copy.copy(notify_change_user_email)
finished_status.items.append(notify_change_user_email)
notify_change_user_email.parent = finished_status
new_status.items.append(commentable)
commentable.parent = new_status
commentable = copy.copy(commentable)
accepted_status.items.append(commentable)
commentable.parent = accepted_status
accept = ChoiceWorkflowStatusItem()
accept.id = '_accept'
accept.label = force_text(_('Accept'))
accept.by = ['_receiver']
accept.status = accepted_status.id
accept.parent = new_status
new_status.items.append(accept)
reject = ChoiceWorkflowStatusItem()
reject.id = '_reject'
reject.label = force_text(_('Reject'))
reject.by = ['_receiver']
reject.status = rejected_status.id
reject.parent = new_status
new_status.items.append(reject)
finish = ChoiceWorkflowStatusItem()
finish.id = '_finish'
finish.label = force_text(_('Finish'))
finish.by = ['_receiver']
finish.status = finished_status.id
finish.parent = accepted_status
accepted_status.items.append(finish)
return workflow
def is_default(self):
return str(self.id).startswith('_')
def is_readonly(self):
return self.is_default() or super().is_readonly()
def formdefs(self, **kwargs):
order_by = kwargs.pop('order_by', 'name')
return list(FormDef.select(lambda x: x.workflow_id == self.id, order_by=order_by, **kwargs))
def carddefs(self, **kwargs):
order_by = kwargs.pop('order_by', 'name')
return list(CardDef.select(lambda x: x.workflow_id == self.id, order_by=order_by, **kwargs))
def mail_templates(self):
slugs = [x.mail_template for x in self.get_all_items() if x.key == 'sendmail' and x.mail_template]
criterias = [Contains('slug', slugs)]
return list(MailTemplate.select(criterias, order_by='name'))
class XmlSerialisable:
node_name = None
key = None
def export_to_xml(self, charset, include_id=False):
node = ET.Element(self.node_name)
if self.key:
node.attrib['type'] = self.key
if include_id and getattr(self, 'id', None):
node.attrib['id'] = self.id
for attribute in self.get_parameters():
if getattr(self, '%s_export_to_xml' % attribute, None):
getattr(self, '%s_export_to_xml' % attribute)(node, charset, include_id=include_id)
continue
if hasattr(self, attribute) and getattr(self, attribute) is not None:
el = ET.SubElement(node, attribute)
val = getattr(self, attribute)
if isinstance(val, dict):
for k, v in val.items():
ET.SubElement(el, k).text = force_text(v, charset, errors='replace')
elif isinstance(val, list):
if attribute[-1] == 's':
atname = attribute[:-1]
else:
atname = 'item'
for v in val:
ET.SubElement(el, atname).text = force_text(str(v), charset, errors='replace')
elif isinstance(val, str):
el.text = force_text(val, charset, errors='replace')
else:
el.text = str(val)
return node
def init_with_xml(self, elem, charset, include_id=False, snapshot=False, check_datasources=True):
if include_id and elem.attrib.get('id'):
self.id = elem.attrib.get('id')
for attribute in self.get_parameters():
el = elem.find(attribute)
if getattr(self, '%s_init_with_xml' % attribute, None):
getattr(self, '%s_init_with_xml' % attribute)(
el, charset, include_id=include_id, snapshot=snapshot
)
continue
if el is None:
continue
if list(el):
if isinstance(getattr(self, attribute), list):
v = [xml_node_text(x) or '' for x in el]
elif isinstance(getattr(self, attribute), dict):
v = {}
for e in el:
v[e.tag] = xml_node_text(e)
else:
# ???
raise AssertionError
setattr(self, attribute, v)
else:
if el.text is None:
setattr(self, attribute, None)
elif el.text in ('False', 'True') and not isinstance(getattr(self, attribute), str):
# booleans
setattr(self, attribute, el.text == 'True')
elif isinstance(getattr(self, attribute), int):
setattr(self, attribute, int(el.text))
else:
setattr(self, attribute, xml_node_text(el))
def _roles_export_to_xml(self, attribute, item, charset, include_id=False):
if not hasattr(self, attribute) or not getattr(self, attribute):
return
el = ET.SubElement(item, attribute)
for role_id in getattr(self, attribute):
if role_id is None:
continue
role_id = str(role_id)
role = get_role_name(role_id, charset)
sub = ET.SubElement(el, 'item')
sub.attrib['role_id'] = role_id
sub.text = role
def _roles_init_with_xml(self, attribute, elem, charset, include_id=False, snapshot=False):
if elem is None:
setattr(self, attribute, [])
else:
imported_roles = []
for child in elem:
imported_roles.append(self._get_role_id_from_xml(child, charset, include_id=include_id))
setattr(self, attribute, imported_roles)
def _role_export_to_xml(self, attribute, item, charset, include_id=False):
if not hasattr(self, attribute) or not getattr(self, attribute):
return
role_id = str(getattr(self, attribute))
role = get_role_name(role_id, charset)
sub = ET.SubElement(item, attribute)
if include_id:
sub.attrib['role_id'] = role_id
sub.text = role
def _get_role_id_from_xml(self, elem, charset, include_id=False):
if elem is None:
return None
value = xml_node_text(elem) or ''
# look for known static values
if value.startswith('_') or value == 'logged-users':
return value
# if we import using id, look at the role_id attribute
if include_id and 'role_id' in elem.attrib:
role_id = force_str(elem.attrib['role_id'])
if get_publisher().role_class.get(role_id, ignore_errors=True):
return role_id
if WorkflowStatusItem.get_expression(role_id)['type'] in ('python', 'template'):
return role_id
# if not using id, look up on the name
for role in get_publisher().role_class.select(ignore_errors=True):
if role.name == value:
return role.id
# if a computed value is possible and value looks like
# an expression, use it
if WorkflowStatusItem.get_expression(value)['type'] in ('python', 'template'):
return value
# if the roles are managed by the idp, don't try further.
if get_publisher() and get_cfg('sp', {}).get('idp-manage-roles') is True:
raise WorkflowImportError(_('Unknown referenced role (%s)'), (value,))
# and if there's no match, create a new role
role = get_publisher().role_class()
role.name = value
role.store()
return role.id
def _role_init_with_xml(self, attribute, elem, charset, include_id=False, snapshot=False):
setattr(self, attribute, self._get_role_id_from_xml(elem, charset, include_id=include_id))
class WorkflowGlobalActionTrigger(XmlSerialisable):
node_name = 'trigger'
def submit_admin_form(self, form):
for f in self.get_parameters():
widget = form.get_widget(f)
if widget:
value = widget.parse()
if hasattr(self, '%s_parse' % f):
value = getattr(self, '%s_parse' % f)(value)
setattr(self, f, value)
def get_subdirectories(self, formdata):
return []
class WorkflowGlobalActionManualTrigger(WorkflowGlobalActionTrigger):
key = 'manual'
roles = None
def get_parameters(self):
return ('roles',)
def render_as_line(self):
if self.roles:
return _('Manual by %s') % render_list_of_roles(self.parent.parent, self.roles)
else:
return _('Manual (not assigned)')
def form(self, workflow):
form = Form(enctype='multipart/form-data')
options = [(None, '---', None)]
options += workflow.get_list_of_roles(include_logged_in_users=False)
form.add(
WidgetList,
'roles',
title=_('Roles'),
element_type=SingleSelectWidget,
value=self.roles,
add_element_label=workflow.get_add_role_label(),
element_kwargs={'render_br': False, 'options': options},
)
return form
def roles_export_to_xml(self, item, charset, include_id=False):
self._roles_export_to_xml('roles', item, charset, include_id=include_id)
def roles_init_with_xml(self, elem, charset, include_id=False, snapshot=False):
self._roles_init_with_xml('roles', elem, charset, include_id=include_id, snapshot=snapshot)
class WorkflowGlobalActionTimeoutTriggerMarker(EvolutionPart):
def __init__(self, timeout_id):
self.timeout_id = timeout_id
class WorkflowGlobalActionTimeoutTrigger(WorkflowGlobalActionTrigger):
key = 'timeout'
anchor = None
anchor_expression = ''
anchor_template = ''
anchor_status_first = None
anchor_status_latest = None
timeout = None
def get_parameters(self):
return (
'anchor',
'anchor_expression',
'anchor_template',
'anchor_status_first',
'anchor_status_latest',
'timeout',
)
def get_anchor_labels(self):
options = [
('creation', _('Creation')),
('1st-arrival', _('First arrival in status')),
('latest-arrival', _('Latest arrival in status')),
('finalized', _('Arrival in final status')),
('template', _('String / Template')),
]
if not get_publisher().has_site_option('disable-python-expressions'):
options.append(('python', _('Python expression')))
return collections.OrderedDict(options)
def properly_configured(self):
workflow = self.parent.parent
if not (self.anchor and self.timeout):
return False
if self.anchor == '1st-arrival' and self.anchor_status_first:
try:
workflow.get_status(self.anchor_status_first)
except KeyError:
return False
if self.anchor == 'latest-arrival' and self.anchor_status_latest:
try:
workflow.get_status(self.anchor_status_latest)
except KeyError:
return False
return True
def render_as_line(self):
if self.properly_configured():
return _('Automatic, %(timeout)s, relative to: %(anchor)s') % {
'anchor': self.get_anchor_labels().get(self.anchor).lower(),
'timeout': _('%s days') % self.timeout,
}
else:
return _('Automatic (not configured)')
def form(self, workflow):
form = Form(enctype='multipart/form-data')
options = list(self.get_anchor_labels().items())
form.add(
SingleSelectWidget,
'anchor',
title=_('Reference Date'),
options=options,
value=self.anchor,
required=True,
attrs={'data-dynamic-display-parent': 'true'},
)
form.add(
StringWidget,
'anchor_expression',
title=_('Python Expression to get reference date'),
size=80,
value=self.anchor_expression,
hint=_('This should produce a date; it will only apply to open forms.'),
attrs={
'data-dynamic-display-child-of': 'anchor',
'data-dynamic-display-value': _('Python expression'),
},
)
form.add(
StringWidget,
'anchor_template',
title=_('String / Template with reference date'),
size=80,
value=self.anchor_template,
hint=_('This should be a date; it will only apply to open forms.'),
attrs={
'data-dynamic-display-child-of': 'anchor',
'data-dynamic-display-value': _('String / Template'),
},
)
possible_status = [(None, _('Current Status'), None)]
possible_status.extend([('wf-%s' % x.id, x.name, x.id) for x in workflow.possible_status])
form.add(
SingleSelectWidget,
'anchor_status_first',
title=_('Status'),
options=possible_status,
value=self.anchor_status_first,
attrs={
'data-dynamic-display-child-of': 'anchor',
'data-dynamic-display-value': _('First arrival in status'),
},
)
form.add(
SingleSelectWidget,
'anchor_status_latest',
title=_('Status'),
options=possible_status,
value=self.anchor_status_latest,
attrs={
'data-dynamic-display-child-of': 'anchor',
'data-dynamic-display-value': _('Latest arrival in status'),
},
)
form.add(
ValidatedStringWidget,
'timeout',
title=_('Delay (in days)'),
value=self.timeout,
regex=r'^-?\d+$',
required=True,
hint=_(
'''
Number of days relative to the reference date. If the
reference date is computed from an expression, a negative
delay is accepted to trigger the action before the
date.'''
),
)
return form
def must_trigger(self, formdata, endpoint_status_ids):
if formdata.status in endpoint_status_ids:
if not (
(self.anchor == '1st-arrival' and self.anchor_status_first in endpoint_status_ids)
or (self.anchor == 'latest-arrival' and self.anchor_status_latest in endpoint_status_ids)
or (self.anchor == 'finalized')
):
# don't trigger on finalized formdata (unless explicit anchor point)
return False
anchor_date = None
if self.anchor == 'creation':
anchor_date = formdata.receipt_time
elif self.anchor == '1st-arrival':
anchor_status = self.anchor_status_first or formdata.status
for evolution in formdata.evolution:
if evolution.status == anchor_status:
anchor_date = evolution.last_jump_datetime or evolution.time
break
elif self.anchor == 'latest-arrival':
anchor_status = self.anchor_status_latest or formdata.status
latest_no_status_evolution = None
for evolution in reversed(formdata.evolution):
if evolution.status == anchor_status:
if latest_no_status_evolution:
evolution = latest_no_status_evolution
anchor_date = evolution.last_jump_datetime or evolution.time
break
if evolution.status:
latest_no_status_evolution = None
elif latest_no_status_evolution is None:
latest_no_status_evolution = evolution
elif self.anchor == 'finalized':
if formdata.status in endpoint_status_ids:
for evolution in reversed(formdata.evolution):
if not evolution.status:
continue
if evolution.status in endpoint_status_ids:
anchor_date = evolution.time
else:
break
elif self.anchor == 'template' and self.anchor_template:
variables = get_publisher().substitutions.get_context_variables(mode='lazy')
anchor_date = Template(self.anchor_template, autoescape=False).render(variables)
elif self.anchor == 'python':
variables = get_publisher().substitutions.get_context_variables()
try:
# noqa pylint: disable=eval-used
anchor_date = eval(self.anchor_expression, get_publisher().get_global_eval_dict(), variables)
except Exception as e:
# get the variables in the locals() namespace so they are
# displayed within the trace.
expression = self.anchor_expression # noqa pylint: disable=unused-variable
# noqa pylint: disable=unused-variable
global_variables = get_publisher().get_global_eval_dict()
get_publisher().record_error(exception=e, context='[TIMEOUTS]', notify=True)
# convert anchor_date to datetime.datetime()
if isinstance(anchor_date, datetime.datetime):
pass
elif isinstance(anchor_date, datetime.date):
anchor_date = datetime.datetime(
year=anchor_date.year, month=anchor_date.month, day=anchor_date.day
)
elif isinstance(anchor_date, time.struct_time):
anchor_date = datetime.datetime.fromtimestamp(time.mktime(anchor_date))
elif isinstance(anchor_date, str) and anchor_date:
try:
anchor_date = get_as_datetime(anchor_date)
except ValueError as e:
get_publisher().record_error(exception=e, context='[TIMEOUTS]', notify=True)
anchor_date = None
elif anchor_date:
# timestamp
try:
anchor_date = datetime.datetime.fromtimestamp(anchor_date)
except TypeError as e:
get_publisher().record_error(exception=e, context='[TIMEOUTS]', notify=True)
anchor_date = None
if not anchor_date:
return False
anchor_date = anchor_date + datetime.timedelta(days=int(self.timeout))
return bool(datetime.datetime.now() > anchor_date)
@classmethod
def apply(cls, workflow):
triggers = []
for action in workflow.global_actions or []:
triggers.extend(
[
(action, x)
for x in action.triggers or []
if isinstance(x, WorkflowGlobalActionTimeoutTrigger) and x.properly_configured()
]
)
if not triggers:
return
not_endpoint_status = workflow.get_not_endpoint_status()
not_endpoint_status_ids = ['wf-%s' % x.id for x in not_endpoint_status]
endpoint_status = workflow.get_endpoint_status()
endpoint_status_ids = ['wf-%s' % x.id for x in endpoint_status]
# check if triggers are defined relative to terminal status
run_on_finalized = False
for action, trigger in triggers:
if trigger.anchor == 'finalized':
run_on_finalized = True
elif (
trigger.anchor == 'creation'
and workflow.possible_status
and workflow.possible_status[0] in endpoint_status
):
run_on_finalized = True
elif (
trigger.anchor == '1st-arrival'
and trigger.anchor_status_first
and workflow.get_status(trigger.anchor_status_first) in endpoint_status
):
run_on_finalized = True
elif (
trigger.anchor == 'latest-arrival'
and trigger.anchor_status_latest
and workflow.get_status(trigger.anchor_status_latest) in endpoint_status
):
run_on_finalized = True
criterias = [NotEqual('status', 'draft'), Null('anonymised')]
if not run_on_finalized:
# limit to formdata that are not finalized
criterias.append(Contains('status', not_endpoint_status_ids))
for formdef in workflow.formdefs():
data_class = formdef.data_class()
for formdata in data_class.select(criterias, iterator=True):
get_publisher().substitutions.reset()
get_publisher().substitutions.feed(get_publisher())
get_publisher().substitutions.feed(formdef)
get_publisher().substitutions.feed(formdata)
seen_triggers = []
for part in formdata.iter_evolution_parts():
if not isinstance(part, WorkflowGlobalActionTimeoutTriggerMarker):
continue
seen_triggers.append(part.timeout_id)
for action, trigger in triggers:
if trigger.id in seen_triggers:
continue # already triggered
if trigger.must_trigger(formdata, endpoint_status_ids):
if not formdata.evolution:
continue
formdata.evolution[-1].add_part(WorkflowGlobalActionTimeoutTriggerMarker(trigger.id))
formdata.store()
perform_items(
action.items,
formdata,
event=('global-action-timeout', action.id, trigger.id),
)
break
class WorkflowGlobalActionWebserviceTrigger(WorkflowGlobalActionManualTrigger):
key = 'webservice'
identifier = None
roles = None
def get_parameters(self):
return ('identifier', 'roles')
def render_as_line(self):
if self.identifier:
return _('External call (%s)') % self.identifier
else:
return _('External call (not configured)')
def form(self, workflow):
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'identifier', title=_('Identifier'), required=True, value=self.identifier)
options = [(None, '---', None)]
options += workflow.get_list_of_roles(include_logged_in_users=True)
form.add(
WidgetList,
'roles',
title=_('Roles'),
element_type=SingleSelectWidget,
value=self.roles,
add_element_label=workflow.get_add_role_label(),
element_kwargs={'render_br': False, 'options': options},
)
return form
def get_subdirectories(self, formdata):
from wcs.forms.workflows import WorkflowGlobalActionWebserviceHooksDirectory
return [('hooks', WorkflowGlobalActionWebserviceHooksDirectory(formdata))]
class WorkflowGlobalAction:
id = None
name = None
items = None
triggers = None
backoffice_info_text = None
def __init__(self, name=None):
self.name = name
self.items = []
def append_item(self, type):
for klass in item_classes:
if klass.key == type:
o = klass()
if self.items:
o.id = str(max(lax_int(x.id) for x in self.items) + 1)
else:
o.id = '1'
self.items.append(o)
return o
raise KeyError()
def append_trigger(self, type):
trigger_types = {
'manual': WorkflowGlobalActionManualTrigger,
'timeout': WorkflowGlobalActionTimeoutTrigger,
'webservice': WorkflowGlobalActionWebserviceTrigger,
}
o = trigger_types.get(type)()
if not self.triggers:
self.triggers = []
o.id = str(uuid.uuid4())
self.triggers.append(o)
return o
def export_to_xml(self, charset, include_id=False):
status = ET.Element('action')
ET.SubElement(status, 'id').text = force_text(self.id, charset)
ET.SubElement(status, 'name').text = force_text(self.name, charset)
if self.backoffice_info_text:
ET.SubElement(status, 'backoffice_info_text').text = force_text(
self.backoffice_info_text, charset
)
items = ET.SubElement(status, 'items')
for item in self.items:
items.append(item.export_to_xml(charset=charset, include_id=include_id))
triggers = ET.SubElement(status, 'triggers')
for trigger in self.triggers or []:
triggers.append(trigger.export_to_xml(charset=charset, include_id=include_id))
return status
def init_with_xml(self, elem, charset, include_id=False, snapshot=False):
self.id = xml_node_text(elem.find('id'))
self.name = xml_node_text(elem.find('name'))
if elem.find('backoffice_info_text') is not None:
self.backoffice_info_text = xml_node_text(elem.find('backoffice_info_text'))
self.items = []
for item in elem.find('items'):
item_type = item.attrib['type']
self.append_item(item_type)
item_o = self.items[-1]
item_o.parent = self
item_o.init_with_xml(item, charset, include_id=include_id, snapshot=snapshot)
self.triggers = []
for trigger in elem.find('triggers'):
trigger_type = trigger.attrib['type']
self.append_trigger(trigger_type)
trigger_o = self.triggers[-1]
trigger_o.parent = self
trigger_o.init_with_xml(trigger, charset, include_id=include_id, snapshot=snapshot)
class WorkflowCriticalityLevel:
id = None
name = None
colour = None
def __init__(self, name=None, colour=None):
self.name = name
self.colour = colour
self.id = str(random.randint(0, 100000))
def export_to_xml(self, charset, include_id=False):
level = ET.Element('criticality-level')
ET.SubElement(level, 'id').text = force_text(self.id, charset) if self.id else ''
ET.SubElement(level, 'name').text = force_text(self.name, charset)
if self.colour:
ET.SubElement(level, 'colour').text = force_text(self.colour, charset)
return level
def init_with_xml(self, elem, charset, include_id=False, snapshot=False):
self.id = xml_node_text(elem.find('id'))
self.name = xml_node_text(elem.find('name'))
if elem.find('colour') is not None:
self.colour = xml_node_text(elem.find('colour'))
class WorkflowStatus:
id = None
name = None
items = None
visibility = None
forced_endpoint = False
colour = 'FFFFFF'
backoffice_info_text = None
extra_css_class = ''
def __init__(self, name=None):
self.name = name
self.items = []
def __eq__(self, other):
if other is None:
return False
# this assumes both status are from the same workflow
if isinstance(other, str):
other_id = other
else:
other_id = other.id
return self.id == other_id
def migrate(self):
changed = False
for item in self.items:
changed |= item.migrate()
return changed
def append_item(self, type):
for klass in item_classes:
if klass.key == type:
o = klass()
if self.items:
o.id = str(max(lax_int(x.id) for x in self.items) + 1)
else:
o.id = '1'
self.items.append(o)
break
else:
raise KeyError()
def get_item(self, id):
for item in self.items:
if item.id == id:
return item
raise KeyError()
def get_action_form(self, filled, user, displayed_fields=None):
form = Form(enctype='multipart/form-data', use_tokens=False)
form.attrs['id'] = 'wf-actions'
for item in self.items:
if not item.check_auth(filled, user):
continue
if not item.check_condition(filled):
continue
item.fill_form(form, filled, user, displayed_fields=displayed_fields)
for action in filled.formdef.workflow.get_global_actions_for_user(filled, user):
form.add_submit('button-action-%s' % action.id, action.name)
widget = form.get_widget('button-action-%s' % action.id)
if widget:
widget.backoffice_info_text = action.backoffice_info_text
widget.ignore_form_errors = True
widget.attrs['formnovalidate'] = 'formnovalidate'
if form.widgets or form.submit_widgets:
return form
else:
return None
def get_active_items(self, form, filled, user):
for item in self.items:
if hasattr(item, 'by'):
for role in item.by or []:
if role == logged_users_role().id:
break
if role == '_submitter':
if filled.is_submitter(user):
break
continue
if user is None:
continue
if filled.get_function_roles(role).intersection(user.get_roles()):
break
else:
continue
if not item.check_condition(filled):
continue
yield item
def get_admin_url(self):
return self.parent.get_admin_url() + 'status/%s/' % self.id
def evaluate_live_form(self, form, filled, user):
for item in self.get_active_items(form, filled, user):
item.evaluate_live_form(form, filled, user)
def handle_form(self, form, filled, user):
# check for global actions
for action in filled.formdef.workflow.get_global_actions_for_user(filled, user):
if 'button-action-%s' % action.id in get_request().form:
url = filled.perform_global_action(action.id, user, event_name='global-action-button')
if url:
return url
return
evo = Evolution()
evo.time = time.localtime()
if user:
if filled.is_submitter(user):
evo.who = '_submitter'
else:
evo.who = user.id
if not filled.evolution:
filled.evolution = []
for item in self.get_active_items(form, filled, user):
next_url = item.submit_form(form, filled, user, evo)
if next_url is True:
break
if next_url:
if not form.has_errors():
if evo.parts or evo.status or evo.comment or evo.status:
# add evolution entry only if there's some content
# within, i.e. do not register anything in the case of
# a single edit action (where the evolution should be
# appended only after successful edit).
filled.evolution.append(evo)
if evo.status:
filled.status = evo.status
filled.store()
return next_url
if form.has_errors():
return
filled.evolution.append(evo)
if evo.status:
filled.status = evo.status
filled.store()
url = filled.perform_workflow(event='workflow-form-submit')
if url:
return url
def get_subdirectories(self, formdata):
subdirectories = []
for item in self.items:
if item.directory_name:
subdirectories.append((item.directory_name, item.directory_class(formdata, item, self)))
return subdirectories
def get_visibility_restricted_roles(self):
if not self.visibility: # no restriction -> visible
return []
return self.visibility
def is_visible(self, formdata, user):
if not self.visibility: # no restriction -> visible
return True
if get_request() and get_request().is_in_frontoffice():
# always hide in front
return False
if user and user.is_admin:
return True
if user:
user_roles = set(user.get_roles())
user_roles.add(logged_users_role().id)
else:
user_roles = set()
visibility_roles = self.visibility[:]
for item in self.items or []:
if not hasattr(item, 'by') or not item.by:
continue
visibility_roles.extend(item.by)
for role in visibility_roles:
if role != '_submitter':
if formdata.get_function_roles(role).intersection(user_roles):
return True
return False
def is_endpoint(self):
# an endpoint status is a status that marks the end of the workflow; it
# can either be computed automatically (if there's no way out of the
# status) or be set manually (to mark the expected end while still
# allowing to go back and re-enter the workflow).
if self.forced_endpoint:
return True
endpoint = True
for item in self.items:
endpoint = endpoint and item.endpoint
if endpoint is False:
break
return endpoint
def is_waitpoint(self):
# a waitpoint status is a status waiting for an event (be it user
# interaction or something else), but can also be an endpoint (where
# the user would wait, infinitely).
waitpoint = False
endpoint = True
if self.forced_endpoint:
endpoint = True
else:
for item in self.items:
endpoint = item.endpoint and endpoint
waitpoint = item.waitpoint or waitpoint
return bool(endpoint or waitpoint)
def get_status_manual_actions(self):
actions = []
status_id = self.id
class StatusAction:
def __init__(self, action):
self.id = 'st-%s' % action.identifier
self.status_id = status_id
self.action_id = action.identifier
self.name = action.get_label()
self.status_action = True
self.require_confirmation = action.require_confirmation
self.action = action
for action in self.items or []:
if not isinstance(action, ChoiceWorkflowStatusItem):
continue
if not action.identifier:
continue
roles = action.by or []
functions = [x for x in roles if x in (self.parent.roles or [])]
roles = [x for x in roles if x not in (self.parent.roles or [])]
if functions or roles:
actions.append({'action': StatusAction(action), 'roles': roles, 'functions': functions})
return actions
def get_contrast_color(self):
colour = self.colour or 'ffffff'
return misc.get_foreground_colour(colour)
def __getstate__(self):
odict = self.__dict__.copy()
if 'parent' in odict:
del odict['parent']
return odict
def export_to_xml(self, charset, include_id=False):
status = ET.Element('status')
ET.SubElement(status, 'id').text = force_text(self.id, charset)
ET.SubElement(status, 'name').text = force_text(self.name, charset)
ET.SubElement(status, 'colour').text = force_text(self.colour, charset)
if self.extra_css_class:
ET.SubElement(status, 'extra_css_class').text = force_text(self.extra_css_class, charset)
if self.forced_endpoint:
ET.SubElement(status, 'forced_endpoint').text = 'true'
if self.backoffice_info_text:
ET.SubElement(status, 'backoffice_info_text').text = force_text(
self.backoffice_info_text, charset
)
visibility_node = ET.SubElement(status, 'visibility')
for role in self.visibility or []:
ET.SubElement(visibility_node, 'role').text = str(role)
items = ET.SubElement(status, 'items')
for item in self.items:
items.append(item.export_to_xml(charset=charset, include_id=include_id))
return status
def init_with_xml(self, elem, charset, include_id=False, snapshot=False, check_datasources=True):
self.id = xml_node_text(elem.find('id'))
self.name = xml_node_text(elem.find('name'))
if elem.find('colour') is not None:
self.colour = xml_node_text(elem.find('colour'))
if elem.find('extra_css_class') is not None:
self.extra_css_class = xml_node_text(elem.find('extra_css_class'))
if elem.find('forced_endpoint') is not None:
self.forced_endpoint = elem.find('forced_endpoint').text == 'true'
if elem.find('backoffice_info_text') is not None:
self.backoffice_info_text = xml_node_text(elem.find('backoffice_info_text'))
self.visibility = []
for visibility_role in elem.findall('visibility/role'):
self.visibility.append(visibility_role.text)
self.items = []
for item in elem.find('items'):
item_type = item.attrib['type']
self.append_item(item_type)
item_o = self.items[-1]
item_o.parent = self
item_o.init_with_xml(
item, charset, include_id=include_id, snapshot=snapshot, check_datasources=check_datasources
)
def __repr__(self):
return '<%s %s %r>' % (self.__class__.__name__, self.id, self.name)
def noop_mark(func):
# mark method as not executing anything
func.noop = True
return func
class WorkflowStatusItem(XmlSerialisable):
node_name = 'item'
description = 'XX'
category = None # (key, label)
id = None
condition = None
endpoint = True # means it's not possible to interact, and/or cause a status change
waitpoint = False # means it's possible to wait (user interaction, or other event)
ok_in_global_action = True # means it can be used in a global action
directory_name = None
directory_class = None
support_substitution_variables = False
@classmethod
def init(cls):
pass
@classmethod
def is_available(cls, workflow=None):
return True
def migrate(self):
changed = False
for roles_attribute in ('to', 'by'):
attribute_value = getattr(self, roles_attribute, []) or []
if any(x for x in attribute_value if isinstance(x, int)):
setattr(self, roles_attribute, [str(x) for x in attribute_value])
changed = True
return changed
def render_as_line(self):
label = self.description
details = self.get_line_details()
if details:
label += ' (%s)' % details
if self.condition and self.condition.get('value'):
label += ' (%s)' % _('conditional')
return label
def get_line_details(self):
return ''
def render_list_of_roles(self, roles):
return self.parent.parent.render_list_of_roles(roles)
def get_list_of_roles(self, include_logged_in_users=True):
return self.parent.parent.get_list_of_roles(include_logged_in_users=include_logged_in_users)
def get_add_role_label(self):
return self.parent.parent.get_add_role_label()
@noop_mark
def perform(self, formdata):
pass
def fill_form(self, form, formdata, user, **kwargs):
pass
def evaluate_live_form(self, form, formdata, user):
pass
def submit_form(self, form, formdata, user, evo):
pass
def check_auth(self, formdata, user):
if not hasattr(self, 'by'):
return True
for role in self.by or []:
if user and role == logged_users_role().id:
return True
if role == '_submitter':
t = formdata.is_submitter(user)
if t is True:
return True
continue
if not user:
continue
if formdata.get_function_roles(role).intersection(user.get_roles()):
return True
return False
def check_condition(self, formdata, record_errors=True, log_errors=False):
context = {'formdata': formdata, 'status_item': self}
try:
return Condition(
self.condition, context, record_errors=record_errors, log_errors=log_errors
).evaluate()
except RuntimeError:
return False
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
if 'condition' in parameters:
form.add(
ConditionWidget,
'%scondition' % prefix,
title=_('Condition of execution of the action'),
value=self.condition,
size=40,
advanced=not (self.condition),
)
if 'attachments' in parameters:
attachments_options, attachments = self.get_attachments_options()
if len(attachments_options) > 1:
form.add(
WidgetList,
'%sattachments' % prefix,
title=_('Attachments'),
element_type=SingleSelectWidgetWithOther,
value=attachments,
add_element_label=_('Add attachment'),
element_kwargs={'render_br': False, 'options': attachments_options},
)
else:
form.add(
WidgetList,
'%sattachments' % prefix,
title=_('Attachments (templates or Python expressions)'),
element_type=StringWidget,
value=attachments,
add_element_label=_('Add attachment'),
element_kwargs={'render_br': False, 'size': 50},
advanced=not (bool(attachments)),
)
def get_parameters(self):
return ('condition',)
def get_parameters_view(self):
r = TemplateIO(html=True)
form = Form()
parameters = [x for x in self.get_parameters() if getattr(self, x, None) is not None]
for parameter in parameters:
self.add_parameters_widgets(form, [parameter])
r += htmltext('<ul>')
for parameter in parameters:
widget = form.get_widget(parameter)
if not widget:
continue
r += htmltext('<li>')
r += htmltext('<span class="parameter">%s</span> ') % _('%s:') % widget.get_title()
r += self.get_parameter_view_value(widget, parameter)
r += htmltext('</li>')
r += htmltext('</ul>')
return r.getvalue()
def get_backoffice_info_text_parameter_view_value(self):
return htmltext('<pre>%s</pre>') % self.backoffice_info_text
def get_by_parameter_view_value(self):
return self.render_list_of_roles(self.by)
def get_timeout_parameter_view_value(self):
try:
return seconds2humanduration(int(self.timeout or 0))
except ValueError:
return self.timeout # probably an expression
def get_status_parameter_view_value(self):
for status in self.parent.parent.possible_status:
if status.id == self.status:
return htmltext('<a href="#status-%s">%s</a>') % (status.id, status.name)
return _('Unknown (%s)') % self.status
def get_parameter_view_value(self, widget, parameter):
if hasattr(self, 'get_%s_parameter_view_value' % parameter):
return getattr(self, 'get_%s_parameter_view_value' % parameter)()
value = getattr(self, parameter)
if isinstance(value, bool):
return _('Yes') if value else _('No')
elif hasattr(widget, 'options') and value:
for option in widget.options:
if isinstance(option, tuple):
if option[0] == value:
return option[1]
else:
if option == value:
return option
return '-'
else:
return str(value)
def fill_admin_form(self, form):
for parameter in self.get_parameters():
self.add_parameters_widgets(form, [parameter])
def submit_admin_form(self, form):
for f in self.get_parameters():
widget = form.get_widget(f)
if widget:
if hasattr(self, 'clean_%s' % f):
has_error = getattr(self, 'clean_%s' % f)(form)
if has_error:
continue
value = widget.parse()
if hasattr(self, '%s_parse' % f):
value = getattr(self, '%s_parse' % f)(value)
setattr(self, f, value)
@classmethod
def get_expression(cls, var, allow_python=True):
if not var:
expression_type = 'text'
expression_value = ''
elif var.startswith('=') and allow_python:
expression_type = 'python'
expression_value = var[1:]
elif '{{' in var or '{%' in var or '[' in var:
expression_type = 'template'
expression_value = var
else:
expression_type = 'text'
expression_value = var
return {'type': expression_type, 'value': expression_value}
@classmethod
def compute(
cls,
var,
render=True,
raises=False,
record_errors=True,
allow_complex=False,
context=None,
formdata=None,
status_item=None,
):
if not isinstance(var, str):
return var
expression = cls.get_expression(var)
if expression['type'] != 'python' and not render:
return var
if expression['type'] == 'text':
return expression['value']
vars = get_publisher().substitutions.get_context_variables(
'lazy' if expression['type'] == 'template' else None
)
vars.update(context or {})
def log_exception(exception):
if expression['type'] == 'template':
summary = _('Failed to compute template')
else:
summary = _('Failed to compute Python expression')
get_publisher().record_error(
summary,
formdata=formdata,
status_item=status_item,
expression=expression['value'],
expression_type=expression['type'],
exception=exception,
)
if expression['type'] == 'template':
vars['allow_complex'] = allow_complex
try:
return Template(expression['value'], raises=raises, autoescape=False).render(vars)
except TemplateError as e:
if record_errors:
log_exception(e)
if raises:
raise
return var
try:
# noqa pylint: disable=eval-used
return eval(expression['value'], get_publisher().get_global_eval_dict(), vars)
except Exception as e:
if record_errors:
log_exception(e)
if raises:
raise
return var
def get_computed_role_id(self, role_id):
new_role_id = self.compute(str(role_id))
if get_publisher().role_class.get(new_role_id, ignore_errors=True):
return new_role_id
# computed value, not an id, try to get role by slug
new_role = get_publisher().role_class.get_on_index(new_role_id, 'slug', ignore_errors=True)
if new_role:
return new_role.id
# fallback to role label
for role in get_publisher().role_class.select():
if role.name == new_role_id:
return role.id
return None
def get_substitution_variables(self, formdata):
return {}
def get_target_status_url(self):
if not getattr(self, 'status', None) or self.status == '_previous':
return None
targets = [x for x in self.parent.parent.possible_status if x.id == self.status]
if not targets:
return None
return targets[0].get_admin_url()
def get_target_status(self, formdata=None):
"""Returns a list of status this item can lead to."""
if not getattr(self, 'status', None):
return []
if self.status == '_previous':
if formdata is None:
# must be in a formdata to compute destination, just give a
# fake status for presentation purpose
return [WorkflowStatus(_('Previously Marked Status'))]
previous_status = formdata.pop_previous_marked_status()
if previous_status:
return [previous_status]
return []
targets = [x for x in self.parent.parent.possible_status if x.id == self.status]
if not targets and formdata: # do not log in presentation context: formdata is needed
message = _(
'reference to invalid status %(target)s in status %(status)s, ' 'action %(status_item)s'
) % {'target': self.status, 'status': self.parent.name, 'status_item': self.description}
get_publisher().record_error(message, formdata=formdata, status_item=self)
return targets
def get_jump_label(self, target_id):
'''Return the label to use on a workflow graph arrow'''
if getattr(self, 'label', None):
label = self.label
if getattr(self, 'by', None):
roles = self.parent.parent.render_list_of_roles(self.by)
label += ' %s %s' % (_('by'), roles)
if getattr(self, 'status', None) == '_previous':
label += ' ' + str(_('(to last marker)'))
if getattr(self, 'set_marker_on_status', False):
label += ' ' + str(_('(and set marker)'))
if getattr(self, 'condition', None):
label += ' ' + str(_('(conditional)'))
else:
label = self.render_as_line()
return label
def get_backoffice_filefield_options(self):
options = []
for field in self.parent.parent.get_backoffice_fields():
if field.key == 'file':
options.append((field.id, field.label, field.id))
return options
def store_in_backoffice_filefield(
self, formdata, backoffice_filefield_id, filename, content_type, content
):
filefield = [
x
for x in self.parent.parent.get_backoffice_fields()
if x.id == backoffice_filefield_id and x.key == 'file'
]
if filefield:
upload = PicklableUpload(filename, content_type)
upload.receive([content])
formdata.data[backoffice_filefield_id] = upload
formdata.store()
def by_export_to_xml(self, item, charset, include_id=False):
self._roles_export_to_xml('by', item, charset, include_id=include_id)
def by_init_with_xml(self, elem, charset, include_id=False, snapshot=False):
self._roles_init_with_xml('by', elem, charset, include_id=include_id, snapshot=snapshot)
def to_export_to_xml(self, item, charset, include_id=False):
self._roles_export_to_xml('to', item, charset, include_id=include_id)
def to_init_with_xml(self, elem, charset, include_id=False, snapshot=False):
self._roles_init_with_xml('to', elem, charset, include_id, snapshot=snapshot)
def condition_init_with_xml(self, node, charset, include_id=False, snapshot=False):
self.condition = None
if node is None:
return
if node.findall('type'):
self.condition = {
'type': xml_node_text(node.find('type')),
'value': xml_node_text(node.find('value')),
}
elif node.text:
# backward compatibility
self.condition = {'type': 'python', 'value': xml_node_text(node)}
def q_admin_lookup(self, workflow, status, component, html_top):
return None
def __getstate__(self):
odict = self.__dict__.copy()
if 'parent' in odict:
del odict['parent']
return odict
def mail_template_init_with_xml(self, elem, charset, include_id=False, snapshot=False):
if elem is None:
self.mail_template = None
return
value = xml_node_text(elem)
mail_template = MailTemplate.get_by_slug(value)
if not mail_template:
raise WorkflowImportError(_('Unknown referenced mail template (%s)'), (value,))
self.mail_template = value
return
def attachments_init_with_xml(self, elem, charset, include_id=False, snapshot=False):
if elem is None:
self.attachments = None
else:
self.attachments = [xml_node_text(item) for item in elem.findall('attachment')]
def get_attachments_options(self):
attachments_options = [(None, '---', None)]
varnameless = []
for field in self.parent.parent.get_backoffice_fields():
if field.key != 'file':
continue
if field.varname:
codename = 'form_var_%s_raw' % field.varname
else:
codename = 'form_f%s' % field.id.replace('-', '_') # = form_fbo<...>
varnameless.append(codename)
attachments_options.append((codename, field.label, codename))
# filter: do not consider removed fields without varname
attachments = [
attachment
for attachment in self.attachments or []
if ((not attachment.startswith('form_fbo')) or (attachment in varnameless))
]
return attachments_options, attachments
def convert_attachments_to_uploads(self, extra_attachments=None):
uploads = []
attachments = []
attachments.extend(self.attachments or [])
attachments.extend(extra_attachments or [])
# 1. attachments defined as templates
with get_publisher().complex_data():
for attachment in attachments[:]:
if '{%' not in attachment and '{{' not in attachment:
continue
attachments.remove(attachment)
try:
attachment = WorkflowStatusItem.compute(attachment, allow_complex=True, raises=True)
except Exception as e:
get_publisher().record_error(exception=e, context='[workflow/attachments]', notify=True)
else:
if attachment:
complex_value = get_publisher().get_cached_complex_data(attachment)
if complex_value:
uploads.append(complex_value)
# 2. python expressions
if attachments:
global_eval_dict = get_publisher().get_global_eval_dict()
local_eval_dict = get_publisher().substitutions.get_context_variables()
for attachment in attachments:
if attachment.startswith('form_fbo') and '-' in attachment:
# detect varname-less backoffice fields that were set
# before #33366 was fixed, and fix them.
attachment = attachment.replace('-', '_')
try:
# execute any Python expression
# and magically convert string like 'form_var_*_raw' to a PicklableUpload
# noqa pylint: disable=eval-used
picklableupload = eval(attachment, global_eval_dict, local_eval_dict)
except Exception as e:
get_publisher().record_error(exception=e, context='[workflow/attachments]', notify=True)
continue
if not picklableupload:
continue
uploads.append(picklableupload)
# 3. convert any value to a PicklableUpload; this allows for
# dicts like those provided by qommon/evalutils:attachment()
for upload in uploads:
if not isinstance(upload, PicklableUpload):
try:
upload = FileField.convert_value_from_anything(upload)
except ValueError as e:
get_publisher().record_error(exception=e, context='[workflow/attachments]', notify=True)
continue
yield upload
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, self.id)
class WorkflowStatusJumpItem(WorkflowStatusItem):
status = None
endpoint = False
set_marker_on_status = False
category = 'status-change'
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
if 'status' in parameters:
destinations = [(x.id, x.name) for x in self.parent.parent.possible_status]
# look for existing jumps that are dropping a mark
workflow = self.parent.parent
statuses = getattr(workflow, 'possible_status') or []
global_actions = getattr(workflow, 'global_actions') or []
for status in statuses + global_actions:
for item in status.items:
if getattr(item, 'set_marker_on_status', False):
destinations.append(('_previous', _('Previously Marked Status')))
break
else:
continue
break
form.add(
SingleSelectWidget,
'%sstatus' % prefix,
title=_('Status'),
value=self.status,
options=[(None, '---')] + destinations,
)
if 'set_marker_on_status' in parameters:
form.add(
CheckboxWidget,
'%sset_marker_on_status' % prefix,
title=_('Set marker to jump back to current status'),
value=self.set_marker_on_status,
advanced=not (self.set_marker_on_status),
)
def handle_markers_stack(self, formdata):
if self.set_marker_on_status:
if formdata.workflow_data and '_markers_stack' in formdata.workflow_data:
markers_stack = formdata.workflow_data.get('_markers_stack')
else:
markers_stack = []
markers_stack.append({'status_id': formdata.status[3:]})
formdata.update_workflow_data({'_markers_stack': markers_stack})
def get_parameters(self):
return ('status', 'set_marker_on_status', 'condition')
def get_role_translation_label(workflow, role_id):
if role_id == logged_users_role().id:
return logged_users_role().name
if role_id == '_submitter':
return C_('role|User')
if str(role_id).startswith('_'):
return workflow.roles.get(role_id)
else:
try:
return get_publisher().role_class.get(role_id).name
except KeyError:
return
def get_role_name(role_id, charset=None):
role_id = str(role_id)
if role_id.startswith('_') or role_id == 'logged-users':
return force_text(role_id, charset)
try:
return force_text(get_publisher().role_class.get(role_id).name, charset)
except KeyError:
return force_text(role_id, charset)
def render_list_of_roles(workflow, roles):
t = []
for r in roles:
role_label = get_role_translation_label(workflow, r)
if role_label:
t.append(role_label)
return ', '.join([str(x) for x in t])
item_classes = []
def register_item_class(klass):
if klass.key not in [x.key for x in item_classes]:
item_classes.append(klass)
klass.init()
class CommentableWorkflowStatusItem(WorkflowStatusItem):
description = _('Comment')
key = 'commentable'
category = 'interaction'
endpoint = False
waitpoint = True
ok_in_global_action = False
required = False
varname = None
label = None
button_label = 0 # hack to handle legacy commentable items
hint = None
by = []
backoffice_info_text = None
def get_line_details(self):
if self.by:
return _('by %s') % self.render_list_of_roles(self.by)
else:
return _('not completed')
def fill_form(self, form, formdata, user, **kwargs):
if 'comment' not in [x.name for x in form.widgets]:
if self.label is None:
title = _('Comment')
else:
title = self.label
form.add(
TextWidget, 'comment', title=title, required=self.required, cols=40, rows=10, hint=self.hint
)
form.get_widget('comment').attrs['class'] = 'comment'
if self.button_label == 0:
form.add_submit('button%s' % self.id, _('Add Comment'))
elif self.button_label:
form.add_submit('button%s' % self.id, self.button_label)
if form.get_widget('button%s' % self.id):
form.get_widget('button%s' % self.id).backoffice_info_text = self.backoffice_info_text
def submit_form(self, form, formdata, user, evo):
if form.get_widget('comment'):
evo.comment = form.get_widget('comment').parse()
if self.varname:
workflow_data = {'comment_%s' % self.varname: evo.comment}
formdata.update_workflow_data(workflow_data)
def submit_admin_form(self, form):
for f in self.get_parameters():
widget = form.get_widget(f)
setattr(self, f, widget.parse())
def fill_admin_form(self, form):
if self.by and not isinstance(self.by, list):
self.by = None
return super().fill_admin_form(form)
def get_parameters(self):
return (
'label',
'button_label',
'hint',
'by',
'varname',
'backoffice_info_text',
'required',
'condition',
)
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
if 'label' in parameters:
if self.label is None:
self.label = str(_('Comment'))
form.add(StringWidget, '%slabel' % prefix, size=40, title=_('Label'), value=self.label)
if 'button_label' in parameters:
if self.button_label == 0:
self.button_label = str(_('Add Comment'))
form.add(
StringWidget,
'%sbutton_label' % prefix,
title=_('Button Label'),
hint=_('(empty to disable the button)'),
value=self.button_label,
)
if 'hint' in parameters:
form.add(StringWidget, '%shint' % prefix, size=40, title=_('Hint'), value=self.hint)
if 'by' in parameters:
form.add(
WidgetList,
'%sby' % prefix,
title=_('By'),
element_type=SingleSelectWidget,
value=self.by,
add_element_label=self.get_add_role_label(),
element_kwargs={
'render_br': False,
'options': [(None, '---', None)] + self.get_list_of_roles(),
},
)
if 'varname' in parameters:
form.add(
VarnameWidget,
'%svarname' % prefix,
title=_('Identifier'),
value=self.varname,
hint=_('This will make the comment available in a variable named comment_ + identifier.'),
)
if 'required' in parameters:
form.add(CheckboxWidget, '%srequired' % prefix, title=_('Required'), value=self.required)
if 'backoffice_info_text' in parameters:
form.add(
WysiwygTextWidget,
'%sbackoffice_info_text' % prefix,
title=_('Information Text for Backoffice'),
value=self.backoffice_info_text,
)
def button_label_export_to_xml(self, xml_item, charset, include_id=False):
if self.button_label == 0:
pass
elif self.button_label is None:
# button_label being None is a special case meaning "no button", it
# should be handled differently than the "not filled" case
el = ET.SubElement(xml_item, 'button_label')
else:
el = ET.SubElement(xml_item, 'button_label')
el.text = self.button_label
def button_label_init_with_xml(self, element, charset, include_id=False, snapshot=False):
if element is None:
return
# this can be None if element is self-closing, <button_label />, which
# then maps to None, meaning "no button".
self.button_label = xml_node_text(element)
register_item_class(CommentableWorkflowStatusItem)
class ChoiceWorkflowStatusItem(WorkflowStatusJumpItem):
description = _('Manual Jump')
key = 'choice'
endpoint = False
waitpoint = True
ok_in_global_action = False
label = None
identifier = None
by = []
backoffice_info_text = None
require_confirmation = False
ignore_form_errors = False
def get_label(self):
expression = self.get_expression(self.label, allow_python=False)
if expression['type'] == 'text':
return expression['value']
return _('computed label')
def get_line_details(self):
to_status = None
if self.status == '_previous':
to_status = WorkflowStatus(_('previously marked status'))
elif self.status:
try:
to_status = self.parent.parent.get_status(self.status)
except KeyError:
return _('broken, missing destination status')
if self.label and to_status:
more = ''
if self.set_marker_on_status:
more += ' ' + str(_('(and set marker)'))
if self.by:
return _('"%(label)s", to %(to)s, by %(by)s%(more)s') % {
'label': self.get_label(),
'to': to_status.name,
'by': self.render_list_of_roles(self.by),
'more': more,
}
else:
return _('"%(label)s", to %(to)s%(more)s') % {
'label': self.get_label(),
'to': to_status.name,
'more': more,
}
else:
return _('not completed')
def fill_form(self, form, formdata, user, **kwargs):
label = self.compute(self.label)
if not label:
return
widget = form.add_submit('button%s' % self.id, label)
if self.identifier:
widget.extra_css_class = 'button-%s' % self.identifier
if self.require_confirmation:
get_response().add_javascript(['jquery.js', '../../i18n.js', 'qommon.js'])
widget.attrs = {'data-ask-for-confirmation': 'true'}
widget.backoffice_info_text = self.backoffice_info_text
widget.ignore_form_errors = self.ignore_form_errors
if self.ignore_form_errors:
widget.attrs['formnovalidate'] = 'formnovalidate'
def submit_form(self, form, formdata, user, evo):
if form.get_submit() == 'button%s' % self.id:
wf_status = self.get_target_status(formdata)
if wf_status:
evo.status = 'wf-%s' % wf_status[0].id
self.handle_markers_stack(formdata)
form.clear_errors()
return True # get out of processing loop
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
if 'label' in parameters:
form.add(
ComputedExpressionWidget,
'%slabel' % prefix,
title=_('Label'),
value=self.label,
allow_python=False,
)
if 'by' in parameters:
form.add(
WidgetList,
'%sby' % prefix,
title=_('By'),
element_type=SingleSelectWidget,
value=self.by,
add_element_label=self.get_add_role_label(),
element_kwargs={
'render_br': False,
'options': [(None, '---', None)] + self.get_list_of_roles(),
},
)
if 'require_confirmation' in parameters:
form.add(
CheckboxWidget,
'%srequire_confirmation' % prefix,
title=_('Require confirmation'),
value=self.require_confirmation,
)
if 'backoffice_info_text' in parameters:
form.add(
WysiwygTextWidget,
'%sbackoffice_info_text' % prefix,
title=_('Information Text for Backoffice'),
value=self.backoffice_info_text,
)
if 'identifier' in parameters:
form.add(
VarnameWidget,
'%sidentifier' % prefix,
title=_('Identifier'),
value=self.identifier,
advanced=True,
)
if 'ignore_form_errors' in parameters:
form.add(
CheckboxWidget,
'%signore_form_errors' % prefix,
title=_('Ignore form errors'),
value=self.ignore_form_errors,
advanced=True,
)
def get_parameters(self):
return (
'label',
'by',
'status',
'require_confirmation',
'backoffice_info_text',
'ignore_form_errors',
'set_marker_on_status',
'condition',
'identifier',
)
register_item_class(ChoiceWorkflowStatusItem)
class JumpOnSubmitWorkflowStatusItem(WorkflowStatusJumpItem):
description = _('On Submit Jump')
key = 'jumponsubmit'
ok_in_global_action = False
def get_line_details(self):
if self.status:
if self.get_target_status():
return _('to %s') % self.get_target_status()[0].name
else:
return _('broken')
else:
return _('not completed')
def submit_form(self, form, formdata, user, evo):
if form.is_submitted() and not form.has_errors():
wf_status = self.get_target_status(formdata)
if wf_status:
evo.status = 'wf-%s' % wf_status[0].id
self.handle_markers_stack(formdata)
def get_parameters(self):
return ('status', 'set_marker_on_status', 'condition')
register_item_class(JumpOnSubmitWorkflowStatusItem)
class SendmailWorkflowStatusItem(WorkflowStatusItem):
description = _('Email')
key = 'sendmail'
category = 'interaction'
support_substitution_variables = True
to = []
subject = None
mail_template = None
body = None
custom_from = None
attachments = None
comment = None
def _get_role_id_from_xml(self, elem, charset, include_id=False):
# override to allow for destination set with computed values.
if elem is None:
return None
value = xml_node_text(elem)
if self.get_expression(value)['type'] != 'text' or '@' in value:
return value
return super()._get_role_id_from_xml(elem, charset, include_id=include_id)
def render_list_of_roles_or_emails(self, roles):
t = []
for r in roles:
expression = self.get_expression(r)
if expression['type'] in ('python', 'template'):
t.append(_('computed value'))
elif '@' in expression['value']:
t.append(expression['value'])
else:
role_label = get_role_translation_label(self.parent.parent, r)
if role_label:
t.append(role_label)
return ', '.join([str(x) for x in t])
def get_to_parameter_view_value(self):
return self.render_list_of_roles_or_emails(self.to)
def get_line_details(self):
if self.to:
return _('to %s') % self.render_list_of_roles_or_emails(self.to)
else:
return _('not completed')
def get_parameters(self):
parameters = ('to', 'mail_template', 'subject', 'body', 'attachments', 'custom_from', 'condition')
if (
not get_publisher().has_site_option('include-sendmail-custom-from-option')
and not self.custom_from
):
parameters = tuple(x for x in parameters if x != 'custom_from')
return parameters
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
subject_body_attrs = {}
if 'subject' in parameters or 'body' in parameters:
if MailTemplate.count():
subject_body_attrs = {
'data-dynamic-display-value': '',
'data-dynamic-display-child-of': '%smail_template' % prefix,
}
if 'to' in parameters:
form.add(
WidgetList,
'%sto' % prefix,
title=_('To'),
element_type=SingleSelectWidgetWithOther,
value=self.to,
add_element_label=self.get_add_role_label(),
element_kwargs={
'render_br': False,
'options': [(None, '---', None)] + self.get_list_of_roles(include_logged_in_users=False),
},
)
if 'subject' in parameters:
form.add(
StringWidget,
'%ssubject' % prefix,
title=_('Subject'),
validation_function=ComputedExpressionWidget.validate_template,
value=self.subject,
size=40,
attrs=subject_body_attrs,
)
if 'mail_template' in parameters and MailTemplate.count():
form.add(
SingleSelectWidget,
'%smail_template' % prefix,
title=_('Mail Template'),
value=self.mail_template,
options=[(None, '', '')] + MailTemplate.get_as_options_list(),
attrs={'data-dynamic-display-parent': 'true'},
)
if 'body' in parameters:
form.add(
TextWidget,
'%sbody' % prefix,
title=_('Body'),
value=self.body,
cols=80,
rows=10,
validation_function=ComputedExpressionWidget.validate_template,
attrs=subject_body_attrs,
)
if 'custom_from' in parameters:
form.add(
ComputedExpressionWidget,
'%scustom_from' % prefix,
title=_('Custom From Address'),
value=self.custom_from,
advanced=not (bool(self.custom_from)),
)
def get_body_parameter_view_value(self):
return htmltext('<pre class="wrapping-pre">%s</pre>') % self.body
def perform(self, formdata):
if not self.to:
return
if not (self.subject and self.body) and not self.mail_template:
return
body = self.body
subject = self.subject
extra_attachments = None
if self.mail_template:
mail_template = MailTemplate.get_by_slug(self.mail_template)
if mail_template:
body = mail_template.body
subject = mail_template.subject
extra_attachments = mail_template.attachments
else:
message = _('reference to invalid mail template %(mail_template)s in status %(status)s') % {
'status': self.parent.name,
'mail_template': self.mail_template,
}
get_publisher().record_error(message, formdata=formdata, status_item=self)
return
try:
mail_body = template_on_formdata(
formdata, self.compute(body, render=False), autoescape=body.startswith('<')
)
except TemplateError as e:
get_publisher().record_error(
_('Error in body template, mail could not be generated'), formdata=formdata, exception=e
)
return
try:
mail_subject = template_on_formdata(
formdata, self.compute(subject, render=False), autoescape=False
)
except TemplateError as e:
get_publisher().record_error(
_('Error in subject template, mail could not be generated'), formdata=formdata, exception=e
)
return
# this works around the fact that parametric workflows only support
# string values, so if we get set a string, we convert it here to an
# array.
if isinstance(self.to, str):
self.to = [self.to]
addresses = []
for dest in self.to:
try:
dest = self.compute(dest, raises=True)
except Exception:
continue
if not dest:
continue
if isinstance(dest, list):
addresses.extend(dest)
continue
if isinstance(dest, str) and ',' in dest:
# if the email contains a comma consider it as a serie of
# emails
addresses.extend([x.strip() for x in dest.split(',')])
continue
if '@' in str(dest):
addresses.append(dest)
continue
if dest == '_submitter':
submitter_email = formdata.formdef.get_submitter_email(formdata)
if submitter_email:
addresses.append(submitter_email)
continue
for real_dest in formdata.get_function_roles(dest):
if real_dest.startswith('_user:'):
try:
user = get_publisher().user_class.get(real_dest.split(':')[1])
except KeyError:
continue
if user.email:
addresses.append(user.email)
continue
try:
role = get_publisher().role_class.get(real_dest)
except KeyError:
continue
addresses.extend(role.get_emails())
if not addresses:
return
email_from = None
if self.custom_from:
email_from = self.compute(self.custom_from)
attachments = self.convert_attachments_to_uploads(extra_attachments)
if len(addresses) > 1:
emails.email(
mail_subject,
mail_body,
email_rcpt=None,
bcc=addresses,
email_from=email_from,
attachments=attachments,
fire_and_forget=True,
)
else:
emails.email(
mail_subject,
mail_body,
email_rcpt=addresses,
email_from=email_from,
attachments=attachments,
fire_and_forget=True,
)
register_item_class(SendmailWorkflowStatusItem)
def get_formdata_template_context(formdata=None):
ctx = get_publisher().substitutions.get_context_variables('lazy')
if formdata:
ctx['url'] = formdata.get_url()
ctx['url_status'] = '%sstatus' % formdata.get_url()
ctx['details'] = formdata.formdef.get_detailed_email_form(formdata, ctx['url'])
ctx['name'] = formdata.formdef.name
ctx['number'] = formdata.id
if formdata.evolution and formdata.evolution[-1].comment:
ctx['comment'] = formdata.evolution[-1].comment
else:
ctx['comment'] = ''
ctx.update(formdata.get_as_dict())
# compatibility vars
ctx['before'] = ctx.get('form_previous_status')
ctx['after'] = ctx.get('form_status')
ctx['evolution'] = ctx.get('form_evolution')
return ctx
def template_on_html_string(template):
return template_on_formdata(None, template, ezt_format=ezt.FORMAT_HTML)
def template_on_formdata(formdata=None, template=None, **kwargs):
assert template is not None
if not Template.is_template_string(template):
# no tags, no variables: don't even process formdata
return template
context = get_formdata_template_context(formdata)
return template_on_context(context, template, **kwargs)
def template_on_context(context=None, template=None, **kwargs):
assert template is not None
if not Template.is_template_string(template):
return template
return Template(template, **kwargs).render(context)
class SendSMSWorkflowStatusItem(WorkflowStatusItem):
description = _('SMS')
key = 'sendsms'
category = 'interaction'
support_substitution_variables = True
to = []
body = None
# don't use roles (de)serializer for "to" field
to_export_to_xml = None
to_init_with_xml = None
@classmethod
def is_available(cls, workflow=None):
sms_cfg = get_cfg('sms', {})
return bool(sms_cfg.get('sender') and sms_cfg.get('passerelle_url'))
def get_parameters(self):
return ('to', 'body', 'condition')
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
if 'to' in parameters:
form.add(
WidgetList,
'%sto' % prefix,
title=_('To'),
element_type=ComputedExpressionWidget,
value=self.to,
add_element_label=_('Add Number'),
element_kwargs={'render_br': False},
)
if 'body' in parameters:
form.add(TextWidget, '%sbody' % prefix, title=_('Body'), value=self.body, cols=80, rows=10)
def perform(self, formdata):
if not self.is_available():
return
if not self.to:
return
if not self.body:
return
destinations = [self.compute(x) for x in self.to]
destinations = [x for x in destinations if x] # ignore empty elements
if not destinations:
return
try:
sms_body = template_on_formdata(formdata, self.compute(self.body, render=False))
except TemplateError as e:
get_publisher().record_error(
_('Error in template, sms could not be generated'), formdata=formdata, exception=e
)
return
from .qommon import sms
sms_cfg = get_cfg('sms', {})
sender = sms_cfg.get('sender', 'AuQuotidien')[:11]
try:
sms.SMS.get_sms_class().send(sender, destinations, sms_body)
except errors.SMSError as e:
get_publisher().record_error(_('Could not send SMS'), formdata=formdata, exception=e)
register_item_class(SendSMSWorkflowStatusItem)
class DisplayMessageWorkflowStatusItem(WorkflowStatusItem):
description = _('Alert')
key = 'displaymsg'
category = 'interaction'
support_substitution_variables = True
ok_in_global_action = False
to = None
position = 'top'
level = None
message = None
def get_line_details(self):
parts = []
if self.position == 'top':
parts.append(_('top of page'))
elif self.position == 'bottom':
parts.append(_('bottom of page'))
elif self.position == 'actions':
parts.append(_('with actions'))
if self.to:
parts.append(_('for %s') % self.render_list_of_roles(self.to))
return ', '.join([str(x) for x in parts])
def get_message(self, filled, position='top'):
if not (self.message and self.position == position and filled.is_for_current_user(self.to)):
return ''
dict = copy.copy(get_publisher().substitutions.get_context_variables('lazy'))
dict['date'] = misc.localstrftime(filled.receipt_time)
dict['number'] = filled.id
handling_role = filled.get_handling_role()
if handling_role and handling_role.details:
dict['receiver'] = handling_role.details.replace('\n', '<br />')
message = self.message
if self.level:
message = '<div class="%snotice">%s</div>' % (self.level, message)
return Template(message, ezt_format=ezt.FORMAT_HTML).render(dict)
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
if 'message' in parameters:
form.add(
TextWidget,
'%smessage' % prefix,
title=_('Message'),
value=self.message,
cols=80,
rows=10,
validation_function=ComputedExpressionWidget.validate_template,
)
if 'level' in parameters:
form.add(
SingleSelectWidget,
'%slevel' % prefix,
title=_('Level'),
value=self.level,
options=[
(None, ''),
('success', _('Success')),
('info', _('Information')),
('warning', _('Warning')),
('error', _('Error')),
],
)
if 'position' in parameters:
form.add(
SingleSelectWidget,
'%sposition' % prefix,
title=_('Position'),
value=self.position,
options=[
('top', _('Top of page')),
('bottom', _('Bottom of page')),
# ('actions', _('With actions')) "too complicated"
],
)
if 'to' in parameters:
form.add(
WidgetListOfRoles,
'%sto' % prefix,
title=_('To'),
value=self.to or [],
add_element_label=self.get_add_role_label(),
first_element_empty_label=_('Everybody'),
roles=self.get_list_of_roles(include_logged_in_users=False),
)
def get_parameters(self):
return ('message', 'level', 'position', 'to', 'condition')
def get_message_parameter_view_value(self):
if self.message.startswith('<'):
return htmltext(self.message)
return htmltext('<pre>%s</pre>') % self.message
register_item_class(DisplayMessageWorkflowStatusItem)
class RedirectToStatusWorkflowStatusItem(WorkflowStatusItem):
description = _('Status Page Redirection')
key = 'redirectstatus'
ok_in_global_action = False
backoffice = False
def perform(self, formdata):
return formdata.get_url(self.backoffice)
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
if 'backoffice' in parameters:
form.add(
CheckboxWidget,
'%sbackoffice' % prefix,
title=_('Redirect to backoffice page'),
value=self.backoffice,
)
def get_parameters(self):
return ('backoffice',)
# RedirectToStatusWorkflowStatusItem is not registered as the class kept for
# backward compatibility only and should not be exposed to the user. (#3031)
class EditableWorkflowStatusItem(WorkflowStatusItem):
description = _('Edition')
key = 'editable'
category = 'formdata-action'
endpoint = False
waitpoint = True
ok_in_global_action = False
by = []
status = None
label = None
backoffice_info_text = None
def get_line_details(self):
if self.by:
return _('by %s') % self.render_list_of_roles(self.by)
else:
return _('not completed')
def fill_form(self, form, formdata, user, **kwargs):
label = self.label
if not label:
label = _('Edit Form')
form.add_submit('button%s' % self.id, label)
form.get_widget('button%s' % self.id).backoffice_info_text = self.backoffice_info_text
def submit_form(self, form, formdata, user, evo):
if form.get_submit() == 'button%s' % self.id:
return formdata.get_url(backoffice=get_request().is_in_backoffice()) + 'wfedit-%s' % self.id
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
if 'by' in parameters:
form.add(
WidgetList,
'%sby' % prefix,
title=_('By'),
element_type=SingleSelectWidget,
value=self.by,
add_element_label=self.get_add_role_label(),
element_kwargs={
'render_br': False,
'options': [(None, '---', None)] + self.get_list_of_roles(),
},
)
if 'status' in parameters:
form.add(
SingleSelectWidget,
'%sstatus' % prefix,
title=_('Status After Edit'),
value=self.status,
hint=_("Don't select any if you don't want status change processing"),
options=[(None, '---')] + [(x.id, x.name) for x in self.parent.parent.possible_status],
)
if 'label' in parameters:
form.add(StringWidget, '%slabel' % prefix, title=_('Button Label'), value=self.label)
if 'backoffice_info_text' in parameters:
form.add(
WysiwygTextWidget,
'%sbackoffice_info_text' % prefix,
title=_('Information Text for Backoffice'),
value=self.backoffice_info_text,
)
def get_parameters(self):
return ('by', 'status', 'label', 'backoffice_info_text', 'condition')
register_item_class(EditableWorkflowStatusItem)
def load_extra():
from .wf import aggregation_email # noqa pylint: disable=unused-import
from .wf import anonymise # noqa pylint: disable=unused-import
from .wf import attachment # noqa pylint: disable=unused-import
from .wf import backoffice_fields # noqa pylint: disable=unused-import
from .wf import create_carddata # noqa pylint: disable=unused-import
from .wf import create_formdata # noqa pylint: disable=unused-import
from .wf import criticality # noqa pylint: disable=unused-import
from .wf import dispatch # noqa pylint: disable=unused-import
from .wf import edit_carddata # noqa pylint: disable=unused-import
from .wf import export_to_model # noqa pylint: disable=unused-import
from .wf import external_workflow # noqa pylint: disable=unused-import
from .wf import form # noqa pylint: disable=unused-import
from .wf import geolocate # noqa pylint: disable=unused-import
from .wf import jump # noqa pylint: disable=unused-import
from .wf import notification # noqa pylint: disable=unused-import
from .wf import profile # noqa pylint: disable=unused-import
from .wf import redirect_to_url # noqa pylint: disable=unused-import
from .wf import register_comment # noqa pylint: disable=unused-import
from .wf import remove # noqa pylint: disable=unused-import
from .wf import resubmit # noqa pylint: disable=unused-import
from .wf import roles # noqa pylint: disable=unused-import
from .wf import timeout_jump # noqa pylint: disable=unused-import
from .wf import wscall # noqa pylint: disable=unused-import
aggregation_email.register_cronjob()
jump.register_cronjob()
from .wf.export_to_model import ExportToModel # noqa pylint: disable=unused-import,wrong-import-position