wcs/wcs/workflows.py

3459 lines
127 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import base64
import collections
import copy
import datetime
import itertools
import os
import random
import time
import uuid
import xml.etree.ElementTree as ET
from django.utils.encoding import force_text
from quixote import get_publisher, get_request, get_response
from quixote.html import TemplateIO, htmltext
from .carddef import CardDef
from .conditions import Condition
from .fields import FileField
from .formdata import Evolution
from .formdef import FormDef, FormdefImportError
from .mail_templates import MailTemplate
from .qommon import _, emails, errors, ezt, force_str, get_cfg, get_logger, misc
from .qommon.form import (
CheckboxWidget,
ComputedExpressionWidget,
ConditionWidget,
Form,
SingleSelectWidget,
SingleSelectWidgetWithOther,
StringWidget,
TextWidget,
ValidatedStringWidget,
VarnameWidget,
WidgetList,
WysiwygTextWidget,
)
from .qommon.humantime import seconds2humanduration
from .qommon.misc import C_, file_digest, get_as_datetime, xml_node_text
from .qommon.storage import Contains, NotEqual, Null, StorableObject, atomic_write, pickle_2to3_conversion
from .qommon.template import Template, TemplateError
from .qommon.upload_storage import PicklableUpload, get_storage_object
from .roles import get_user_roles, logged_users_role
if not __name__.startswith('wcs.') and __name__ != "__main__":
raise ImportError('Import of workflows module must be absolute (import wcs.workflows)')
def lax_int(s):
try:
return int(s)
except (ValueError, TypeError):
return -1
def perform_items(items, formdata, depth=20, event=None):
if depth == 0: # prevents infinite loops
return
url = None
old_status = formdata.status
performed_actions = []
for item in items:
if getattr(item.perform, 'noop', False):
continue
if not item.check_condition(formdata):
continue
performed_actions.append((datetime.datetime.now(), item.key, item.id))
try:
url = item.perform(formdata) or url
except AbortActionException as e:
url = e.url or url
break
if formdata.status != old_status:
break
if performed_actions:
latest_evolution = formdata.evolution[-1] if formdata.evolution else None
if latest_evolution:
latest_evolution.add_part(ActionsTracingEvolutionPart(event, performed_actions))
if formdata.id:
# don't save formdata it has been removed
formdata.store()
if formdata.status != old_status:
if not formdata.evolution:
formdata.evolution = []
evo = Evolution()
evo.time = time.localtime()
evo.status = formdata.status
formdata.evolution.append(evo)
formdata.store()
# performs the items of the new status
wf_status = formdata.get_status()
url = perform_items(wf_status.items, formdata, depth=depth - 1, event='continuation') or url
if url:
# hack around webtest as it checks type(url) is str and
# this won't work on django safe strings (isinstance would work);
# adding '' makes sure we get a "real" str object.
url = url + ''
return url
class WorkflowImportError(Exception):
def __init__(self, msg, msg_args=None, details=None):
self.msg = msg
self.msg_args = msg_args or ()
self.details = details
class AbortActionException(Exception):
def __init__(self, url=None):
self.url = url
class RedisplayFormException(Exception):
pass
class AttachmentSubstitutionProxy:
def __init__(self, formdata, attachment_evolution_part):
self.formdata = formdata
self.attachment_evolution_part = attachment_evolution_part
@property
def filename(self):
return self.attachment_evolution_part.orig_filename
@property
def content_type(self):
return self.attachment_evolution_part.content_type
@property
def content(self):
fp = self.attachment_evolution_part.get_file_pointer()
if fp:
return fp.read()
return b''
@property
def b64_content(self):
return base64.b64encode(self.content)
@property
def url(self):
return '%sattachment?f=%s' % (
self.formdata.get_url(),
os.path.basename(self.attachment_evolution_part.filename),
)
class NamedAttachmentsSubstitutionProxy:
def __init__(self, formdata, parts):
self.formdata = formdata
self.parts = parts[:]
self.parts.reverse()
def __len__(self):
return len(self.parts)
def __getattr__(self, name):
return getattr(self[0], name)
def __getitem__(self, i):
return AttachmentSubstitutionProxy(self.formdata, self.parts[i])
class AttachmentsSubstitutionProxy:
def __init__(self, formdata):
self.formdata = formdata
def __getattr__(self, name):
if name.startswith('__'):
raise AttributeError(name)
def has_varname_attachment(part):
return isinstance(part, AttachmentEvolutionPart) and getattr(part, 'varname', None) == name
parts = [part for part in self.formdata.iter_evolution_parts() if has_varname_attachment(part)]
if parts:
return NamedAttachmentsSubstitutionProxy(self.formdata, parts)
raise AttributeError(name)
class EvolutionPart:
to = None
is_hidden = None
view = None
def render_for_fts(self):
if not self.view or self.to:
# don't include parts with no content or restricted visibility
return ''
return misc.html2text(self.view() or '')
class AttachmentEvolutionPart(EvolutionPart):
orig_filename = None
base_filename = None
content_type = None
charset = None
varname = None
render_for_fts = None
storage = None
storage_attrs = None
def __init__(
self,
base_filename,
fp,
orig_filename=None,
content_type=None,
charset=None,
varname=None,
storage=None,
storage_attrs=None,
to=None,
):
self.base_filename = base_filename
self.orig_filename = orig_filename or base_filename
self.content_type = content_type
self.charset = charset
self.fp = fp
self.varname = varname
self.storage = storage
self.storage_attrs = storage_attrs
self.to = to
@classmethod
def from_upload(cls, upload, varname=None, to=None):
return AttachmentEvolutionPart(
upload.base_filename,
getattr(upload, 'fp', None),
upload.orig_filename,
upload.content_type,
upload.charset,
varname=varname,
storage=getattr(upload, 'storage', None),
storage_attrs=getattr(upload, 'storage_attrs', None),
to=to,
)
def get_file_pointer(self):
if self.filename.startswith('uuid-'):
return None
return open(self.filename, 'rb') # pylint: disable=consider-using-with
def __getstate__(self):
odict = self.__dict__.copy()
if not odict.get('fp'):
if 'filename' not in odict:
# we need a filename as an identifier: create one from nothing
# instead of file_digest(self.fp) (see below)
odict['filename'] = 'uuid-%s' % uuid.uuid4()
self.filename = odict['filename']
return odict
del odict['fp']
dirname = os.path.join(get_publisher().app_dir, 'attachments')
if not os.path.exists(dirname):
os.mkdir(dirname)
# there is not filename, or it was a temporary one: create it
if 'filename' not in odict or odict['filename'].startswith('uuid-'):
filename = file_digest(self.fp)
dirname = os.path.join(dirname, filename[:4])
if not os.path.exists(dirname):
os.mkdir(dirname)
odict['filename'] = os.path.join(dirname, filename)
self.filename = odict['filename']
self.fp.seek(0)
atomic_write(self.filename, self.fp)
return odict
def view(self):
show_link = True
if self.has_redirect_url():
is_in_backoffice = bool(get_request() and get_request().is_in_backoffice())
show_link = bool(self.get_redirect_url(backoffice=is_in_backoffice))
if show_link:
return htmltext(
'<p class="wf-attachment"><a href="attachment?f=%s">%s</a></p>'
% (os.path.basename(self.filename), self.orig_filename)
)
else:
return htmltext('<p class="wf-attachment">%s</p>' % self.orig_filename)
def get_json_export_dict(self, anonymise=False, include_files=True):
if not include_files or anonymise:
return None
d = {
'type': 'workflow-attachment',
'content_type': self.content_type,
'filename': self.base_filename,
'to': self.to,
}
fd = self.get_file_pointer()
if fd:
d['content'] = base64.encodebytes(fd.read())
fd.close()
return d
@classmethod
def get_substitution_variables(cls, formdata):
return {
'attachments': AttachmentsSubstitutionProxy(formdata),
'form_attachments': AttachmentsSubstitutionProxy(formdata),
}
# mimic PicklableUpload methods:
def can_thumbnail(self):
return get_storage_object(getattr(self, 'storage', None)).can_thumbnail(self)
def has_redirect_url(self):
return get_storage_object(getattr(self, 'storage', None)).has_redirect_url(self)
def get_redirect_url(self, backoffice=False):
return get_storage_object(getattr(self, 'storage', None)).get_redirect_url(
self, backoffice=backoffice
)
class ActionsTracingEvolutionPart(EvolutionPart):
def __init__(self, event, actions):
if isinstance(event, tuple):
self.event = event[0]
self.event_args = event[1:]
else:
self.event = event
self.event_args = None
self.actions = actions
def get_event_label(self):
return {
'api-created': _('Created (by API)'),
'api-post-edit-action': _('Actions after edit action (by API)'),
'api-trigger': _('API Trigger'),
'backoffice-created': _('Created (backoffice submission)'),
'continuation': _('Continuation'),
'csv-import-created': _('Created (by CSV import)'),
'edit-action': _('Actions after edit action'),
'frontoffice-created': _('Created (frontoffice submission)'),
'global-action-button': _('Click on a global action button'),
'global-action': _('Global action'),
'global-action-timeout': _('Global action timeout'),
'timeout-jump': _('Timeout jump'),
'workflow-created': _('Created (by workflow action)'),
'workflow-form-submit': _('Action in workflow form'),
}.get(self.event, self.event)
def is_global_event(self):
return bool(self.event and self.event.startswith('global-'))
def get_base_url(self, workflow, status_id):
if self.is_global_event():
return '%sglobal-actions/%s/' % (workflow.get_admin_url(), self.event_args[0])
status = workflow.get_status(status_id)
return status.get_admin_url()
class DuplicateGlobalActionNameError(Exception):
pass
class DuplicateStatusNameError(Exception):
pass
class WorkflowVariablesFieldsFormDef(FormDef):
"""Class to handle workflow variables, it loads and saves from/to
the workflow object 'variables_formdef' attribute."""
lightweight = False
def __init__(self, workflow):
self.id = None
self.workflow = workflow
if workflow.variables_formdef and workflow.variables_formdef.fields:
self.fields = self.workflow.variables_formdef.fields
self.max_field_id = max(lax_int(x.id) for x in self.fields or [])
else:
self.fields = []
@property
def name(self):
return _('Options of workflow "%s"') % self.workflow.name
def get_admin_url(self):
base_url = get_publisher().get_backoffice_url()
return '%s/workflows/%s/variables/fields/' % (base_url, self.workflow.id)
def store(self, comment=None, *args, **kwargs):
for field in self.fields:
if hasattr(field, 'widget_class'):
if not field.varname:
field.varname = misc.simplify(field.label, space='_')
self.workflow.variables_formdef = self
self.workflow.store(comment=comment, *args, **kwargs)
class WorkflowBackofficeFieldsFormDef(FormDef):
"""Class to handle workflow backoffice fields, it loads and saves from/to
the workflow object 'backoffice_fields_formdef' attribute."""
lightweight = False
field_prefix = 'bo'
def __init__(self, workflow):
self.id = None
self.workflow = workflow
if workflow.backoffice_fields_formdef and workflow.backoffice_fields_formdef.fields:
self.fields = self.workflow.backoffice_fields_formdef.fields
else:
self.fields = []
@property
def name(self):
return _('Backoffice fields of workflow "%s"') % self.workflow.name
def get_admin_url(self):
base_url = get_publisher().get_backoffice_url()
return '%s/workflows/%s/backoffice-fields/fields/' % (base_url, self.workflow.id)
def get_new_field_id(self):
return '%s%s' % (self.field_prefix, str(uuid.uuid4()))
def store(self, comment=None):
self.workflow.backoffice_fields_formdef = self
self.workflow.store(comment=comment)
class Workflow(StorableObject):
_names = 'workflows'
xml_root_node = 'workflow'
name = None
possible_status = None
roles = None
variables_formdef = None
backoffice_fields_formdef = None
global_actions = None
criticality_levels = None
def __init__(self, name=None):
StorableObject.__init__(self)
self.name = name
self.possible_status = []
self.roles = {'_receiver': force_text(_('Recipient'))}
self.global_actions = []
self.criticality_levels = []
def migrate(self):
changed = False
if 'roles' not in self.__dict__ or self.roles is None:
self.roles = {'_receiver': force_text(_('Recipient'))}
changed = True
for status in self.possible_status:
changed |= status.migrate()
if self.backoffice_fields_formdef and self.backoffice_fields_formdef.fields:
for field in self.backoffice_fields_formdef.fields:
changed |= field.migrate()
if not self.global_actions:
self.global_actions = []
if changed:
self.store()
def store(self, comment=None, *args, **kwargs):
assert not self.is_readonly()
must_update = False
if self.id:
old_self = self.get(self.id, ignore_errors=True, ignore_migration=True)
if old_self:
old_endpoints = {x.id for x in old_self.get_endpoint_status()}
if old_endpoints != {x.id for x in self.get_endpoint_status()}:
must_update = True
old_criticality_levels = len(old_self.criticality_levels or [0])
if old_criticality_levels != len(self.criticality_levels or [0]):
must_update = True
try:
old_backoffice_fields = old_self.backoffice_fields_formdef.fields
except AttributeError:
old_backoffice_fields = []
try:
new_backoffice_fields = self.backoffice_fields_formdef.fields
except AttributeError:
new_backoffice_fields = []
if len(old_backoffice_fields) != len(new_backoffice_fields):
must_update = True
elif self.backoffice_fields_formdef:
must_update = True
StorableObject.store(self, *args, **kwargs)
if get_publisher().snapshot_class:
get_publisher().snapshot_class.snap(instance=self, comment=comment)
def update(job=None):
# instruct all related carddefs/formdefs to update.
for form in itertools.chain(
self.formdefs(ignore_migration=True, order_by='id'),
self.carddefs(ignore_migration=True, order_by='id'),
):
form.data_class().rebuild_security()
if must_update:
form.rebuild()
if get_response():
get_response().add_after_job(_('Reindexing cards and forms after workflow change'), update)
else:
update()
def get_admin_url(self):
base_url = get_publisher().get_backoffice_url()
return '%s/workflows/%s/' % (base_url, self.id)
@classmethod
def get(cls, id, ignore_errors=False, ignore_migration=False):
if id == '_default':
return cls.get_default_workflow()
elif id == '_carddef_default':
from wcs.carddef import CardDef
return CardDef.get_default_workflow()
return super().get(id, ignore_errors=ignore_errors, ignore_migration=ignore_migration)
def add_status(self, name, id=None):
if [x for x in self.possible_status if x.name == name]:
raise DuplicateStatusNameError()
status = WorkflowStatus(name)
status.parent = self
if id is None:
if self.possible_status:
status.id = str(max(lax_int(x.id) for x in self.possible_status) + 1)
else:
status.id = '1'
else:
status.id = id
self.possible_status.append(status)
return status
def get_status(self, id):
if id.startswith('wf-'):
id = id[3:]
for status in self.possible_status:
if status.id == id:
return status
raise KeyError()
def get_backoffice_fields(self):
if self.backoffice_fields_formdef:
return self.backoffice_fields_formdef.fields or []
return []
def get_all_items(self):
for status in self.possible_status or []:
yield from status.items or []
for action in self.global_actions or []:
yield from action.items or []
def add_global_action(self, name, id=None):
if [x for x in self.global_actions if x.name == name]:
raise DuplicateGlobalActionNameError()
action = WorkflowGlobalAction(name)
action.parent = self
action.append_trigger('manual')
if id is None:
if self.global_actions:
action.id = str(max(lax_int(x.id) for x in self.global_actions) + 1)
else:
action.id = '1'
else:
action.id = id
self.global_actions.append(action)
return action
def get_global_manual_actions(self):
actions = []
for action in self.global_actions or []:
roles = []
for trigger in action.triggers or []:
if not isinstance(trigger, WorkflowGlobalActionManualTrigger):
continue
roles.extend(trigger.roles or [])
functions = [x for x in roles if x in self.roles]
roles = [x for x in roles if x not in self.roles]
if functions or roles:
actions.append({'action': action, 'roles': roles, 'functions': functions})
return actions
def get_global_actions_for_user(self, formdata, user):
actions = []
for action in self.global_actions or []:
for trigger in action.triggers or []:
if isinstance(trigger, WorkflowGlobalActionManualTrigger):
if '_submitter' in (trigger.roles or []) and formdata.is_submitter(user):
actions.append(action)
break
if not user:
continue
roles = set()
for role_id in trigger.roles or []:
if role_id == '_submitter':
continue
roles |= formdata.get_function_roles(role_id)
if roles.intersection(user.get_roles()):
actions.append(action)
break
return actions
def get_subdirectories(self, formdata):
wf_status = formdata.get_status()
if not wf_status: # draft
return []
directories = []
for action in self.global_actions:
for trigger in action.triggers or []:
directories.extend(trigger.get_subdirectories(formdata))
directories.extend(wf_status.get_subdirectories(formdata))
return directories
def __setstate__(self, dict):
self.__dict__.update(dict)
pickle_2to3_conversion(self)
for s in self.possible_status + (self.global_actions or []):
s.parent = self
triggers = getattr(s, 'triggers', None) or []
for i, item in enumerate(s.items + triggers):
item.parent = s
if not item.id:
item.id = '%d' % (i + 1)
if self.variables_formdef:
self.variables_formdef.workflow = self
if self.backoffice_fields_formdef:
self.backoffice_fields_formdef.workflow = self
self.backoffice_fields_formdef.__class__ = WorkflowBackofficeFieldsFormDef
def get_waitpoint_status(self):
return [x for x in self.possible_status if x.is_waitpoint()]
def get_endpoint_status(self):
return [x for x in self.possible_status if x.is_endpoint()]
def get_not_endpoint_status(self):
return [x for x in self.possible_status if not x.is_endpoint()]
def has_options(self):
for status in self.possible_status:
for item in status.items:
for parameter in item.get_parameters():
if not getattr(item, parameter):
return True
return False
def remove_self(self):
for form in self.formdefs():
form.workflow_id = None
form.store()
StorableObject.remove_self(self)
def export_to_xml(self, include_id=False):
charset = get_publisher().site_charset
root = ET.Element('workflow')
if include_id and self.id and not str(self.id).startswith('_'):
root.attrib['id'] = str(self.id)
ET.SubElement(root, 'name').text = force_text(self.name, charset)
roles_node = ET.SubElement(root, 'roles')
if self.roles:
for role_id, role_label in sorted(self.roles.items()):
role_node = ET.SubElement(roles_node, 'role')
role_node.attrib['id'] = role_id
role_node.text = force_text(role_label, charset)
possible_status = ET.SubElement(root, 'possible_status')
for status in self.possible_status:
possible_status.append(status.export_to_xml(charset=charset, include_id=include_id))
if self.global_actions:
global_actions = ET.SubElement(root, 'global_actions')
for action in self.global_actions:
global_actions.append(action.export_to_xml(charset=charset, include_id=include_id))
if self.criticality_levels:
criticality_levels = ET.SubElement(root, 'criticality_levels')
for level in self.criticality_levels:
criticality_levels.append(level.export_to_xml(charset=charset))
if self.variables_formdef:
variables = ET.SubElement(root, 'variables')
formdef = ET.SubElement(variables, 'formdef')
ET.SubElement(formdef, 'name').text = '-' # required by formdef xml import
fields = ET.SubElement(formdef, 'fields')
for field in self.variables_formdef.fields:
fields.append(field.export_to_xml(charset=charset, include_id=include_id))
if self.backoffice_fields_formdef:
variables = ET.SubElement(root, 'backoffice-fields')
formdef = ET.SubElement(variables, 'formdef')
ET.SubElement(formdef, 'name').text = '-' # required by formdef xml import
fields = ET.SubElement(formdef, 'fields')
for field in self.backoffice_fields_formdef.fields:
fields.append(field.export_to_xml(charset=charset, include_id=include_id))
return root
@classmethod
def import_from_xml(cls, fd, include_id=False, check_datasources=True):
try:
tree = ET.parse(fd)
except Exception:
raise ValueError()
return cls.import_from_xml_tree(tree, include_id=include_id, check_datasources=check_datasources)
@classmethod
def import_from_xml_tree(cls, tree, include_id=False, snapshot=False, check_datasources=True):
charset = get_publisher().site_charset
workflow = cls()
if tree.find('name') is None or not tree.find('name').text:
raise WorkflowImportError(_('Missing name'))
# if the tree we get is actually a ElementTree for real, we get its
# root element and go on happily.
if not ET.iselement(tree):
tree = tree.getroot()
if tree.tag != 'workflow':
raise WorkflowImportError(_('Not a workflow'))
if include_id and tree.attrib.get('id'):
workflow.id = tree.attrib.get('id')
workflow.name = xml_node_text(tree.find('name'))
if tree.find('roles') is not None:
workflow.roles = {}
for role_node in tree.findall('roles/role'):
workflow.roles[role_node.attrib['id']] = xml_node_text(role_node)
workflow.possible_status = []
for status in tree.find('possible_status'):
status_o = WorkflowStatus()
status_o.parent = workflow
try:
status_o.init_with_xml(
status,
charset,
include_id=include_id,
snapshot=snapshot,
check_datasources=check_datasources,
)
except FormdefImportError as e:
raise WorkflowImportError(e.msg, details=e.details)
workflow.possible_status.append(status_o)
workflow.global_actions = []
global_actions = tree.find('global_actions')
if global_actions is not None:
for action in global_actions:
action_o = WorkflowGlobalAction()
action_o.parent = workflow
action_o.init_with_xml(action, charset, include_id=include_id, snapshot=snapshot)
workflow.global_actions.append(action_o)
workflow.criticality_levels = []
criticality_levels = tree.find('criticality_levels')
if criticality_levels is not None:
for level in criticality_levels:
level_o = WorkflowCriticalityLevel()
level_o.init_with_xml(level, charset)
workflow.criticality_levels.append(level_o)
variables = tree.find('variables')
if variables is not None:
formdef = variables.find('formdef')
try:
imported_formdef = FormDef.import_from_xml_tree(
formdef, include_id=True, snapshot=snapshot, check_datasources=check_datasources
)
except FormdefImportError as e:
raise WorkflowImportError(e.msg, details=e.details)
workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
workflow.variables_formdef.fields = imported_formdef.fields
variables = tree.find('backoffice-fields')
if variables is not None:
formdef = variables.find('formdef')
try:
imported_formdef = FormDef.import_from_xml_tree(
formdef, include_id=True, snapshot=snapshot, check_datasources=check_datasources
)
except FormdefImportError as e:
raise WorkflowImportError(e.msg, details=e.details)
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow=workflow)
workflow.backoffice_fields_formdef.fields = imported_formdef.fields
return workflow
def get_list_of_roles(self, include_logged_in_users=True):
t = []
t.append(('_submitter', C_('role|User'), '_submitter'))
for workflow_role in self.roles.items():
t.append(list(workflow_role) + [workflow_role[0]])
if include_logged_in_users:
t.append((logged_users_role().id, logged_users_role().name, logged_users_role().id))
include_roles = not (get_publisher().has_site_option('workflow-functions-only'))
if include_roles and get_user_roles():
t.append((None, '----', None))
t.extend(get_user_roles())
return t
def get_add_role_label(self):
if get_publisher().has_site_option('workflow-functions-only'):
return _('Add Function')
return _('Add Function or Role')
def render_list_of_roles(self, roles):
return render_list_of_roles(self, roles)
def get_json_export_dict(self, include_id=False):
charset = get_publisher().site_charset
root = {}
root['name'] = force_text(self.name, charset)
if include_id and self.id:
root['id'] = str(self.id)
roles = root['functions'] = {}
for role, label in self.roles.items():
roles[role] = force_text(label, charset)
statuses = root['statuses'] = []
endpoint_status_ids = [s.id for s in self.get_endpoint_status()]
waitpoint_status_ids = [s.id for s in self.get_waitpoint_status()]
for status in self.possible_status:
statuses.append(
{
'id': status.id,
'name': force_text(status.name, charset),
'forced_endpoint': status.forced_endpoint,
'endpoint': status.id in endpoint_status_ids,
'waitpoint': status.id in waitpoint_status_ids,
}
)
root['fields'] = []
for field in self.get_backoffice_fields():
root['fields'].append(field.export_to_json(include_id=include_id))
return root
@classmethod
def get_unknown_workflow(cls):
workflow = Workflow(name=_('Unknown'))
workflow.id = '_unknown'
return workflow
@classmethod
def get_default_workflow(cls):
from .qommon.admin.emails import EmailsDirectory
# force_text() is used on lazy gettext calls as the default workflow is used
# in tests as the basis for other ones and lazy gettext would fail pickling.
workflow = Workflow(name=force_text(_('Default')))
workflow.id = '_default'
workflow.roles = {'_receiver': force_text(_('Recipient'))}
just_submitted_status = workflow.add_status(force_text(_('Just Submitted')), 'just_submitted')
just_submitted_status.visibility = ['_receiver']
new_status = workflow.add_status(force_text(_('New')), 'new')
new_status.colour = '66FF00'
rejected_status = workflow.add_status(force_text(_('Rejected')), 'rejected')
rejected_status.colour = 'FF3300'
accepted_status = workflow.add_status(force_text(_('Accepted')), 'accepted')
accepted_status.colour = '66CCFF'
finished_status = workflow.add_status(force_text(_('Finished')), 'finished')
finished_status.colour = 'CCCCCC'
commentable = CommentableWorkflowStatusItem()
commentable.id = '_commentable'
commentable.by = ['_submitter', '_receiver']
from wcs.wf.jump import JumpWorkflowStatusItem
jump_to_new = JumpWorkflowStatusItem()
jump_to_new.id = '_jump_to_new'
jump_to_new.status = new_status.id
jump_to_new.parent = just_submitted_status
notify_new_receiver_email = SendmailWorkflowStatusItem()
notify_new_receiver_email.id = '_notify_new_receiver_email'
notify_new_receiver_email.to = ['_receiver']
notify_new_receiver_email.subject = EmailsDirectory.get_subject('new_receiver')
notify_new_receiver_email.body = EmailsDirectory.get_body('new_receiver')
if not EmailsDirectory.is_enabled('new_receiver'):
notify_new_receiver_email = None
notify_new_user_email = SendmailWorkflowStatusItem()
notify_new_user_email.id = '_notify_new_user_email'
notify_new_user_email.to = ['_submitter']
notify_new_user_email.subject = EmailsDirectory.get_subject('new_user')
notify_new_user_email.body = EmailsDirectory.get_body('new_user')
if not EmailsDirectory.is_enabled('new_user'):
notify_change_user_email = None
notify_change_receiver_email = SendmailWorkflowStatusItem()
notify_change_receiver_email.id = '_notify_change_receiver_email'
notify_change_receiver_email.to = ['_receiver']
notify_change_receiver_email.subject = EmailsDirectory.get_subject('change_receiver')
notify_change_receiver_email.body = EmailsDirectory.get_body('change_receiver')
if not EmailsDirectory.is_enabled('change_receiver'):
notify_change_receiver_email = None
notify_change_user_email = SendmailWorkflowStatusItem()
notify_change_user_email.id = '_notify_change_user_email'
notify_change_user_email.to = ['_submitter']
notify_change_user_email.subject = EmailsDirectory.get_subject('change_user')
notify_change_user_email.body = EmailsDirectory.get_body('change_user')
if not EmailsDirectory.is_enabled('change_user'):
notify_change_user_email = None
if notify_new_receiver_email:
notify_new_receiver_email.parent = just_submitted_status
just_submitted_status.items.append(notify_new_receiver_email)
if notify_new_user_email:
notify_new_user_email.parent = just_submitted_status
just_submitted_status.items.append(notify_new_user_email)
just_submitted_status.items.append(jump_to_new)
if notify_change_receiver_email:
accepted_status.items.append(notify_change_receiver_email)
notify_change_receiver_email.parent = accepted_status
notify_change_receiver_email = copy.copy(notify_change_receiver_email)
rejected_status.items.append(notify_change_receiver_email)
notify_change_receiver_email.parent = rejected_status
notify_change_receiver_email = copy.copy(notify_change_receiver_email)
finished_status.items.append(notify_change_receiver_email)
notify_change_receiver_email.parent = finished_status
if notify_change_user_email:
accepted_status.items.append(notify_change_user_email)
notify_change_user_email.parent = accepted_status
notify_change_user_email = copy.copy(notify_change_user_email)
rejected_status.items.append(notify_change_user_email)
notify_change_user_email.parent = rejected_status
notify_change_user_email = copy.copy(notify_change_user_email)
finished_status.items.append(notify_change_user_email)
notify_change_user_email.parent = finished_status
new_status.items.append(commentable)
commentable.parent = new_status
commentable = copy.copy(commentable)
accepted_status.items.append(commentable)
commentable.parent = accepted_status
accept = ChoiceWorkflowStatusItem()
accept.id = '_accept'
accept.label = force_text(_('Accept'))
accept.by = ['_receiver']
accept.status = accepted_status.id
accept.parent = new_status
new_status.items.append(accept)
reject = ChoiceWorkflowStatusItem()
reject.id = '_reject'
reject.label = force_text(_('Reject'))
reject.by = ['_receiver']
reject.status = rejected_status.id
reject.parent = new_status
new_status.items.append(reject)
finish = ChoiceWorkflowStatusItem()
finish.id = '_finish'
finish.label = force_text(_('Finish'))
finish.by = ['_receiver']
finish.status = finished_status.id
finish.parent = accepted_status
accepted_status.items.append(finish)
return workflow
def is_default(self):
return str(self.id).startswith('_')
def is_readonly(self):
return self.is_default() or super().is_readonly()
def formdefs(self, **kwargs):
order_by = kwargs.pop('order_by', 'name')
return list(FormDef.select(lambda x: x.workflow_id == self.id, order_by=order_by, **kwargs))
def carddefs(self, **kwargs):
order_by = kwargs.pop('order_by', 'name')
return list(CardDef.select(lambda x: x.workflow_id == self.id, order_by=order_by, **kwargs))
def mail_templates(self):
slugs = [x.mail_template for x in self.get_all_items() if x.key == 'sendmail' and x.mail_template]
criterias = [Contains('slug', slugs)]
return list(MailTemplate.select(criterias, order_by='name'))
class XmlSerialisable:
node_name = None
key = None
def export_to_xml(self, charset, include_id=False):
node = ET.Element(self.node_name)
if self.key:
node.attrib['type'] = self.key
if include_id and getattr(self, 'id', None):
node.attrib['id'] = self.id
for attribute in self.get_parameters():
if getattr(self, '%s_export_to_xml' % attribute, None):
getattr(self, '%s_export_to_xml' % attribute)(node, charset, include_id=include_id)
continue
if hasattr(self, attribute) and getattr(self, attribute) is not None:
el = ET.SubElement(node, attribute)
val = getattr(self, attribute)
if isinstance(val, dict):
for k, v in val.items():
ET.SubElement(el, k).text = force_text(v, charset, errors='replace')
elif isinstance(val, list):
if attribute[-1] == 's':
atname = attribute[:-1]
else:
atname = 'item'
for v in val:
ET.SubElement(el, atname).text = force_text(str(v), charset, errors='replace')
elif isinstance(val, str):
el.text = force_text(val, charset, errors='replace')
else:
el.text = str(val)
return node
def init_with_xml(self, elem, charset, include_id=False, snapshot=False, check_datasources=True):
if include_id and elem.attrib.get('id'):
self.id = elem.attrib.get('id')
for attribute in self.get_parameters():
el = elem.find(attribute)
if getattr(self, '%s_init_with_xml' % attribute, None):
getattr(self, '%s_init_with_xml' % attribute)(
el, charset, include_id=include_id, snapshot=snapshot
)
continue
if el is None:
continue
if list(el):
if isinstance(getattr(self, attribute), list):
v = [xml_node_text(x) or '' for x in el]
elif isinstance(getattr(self, attribute), dict):
v = {}
for e in el:
v[e.tag] = xml_node_text(e)
else:
# ???
raise AssertionError
setattr(self, attribute, v)
else:
if el.text is None:
setattr(self, attribute, None)
elif el.text in ('False', 'True') and not isinstance(getattr(self, attribute), str):
# booleans
setattr(self, attribute, el.text == 'True')
elif isinstance(getattr(self, attribute), int):
setattr(self, attribute, int(el.text))
else:
setattr(self, attribute, xml_node_text(el))
def _roles_export_to_xml(self, attribute, item, charset, include_id=False):
if not hasattr(self, attribute) or not getattr(self, attribute):
return
el = ET.SubElement(item, attribute)
for role_id in getattr(self, attribute):
if role_id is None:
continue
role_id = str(role_id)
role = get_role_name(role_id, charset)
sub = ET.SubElement(el, 'item')
sub.attrib['role_id'] = role_id
sub.text = role
def _roles_init_with_xml(self, attribute, elem, charset, include_id=False, snapshot=False):
if elem is None:
setattr(self, attribute, [])
else:
imported_roles = []
for child in elem:
imported_roles.append(self._get_role_id_from_xml(child, charset, include_id=include_id))
setattr(self, attribute, imported_roles)
def _role_export_to_xml(self, attribute, item, charset, include_id=False):
if not hasattr(self, attribute) or not getattr(self, attribute):
return
role_id = str(getattr(self, attribute))
role = get_role_name(role_id, charset)
sub = ET.SubElement(item, attribute)
if include_id:
sub.attrib['role_id'] = role_id
sub.text = role
def _get_role_id_from_xml(self, elem, charset, include_id=False):
if elem is None:
return None
value = xml_node_text(elem) or ''
# look for known static values
if value.startswith('_') or value == 'logged-users':
return value
# if we import using id, look at the role_id attribute
if include_id and 'role_id' in elem.attrib:
role_id = force_str(elem.attrib['role_id'])
if get_publisher().role_class.get(role_id, ignore_errors=True):
return role_id
if WorkflowStatusItem.get_expression(role_id)['type'] in ('python', 'template'):
return role_id
# if not using id, look up on the name
for role in get_publisher().role_class.select(ignore_errors=True):
if role.name == value:
return role.id
# if a computed value is possible and value looks like
# an expression, use it
if WorkflowStatusItem.get_expression(value)['type'] in ('python', 'template'):
return value
# if the roles are managed by the idp, don't try further.
if get_publisher() and get_cfg('sp', {}).get('idp-manage-roles') is True:
raise WorkflowImportError(_('Unknown referenced role (%s)'), (value,))
# and if there's no match, create a new role
role = get_publisher().role_class()
role.name = value
role.store()
return role.id
def _role_init_with_xml(self, attribute, elem, charset, include_id=False, snapshot=False):
setattr(self, attribute, self._get_role_id_from_xml(elem, charset, include_id=include_id))
class WorkflowGlobalActionTrigger(XmlSerialisable):
node_name = 'trigger'
def submit_admin_form(self, form):
for f in self.get_parameters():
widget = form.get_widget(f)
if widget:
value = widget.parse()
if hasattr(self, '%s_parse' % f):
value = getattr(self, '%s_parse' % f)(value)
setattr(self, f, value)
def get_subdirectories(self, formdata):
return []
class WorkflowGlobalActionManualTrigger(WorkflowGlobalActionTrigger):
key = 'manual'
roles = None
def get_parameters(self):
return ('roles',)
def render_as_line(self):
if self.roles:
return _('Manual by %s') % render_list_of_roles(self.parent.parent, self.roles)
else:
return _('Manual (not assigned)')
def form(self, workflow):
form = Form(enctype='multipart/form-data')
options = [(None, '---', None)]
options += workflow.get_list_of_roles(include_logged_in_users=False)
form.add(
WidgetList,
'roles',
title=_('Roles'),
element_type=SingleSelectWidget,
value=self.roles,
add_element_label=workflow.get_add_role_label(),
element_kwargs={'render_br': False, 'options': options},
)
return form
def roles_export_to_xml(self, item, charset, include_id=False):
self._roles_export_to_xml('roles', item, charset, include_id=include_id)
def roles_init_with_xml(self, elem, charset, include_id=False, snapshot=False):
self._roles_init_with_xml('roles', elem, charset, include_id=include_id, snapshot=snapshot)
class WorkflowGlobalActionTimeoutTriggerMarker(EvolutionPart):
def __init__(self, timeout_id):
self.timeout_id = timeout_id
class WorkflowGlobalActionTimeoutTrigger(WorkflowGlobalActionTrigger):
key = 'timeout'
anchor = None
anchor_expression = ''
anchor_template = ''
anchor_status_first = None
anchor_status_latest = None
timeout = None
def get_parameters(self):
return (
'anchor',
'anchor_expression',
'anchor_template',
'anchor_status_first',
'anchor_status_latest',
'timeout',
)
def get_anchor_labels(self):
return collections.OrderedDict(
[
('creation', _('Creation')),
('1st-arrival', _('First arrival in status')),
('latest-arrival', _('Latest arrival in status')),
('finalized', _('Arrival in final status')),
('template', _('String / Template')),
('python', _('Python expression')),
]
)
def properly_configured(self):
workflow = self.parent.parent
if not (self.anchor and self.timeout):
return False
if self.anchor == '1st-arrival' and self.anchor_status_first:
try:
workflow.get_status(self.anchor_status_first)
except KeyError:
return False
if self.anchor == 'latest-arrival' and self.anchor_status_latest:
try:
workflow.get_status(self.anchor_status_latest)
except KeyError:
return False
return True
def render_as_line(self):
if self.properly_configured():
return _('Automatic, %(timeout)s, relative to: %(anchor)s') % {
'anchor': self.get_anchor_labels().get(self.anchor).lower(),
'timeout': _('%s days') % self.timeout,
}
else:
return _('Automatic (not configured)')
def form(self, workflow):
form = Form(enctype='multipart/form-data')
options = list(self.get_anchor_labels().items())
form.add(
SingleSelectWidget,
'anchor',
title=_('Reference Date'),
options=options,
value=self.anchor,
required=True,
attrs={'data-dynamic-display-parent': 'true'},
)
form.add(
StringWidget,
'anchor_expression',
title=_('Python Expression to get reference date'),
size=80,
value=self.anchor_expression,
hint=_('This should produce a date; it will only apply to open forms.'),
attrs={
'data-dynamic-display-child-of': 'anchor',
'data-dynamic-display-value': _('Python expression'),
},
)
form.add(
StringWidget,
'anchor_template',
title=_('String / Template with reference date'),
size=80,
value=self.anchor_template,
hint=_('This should be a date; it will only apply to open forms.'),
attrs={
'data-dynamic-display-child-of': 'anchor',
'data-dynamic-display-value': _('String / Template'),
},
)
possible_status = [(None, _('Current Status'), None)]
possible_status.extend([('wf-%s' % x.id, x.name, x.id) for x in workflow.possible_status])
form.add(
SingleSelectWidget,
'anchor_status_first',
title=_('Status'),
options=possible_status,
value=self.anchor_status_first,
attrs={
'data-dynamic-display-child-of': 'anchor',
'data-dynamic-display-value': _('First arrival in status'),
},
)
form.add(
SingleSelectWidget,
'anchor_status_latest',
title=_('Status'),
options=possible_status,
value=self.anchor_status_latest,
attrs={
'data-dynamic-display-child-of': 'anchor',
'data-dynamic-display-value': _('Latest arrival in status'),
},
)
form.add(
ValidatedStringWidget,
'timeout',
title=_('Delay (in days)'),
value=self.timeout,
regex=r'^-?\d+$',
required=True,
hint=_(
'''
Number of days relative to the reference date. If the
reference date is computed from an expression, a negative
delay is accepted to trigger the action before the
date.'''
),
)
return form
def must_trigger(self, formdata, endpoint_status_ids):
if formdata.status in endpoint_status_ids:
if not (
(self.anchor == '1st-arrival' and self.anchor_status_first in endpoint_status_ids)
or (self.anchor == 'latest-arrival' and self.anchor_status_latest in endpoint_status_ids)
or (self.anchor == 'finalized')
):
# don't trigger on finalized formdata (unless explicit anchor point)
return False
anchor_date = None
if self.anchor == 'creation':
anchor_date = formdata.receipt_time
elif self.anchor == '1st-arrival':
anchor_status = self.anchor_status_first or formdata.status
for evolution in formdata.evolution:
if evolution.status == anchor_status:
anchor_date = evolution.last_jump_datetime or evolution.time
break
elif self.anchor == 'latest-arrival':
anchor_status = self.anchor_status_latest or formdata.status
latest_no_status_evolution = None
for evolution in reversed(formdata.evolution):
if evolution.status == anchor_status:
if latest_no_status_evolution:
evolution = latest_no_status_evolution
anchor_date = evolution.last_jump_datetime or evolution.time
break
if evolution.status:
latest_no_status_evolution = None
elif latest_no_status_evolution is None:
latest_no_status_evolution = evolution
elif self.anchor == 'finalized':
if formdata.status in endpoint_status_ids:
for evolution in reversed(formdata.evolution):
if not evolution.status:
continue
if evolution.status in endpoint_status_ids:
anchor_date = evolution.time
else:
break
elif self.anchor == 'template' and self.anchor_template:
variables = get_publisher().substitutions.get_context_variables(mode='lazy')
anchor_date = Template(self.anchor_template, autoescape=False).render(variables)
elif self.anchor == 'python':
variables = get_publisher().substitutions.get_context_variables()
try:
# noqa pylint: disable=eval-used
anchor_date = eval(self.anchor_expression, get_publisher().get_global_eval_dict(), variables)
except Exception as e:
# get the variables in the locals() namespace so they are
# displayed within the trace.
expression = self.anchor_expression # noqa pylint: disable=unused-variable
# noqa pylint: disable=unused-variable
global_variables = get_publisher().get_global_eval_dict()
get_publisher().record_error(exception=e, context='[TIMEOUTS]', notify=True)
# convert anchor_date to datetime.datetime()
if isinstance(anchor_date, datetime.datetime):
pass
elif isinstance(anchor_date, datetime.date):
anchor_date = datetime.datetime(
year=anchor_date.year, month=anchor_date.month, day=anchor_date.day
)
elif isinstance(anchor_date, time.struct_time):
anchor_date = datetime.datetime.fromtimestamp(time.mktime(anchor_date))
elif isinstance(anchor_date, str) and anchor_date:
try:
anchor_date = get_as_datetime(anchor_date)
except ValueError as e:
get_publisher().record_error(exception=e, context='[TIMEOUTS]', notify=True)
anchor_date = None
elif anchor_date:
# timestamp
try:
anchor_date = datetime.datetime.fromtimestamp(anchor_date)
except TypeError as e:
get_publisher().record_error(exception=e, context='[TIMEOUTS]', notify=True)
anchor_date = None
if not anchor_date:
return False
anchor_date = anchor_date + datetime.timedelta(days=int(self.timeout))
return bool(datetime.datetime.now() > anchor_date)
@classmethod
def apply(cls, workflow):
triggers = []
for action in workflow.global_actions or []:
triggers.extend(
[
(action, x)
for x in action.triggers or []
if isinstance(x, WorkflowGlobalActionTimeoutTrigger) and x.properly_configured()
]
)
if not triggers:
return
not_endpoint_status = workflow.get_not_endpoint_status()
not_endpoint_status_ids = ['wf-%s' % x.id for x in not_endpoint_status]
endpoint_status = workflow.get_endpoint_status()
endpoint_status_ids = ['wf-%s' % x.id for x in endpoint_status]
# check if triggers are defined relative to terminal status
run_on_finalized = False
for action, trigger in triggers:
if trigger.anchor == 'finalized':
run_on_finalized = True
elif (
trigger.anchor == 'creation'
and workflow.possible_status
and workflow.possible_status[0] in endpoint_status
):
run_on_finalized = True
elif (
trigger.anchor == '1st-arrival'
and trigger.anchor_status_first
and workflow.get_status(trigger.anchor_status_first) in endpoint_status
):
run_on_finalized = True
elif (
trigger.anchor == 'latest-arrival'
and trigger.anchor_status_latest
and workflow.get_status(trigger.anchor_status_latest) in endpoint_status
):
run_on_finalized = True
criterias = [NotEqual('status', 'draft'), Null('anonymised')]
if not run_on_finalized:
# limit to formdata that are not finalized
criterias.append(Contains('status', not_endpoint_status_ids))
for formdef in workflow.formdefs():
data_class = formdef.data_class()
for formdata in data_class.select(criterias, iterator=True):
get_publisher().substitutions.reset()
get_publisher().substitutions.feed(get_publisher())
get_publisher().substitutions.feed(formdef)
get_publisher().substitutions.feed(formdata)
seen_triggers = []
for part in formdata.iter_evolution_parts():
if not isinstance(part, WorkflowGlobalActionTimeoutTriggerMarker):
continue
seen_triggers.append(part.timeout_id)
for action, trigger in triggers:
if trigger.id in seen_triggers:
continue # already triggered
if trigger.must_trigger(formdata, endpoint_status_ids):
if not formdata.evolution:
continue
formdata.evolution[-1].add_part(WorkflowGlobalActionTimeoutTriggerMarker(trigger.id))
formdata.store()
perform_items(
action.items,
formdata,
event=('global-action-timeout', (action.id, trigger.id)),
)
break
class WorkflowGlobalActionWebserviceTrigger(WorkflowGlobalActionManualTrigger):
key = 'webservice'
identifier = None
roles = None
def get_parameters(self):
return ('identifier', 'roles')
def render_as_line(self):
if self.identifier:
return _('External call (%s)') % self.identifier
else:
return _('External call (not configured)')
def form(self, workflow):
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'identifier', title=_('Identifier'), required=True, value=self.identifier)
options = [(None, '---', None)]
options += workflow.get_list_of_roles(include_logged_in_users=True)
form.add(
WidgetList,
'roles',
title=_('Roles'),
element_type=SingleSelectWidget,
value=self.roles,
add_element_label=workflow.get_add_role_label(),
element_kwargs={'render_br': False, 'options': options},
)
return form
def get_subdirectories(self, formdata):
from wcs.forms.workflows import WorkflowGlobalActionWebserviceHooksDirectory
return [('hooks', WorkflowGlobalActionWebserviceHooksDirectory(formdata))]
class WorkflowGlobalAction:
id = None
name = None
items = None
triggers = None
backoffice_info_text = None
def __init__(self, name=None):
self.name = name
self.items = []
def append_item(self, type):
for klass in item_classes:
if klass.key == type:
o = klass()
if self.items:
o.id = str(max(lax_int(x.id) for x in self.items) + 1)
else:
o.id = '1'
self.items.append(o)
return o
raise KeyError()
def append_trigger(self, type):
trigger_types = {
'manual': WorkflowGlobalActionManualTrigger,
'timeout': WorkflowGlobalActionTimeoutTrigger,
'webservice': WorkflowGlobalActionWebserviceTrigger,
}
o = trigger_types.get(type)()
if not self.triggers:
self.triggers = []
o.id = str(uuid.uuid4())
self.triggers.append(o)
return o
def export_to_xml(self, charset, include_id=False):
status = ET.Element('action')
ET.SubElement(status, 'id').text = force_text(self.id, charset)
ET.SubElement(status, 'name').text = force_text(self.name, charset)
if self.backoffice_info_text:
ET.SubElement(status, 'backoffice_info_text').text = force_text(
self.backoffice_info_text, charset
)
items = ET.SubElement(status, 'items')
for item in self.items:
items.append(item.export_to_xml(charset=charset, include_id=include_id))
triggers = ET.SubElement(status, 'triggers')
for trigger in self.triggers or []:
triggers.append(trigger.export_to_xml(charset=charset, include_id=include_id))
return status
def init_with_xml(self, elem, charset, include_id=False, snapshot=False):
self.id = xml_node_text(elem.find('id'))
self.name = xml_node_text(elem.find('name'))
if elem.find('backoffice_info_text') is not None:
self.backoffice_info_text = xml_node_text(elem.find('backoffice_info_text'))
self.items = []
for item in elem.find('items'):
item_type = item.attrib['type']
self.append_item(item_type)
item_o = self.items[-1]
item_o.parent = self
item_o.init_with_xml(item, charset, include_id=include_id, snapshot=snapshot)
self.triggers = []
for trigger in elem.find('triggers'):
trigger_type = trigger.attrib['type']
self.append_trigger(trigger_type)
trigger_o = self.triggers[-1]
trigger_o.parent = self
trigger_o.init_with_xml(trigger, charset, include_id=include_id, snapshot=snapshot)
class WorkflowCriticalityLevel:
id = None
name = None
colour = None
def __init__(self, name=None, colour=None):
self.name = name
self.colour = colour
self.id = str(random.randint(0, 100000))
def export_to_xml(self, charset, include_id=False):
level = ET.Element('criticality-level')
ET.SubElement(level, 'id').text = force_text(self.id, charset) if self.id else ''
ET.SubElement(level, 'name').text = force_text(self.name, charset)
if self.colour:
ET.SubElement(level, 'colour').text = force_text(self.colour, charset)
return level
def init_with_xml(self, elem, charset, include_id=False, snapshot=False):
self.id = xml_node_text(elem.find('id'))
self.name = xml_node_text(elem.find('name'))
if elem.find('colour') is not None:
self.colour = xml_node_text(elem.find('colour'))
class WorkflowStatus:
id = None
name = None
items = None
visibility = None
forced_endpoint = False
colour = 'FFFFFF'
backoffice_info_text = None
extra_css_class = ''
def __init__(self, name=None):
self.name = name
self.items = []
def __eq__(self, other):
if other is None:
return False
# this assumes both status are from the same workflow
if isinstance(other, str):
other_id = other
else:
other_id = other.id
return self.id == other_id
def migrate(self):
changed = False
for item in self.items:
changed |= item.migrate()
return changed
def append_item(self, type):
for klass in item_classes:
if klass.key == type:
o = klass()
if self.items:
o.id = str(max(lax_int(x.id) for x in self.items) + 1)
else:
o.id = '1'
self.items.append(o)
break
else:
raise KeyError()
def get_item(self, id):
for item in self.items:
if item.id == id:
return item
raise KeyError()
def get_action_form(self, filled, user, displayed_fields=None):
form = Form(enctype='multipart/form-data', use_tokens=False)
form.attrs['id'] = 'wf-actions'
for item in self.items:
if not item.check_auth(filled, user):
continue
if not item.check_condition(filled):
continue
item.fill_form(form, filled, user, displayed_fields=displayed_fields)
for action in filled.formdef.workflow.get_global_actions_for_user(filled, user):
form.add_submit('button-action-%s' % action.id, action.name)
widget = form.get_widget('button-action-%s' % action.id)
if widget:
widget.backoffice_info_text = action.backoffice_info_text
widget.ignore_form_errors = True
widget.attrs['formnovalidate'] = 'formnovalidate'
if form.widgets or form.submit_widgets:
return form
else:
return None
def get_active_items(self, form, filled, user):
for item in self.items:
if hasattr(item, 'by'):
for role in item.by or []:
if role == logged_users_role().id:
break
if role == '_submitter':
if filled.is_submitter(user):
break
continue
if user is None:
continue
if filled.get_function_roles(role).intersection(user.get_roles()):
break
else:
continue
if not item.check_condition(filled):
continue
yield item
def get_admin_url(self):
return self.parent.get_admin_url() + 'status/%s/' % self.id
def evaluate_live_form(self, form, filled, user):
for item in self.get_active_items(form, filled, user):
item.evaluate_live_form(form, filled, user)
def handle_form(self, form, filled, user):
# check for global actions
for action in filled.formdef.workflow.get_global_actions_for_user(filled, user):
if 'button-action-%s' % action.id in get_request().form:
url = perform_items(action.items, filled, event=('global-action-button', action.id))
if url:
return url
return
evo = Evolution()
evo.time = time.localtime()
if user:
if filled.is_submitter(user):
evo.who = '_submitter'
else:
evo.who = user.id
if not filled.evolution:
filled.evolution = []
for item in self.get_active_items(form, filled, user):
next_url = item.submit_form(form, filled, user, evo)
if next_url is True:
break
if next_url:
if not form.has_errors():
if evo.parts or evo.status or evo.comment or evo.status:
# add evolution entry only if there's some content
# within, i.e. do not register anything in the case of
# a single edit action (where the evolution should be
# appended only after successful edit).
filled.evolution.append(evo)
if evo.status:
filled.status = evo.status
filled.store()
return next_url
if form.has_errors():
return
filled.evolution.append(evo)
if evo.status:
filled.status = evo.status
filled.store()
url = filled.perform_workflow(event='workflow-form-submit')
if url:
return url
def get_subdirectories(self, formdata):
subdirectories = []
for item in self.items:
if item.directory_name:
subdirectories.append((item.directory_name, item.directory_class(formdata, item, self)))
return subdirectories
def get_visibility_restricted_roles(self):
if not self.visibility: # no restriction -> visible
return []
return self.visibility
def is_visible(self, formdata, user):
if not self.visibility: # no restriction -> visible
return True
if get_request() and get_request().is_in_frontoffice():
# always hide in front
return False
if user and user.is_admin:
return True
if user:
user_roles = set(user.get_roles())
user_roles.add(logged_users_role().id)
else:
user_roles = set()
visibility_roles = self.visibility[:]
for item in self.items or []:
if not hasattr(item, 'by') or not item.by:
continue
visibility_roles.extend(item.by)
for role in visibility_roles:
if role != '_submitter':
if formdata.get_function_roles(role).intersection(user_roles):
return True
return False
def is_endpoint(self):
# an endpoint status is a status that marks the end of the workflow; it
# can either be computed automatically (if there's no way out of the
# status) or be set manually (to mark the expected end while still
# allowing to go back and re-enter the workflow).
if self.forced_endpoint:
return True
endpoint = True
for item in self.items:
endpoint = endpoint and item.endpoint
if endpoint is False:
break
return endpoint
def is_waitpoint(self):
# a waitpoint status is a status waiting for an event (be it user
# interaction or something else), but can also be an endpoint (where
# the user would wait, infinitely).
waitpoint = False
endpoint = True
if self.forced_endpoint:
endpoint = True
else:
for item in self.items:
endpoint = item.endpoint and endpoint
waitpoint = item.waitpoint or waitpoint
return bool(endpoint or waitpoint)
def get_status_manual_actions(self):
actions = []
status_id = self.id
class StatusAction:
def __init__(self, action):
self.id = 'st-%s' % action.identifier
self.status_id = status_id
self.action_id = action.identifier
self.name = action.get_label()
self.status_action = True
self.require_confirmation = action.require_confirmation
self.action = action
for action in self.items or []:
if not isinstance(action, ChoiceWorkflowStatusItem):
continue
if not action.identifier:
continue
roles = action.by or []
functions = [x for x in roles if x in (self.parent.roles or [])]
roles = [x for x in roles if x not in (self.parent.roles or [])]
if functions or roles:
actions.append({'action': StatusAction(action), 'roles': roles, 'functions': functions})
return actions
def get_contrast_color(self):
colour = self.colour or 'ffffff'
return misc.get_foreground_colour(colour)
def __getstate__(self):
odict = self.__dict__.copy()
if 'parent' in odict:
del odict['parent']
return odict
def export_to_xml(self, charset, include_id=False):
status = ET.Element('status')
ET.SubElement(status, 'id').text = force_text(self.id, charset)
ET.SubElement(status, 'name').text = force_text(self.name, charset)
ET.SubElement(status, 'colour').text = force_text(self.colour, charset)
if self.extra_css_class:
ET.SubElement(status, 'extra_css_class').text = force_text(self.extra_css_class, charset)
if self.forced_endpoint:
ET.SubElement(status, 'forced_endpoint').text = 'true'
if self.backoffice_info_text:
ET.SubElement(status, 'backoffice_info_text').text = force_text(
self.backoffice_info_text, charset
)
visibility_node = ET.SubElement(status, 'visibility')
for role in self.visibility or []:
ET.SubElement(visibility_node, 'role').text = str(role)
items = ET.SubElement(status, 'items')
for item in self.items:
items.append(item.export_to_xml(charset=charset, include_id=include_id))
return status
def init_with_xml(self, elem, charset, include_id=False, snapshot=False, check_datasources=True):
self.id = xml_node_text(elem.find('id'))
self.name = xml_node_text(elem.find('name'))
if elem.find('colour') is not None:
self.colour = xml_node_text(elem.find('colour'))
if elem.find('extra_css_class') is not None:
self.extra_css_class = xml_node_text(elem.find('extra_css_class'))
if elem.find('forced_endpoint') is not None:
self.forced_endpoint = elem.find('forced_endpoint').text == 'true'
if elem.find('backoffice_info_text') is not None:
self.backoffice_info_text = xml_node_text(elem.find('backoffice_info_text'))
self.visibility = []
for visibility_role in elem.findall('visibility/role'):
self.visibility.append(visibility_role.text)
self.items = []
for item in elem.find('items'):
item_type = item.attrib['type']
self.append_item(item_type)
item_o = self.items[-1]
item_o.parent = self
item_o.init_with_xml(
item, charset, include_id=include_id, snapshot=snapshot, check_datasources=check_datasources
)
def __repr__(self):
return '<%s %s %r>' % (self.__class__.__name__, self.id, self.name)
def noop_mark(func):
# mark method as not executing anything
func.noop = True
return func
class WorkflowStatusItem(XmlSerialisable):
node_name = 'item'
description = 'XX'
category = None # (key, label)
id = None
condition = None
endpoint = True # means it's not possible to interact, and/or cause a status change
waitpoint = False # means it's possible to wait (user interaction, or other event)
ok_in_global_action = True # means it can be used in a global action
directory_name = None
directory_class = None
support_substitution_variables = False
@classmethod
def init(cls):
pass
@classmethod
def is_available(cls, workflow=None):
return True
def migrate(self):
changed = False
for roles_attribute in ('to', 'by'):
attribute_value = getattr(self, roles_attribute, []) or []
if any(x for x in attribute_value if isinstance(x, int)):
setattr(self, roles_attribute, [str(x) for x in attribute_value])
changed = True
return changed
def render_as_line(self):
label = self.description
details = self.get_line_details()
if details:
label += ' (%s)' % details
if self.condition and self.condition.get('value'):
label += ' (%s)' % _('conditional')
return label
def get_line_details(self):
return ''
def render_list_of_roles(self, roles):
return self.parent.parent.render_list_of_roles(roles)
def get_list_of_roles(self, include_logged_in_users=True):
return self.parent.parent.get_list_of_roles(include_logged_in_users=include_logged_in_users)
def get_add_role_label(self):
return self.parent.parent.get_add_role_label()
@noop_mark
def perform(self, formdata):
pass
def fill_form(self, form, formdata, user, **kwargs):
pass
def evaluate_live_form(self, form, formdata, user):
pass
def submit_form(self, form, formdata, user, evo):
pass
def check_auth(self, formdata, user):
if not hasattr(self, 'by'):
return True
for role in self.by or []:
if user and role == logged_users_role().id:
return True
if role == '_submitter':
t = formdata.is_submitter(user)
if t is True:
return True
continue
if not user:
continue
if formdata.get_function_roles(role).intersection(user.get_roles()):
return True
return False
def check_condition(self, formdata):
context = {'formdata': formdata, 'status_item': self}
try:
return Condition(self.condition, context).evaluate()
except RuntimeError:
return False
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
if 'condition' in parameters:
form.add(
ConditionWidget,
'%scondition' % prefix,
title=_('Condition of execution of the action'),
value=self.condition,
size=40,
advanced=not (self.condition),
)
if 'attachments' in parameters:
attachments_options, attachments = self.get_attachments_options()
if len(attachments_options) > 1:
form.add(
WidgetList,
'%sattachments' % prefix,
title=_('Attachments'),
element_type=SingleSelectWidgetWithOther,
value=attachments,
add_element_label=_('Add attachment'),
element_kwargs={'render_br': False, 'options': attachments_options},
)
else:
form.add(
WidgetList,
'%sattachments' % prefix,
title=_('Attachments (templates or Python expressions)'),
element_type=StringWidget,
value=attachments,
add_element_label=_('Add attachment'),
element_kwargs={'render_br': False, 'size': 50},
advanced=not (bool(attachments)),
)
def get_parameters(self):
return ('condition',)
def get_parameters_view(self):
r = TemplateIO(html=True)
form = Form()
parameters = [x for x in self.get_parameters() if getattr(self, x, None) is not None]
for parameter in parameters:
self.add_parameters_widgets(form, [parameter])
r += htmltext('<ul>')
for parameter in parameters:
widget = form.get_widget(parameter)
if not widget:
continue
r += htmltext('<li>')
r += htmltext('<span class="parameter">%s</span> ') % _('%s:') % widget.get_title()
r += self.get_parameter_view_value(widget, parameter)
r += htmltext('</li>')
r += htmltext('</ul>')
return r.getvalue()
def get_backoffice_info_text_parameter_view_value(self):
return htmltext('<pre>%s</pre>') % self.backoffice_info_text
def get_by_parameter_view_value(self):
return self.render_list_of_roles(self.by)
def get_timeout_parameter_view_value(self):
try:
return seconds2humanduration(int(self.timeout or 0))
except ValueError:
return self.timeout # probably an expression
def get_status_parameter_view_value(self):
for status in self.parent.parent.possible_status:
if status.id == self.status:
return htmltext('<a href="#status-%s">%s</a>') % (status.id, status.name)
return _('Unknown (%s)') % self.status
def get_parameter_view_value(self, widget, parameter):
if hasattr(self, 'get_%s_parameter_view_value' % parameter):
return getattr(self, 'get_%s_parameter_view_value' % parameter)()
value = getattr(self, parameter)
if isinstance(value, bool):
return _('Yes') if value else _('No')
elif hasattr(widget, 'options') and value:
for option in widget.options:
if isinstance(option, tuple):
if option[0] == value:
return option[1]
else:
if option == value:
return option
return '-'
else:
return str(value)
def fill_admin_form(self, form):
for parameter in self.get_parameters():
self.add_parameters_widgets(form, [parameter])
def submit_admin_form(self, form):
for f in self.get_parameters():
widget = form.get_widget(f)
if widget:
if hasattr(self, 'clean_%s' % f):
has_error = getattr(self, 'clean_%s' % f)(form)
if has_error:
continue
value = widget.parse()
if hasattr(self, '%s_parse' % f):
value = getattr(self, '%s_parse' % f)(value)
setattr(self, f, value)
@classmethod
def get_expression(cls, var):
if not var:
expression_type = 'text'
expression_value = ''
elif var.startswith('='):
expression_type = 'python'
expression_value = var[1:]
elif '{{' in var or '{%' in var or '[' in var:
expression_type = 'template'
expression_value = var
else:
expression_type = 'text'
expression_value = var
return {'type': expression_type, 'value': expression_value}
@classmethod
def compute(
cls,
var,
render=True,
raises=False,
record_errors=True,
allow_complex=False,
context=None,
formdata=None,
status_item=None,
):
if not isinstance(var, str):
return var
expression = cls.get_expression(var)
if expression['type'] != 'python' and not render:
return var
if expression['type'] == 'text':
return expression['value']
vars = get_publisher().substitutions.get_context_variables(
'lazy' if expression['type'] == 'template' else None
)
vars.update(context or {})
def log_exception(exception):
if expression['type'] == 'template':
summary = _('Failed to compute template')
else:
summary = _('Failed to compute Python expression')
get_publisher().record_error(
summary,
formdata=formdata,
status_item=status_item,
expression=expression['value'],
expression_type=expression['type'],
exception=exception,
)
if expression['type'] == 'template':
vars['allow_complex'] = allow_complex and get_publisher().has_site_option('complex-data')
try:
return Template(expression['value'], raises=raises, autoescape=False).render(vars)
except TemplateError as e:
if record_errors:
log_exception(e)
if raises:
raise
return var
try:
# noqa pylint: disable=eval-used
return eval(expression['value'], get_publisher().get_global_eval_dict(), vars)
except Exception as e:
if record_errors:
log_exception(e)
if raises:
raise
return var
def get_computed_role_id(self, role_id):
new_role_id = self.compute(str(role_id))
if get_publisher().role_class.get(new_role_id, ignore_errors=True):
return new_role_id
# computed value, not an id, try to get role by slug
new_role = get_publisher().role_class.get_on_index(new_role_id, 'slug', ignore_errors=True)
if new_role:
return new_role.id
# fallback to role label
for role in get_publisher().role_class.select():
if role.name == new_role_id:
return role.id
return None
def get_substitution_variables(self, formdata):
return {}
def get_target_status_url(self):
if not getattr(self, 'status', None) or self.status == '_previous':
return None
targets = [x for x in self.parent.parent.possible_status if x.id == self.status]
if not targets:
return None
return targets[0].get_admin_url()
def get_target_status(self, formdata=None):
"""Returns a list of status this item can lead to."""
if not getattr(self, 'status', None):
return []
if self.status == '_previous':
if formdata is None:
# must be in a formdata to compute destination, just give a
# fake status for presentation purpose
return [WorkflowStatus(_('Previously Marked Status'))]
previous_status = formdata.pop_previous_marked_status()
if previous_status:
return [previous_status]
return []
targets = [x for x in self.parent.parent.possible_status if x.id == self.status]
if not targets and formdata: # do not log in presentation context: formdata is needed
message = _(
'reference to invalid status %(target)s in status %(status)s, ' 'action %(status_item)s'
) % {'target': self.status, 'status': self.parent.name, 'status_item': self.description}
get_publisher().record_error(message, formdata=formdata, status_item=self)
return targets
def get_jump_label(self, target_id):
'''Return the label to use on a workflow graph arrow'''
if getattr(self, 'label', None):
label = self.label
if getattr(self, 'by', None):
roles = self.parent.parent.render_list_of_roles(self.by)
label += ' %s %s' % (_('by'), roles)
if getattr(self, 'status', None) == '_previous':
label += ' ' + str(_('(to last marker)'))
if getattr(self, 'set_marker_on_status', False):
label += ' ' + str(_('(and set marker)'))
if getattr(self, 'condition', None):
label += ' ' + str(_('(conditional)'))
else:
label = self.render_as_line()
return label
def get_backoffice_filefield_options(self):
options = []
for field in self.parent.parent.get_backoffice_fields():
if field.key == 'file':
options.append((field.id, field.label, field.id))
return options
def store_in_backoffice_filefield(
self, formdata, backoffice_filefield_id, filename, content_type, content
):
filefield = [
x
for x in self.parent.parent.get_backoffice_fields()
if x.id == backoffice_filefield_id and x.key == 'file'
]
if filefield:
upload = PicklableUpload(filename, content_type)
upload.receive([content])
formdata.data[backoffice_filefield_id] = upload
formdata.store()
def by_export_to_xml(self, item, charset, include_id=False):
self._roles_export_to_xml('by', item, charset, include_id=include_id)
def by_init_with_xml(self, elem, charset, include_id=False, snapshot=False):
self._roles_init_with_xml('by', elem, charset, include_id=include_id, snapshot=snapshot)
def to_export_to_xml(self, item, charset, include_id=False):
self._roles_export_to_xml('to', item, charset, include_id=include_id)
def to_init_with_xml(self, elem, charset, include_id=False, snapshot=False):
self._roles_init_with_xml('to', elem, charset, include_id, snapshot=snapshot)
def condition_init_with_xml(self, node, charset, include_id=False, snapshot=False):
self.condition = None
if node is None:
return
if node.findall('type'):
self.condition = {
'type': xml_node_text(node.find('type')),
'value': xml_node_text(node.find('value')),
}
elif node.text:
# backward compatibility
self.condition = {'type': 'python', 'value': xml_node_text(node)}
def q_admin_lookup(self, workflow, status, component, html_top):
return None
def __getstate__(self):
odict = self.__dict__.copy()
if 'parent' in odict:
del odict['parent']
return odict
def mail_template_init_with_xml(self, elem, charset, include_id=False, snapshot=False):
if elem is None:
self.mail_template = None
return
value = xml_node_text(elem)
mail_template = MailTemplate.get_by_slug(value)
if not mail_template:
raise WorkflowImportError(_('Unknown referenced mail template (%s)'), (value,))
self.mail_template = value
return
def attachments_init_with_xml(self, elem, charset, include_id=False, snapshot=False):
if elem is None:
self.attachments = None
else:
self.attachments = [xml_node_text(item) for item in elem.findall('attachment')]
def get_attachments_options(self):
attachments_options = [(None, '---', None)]
varnameless = []
for field in self.parent.parent.get_backoffice_fields():
if field.key != 'file':
continue
if field.varname:
codename = 'form_var_%s_raw' % field.varname
else:
codename = 'form_f%s' % field.id.replace('-', '_') # = form_fbo<...>
varnameless.append(codename)
attachments_options.append((codename, field.label, codename))
# filter: do not consider removed fields without varname
attachments = [
attachment
for attachment in self.attachments or []
if ((not attachment.startswith('form_fbo')) or (attachment in varnameless))
]
return attachments_options, attachments
def convert_attachments_to_uploads(self, extra_attachments=None):
uploads = []
attachments = []
attachments.extend(self.attachments or [])
attachments.extend(extra_attachments or [])
# 1. attachments defined as templates
with get_publisher().complex_data():
for attachment in attachments[:]:
if '{%' not in attachment and '{{' not in attachment:
continue
attachments.remove(attachment)
try:
attachment = WorkflowStatusItem.compute(attachment, allow_complex=True, raises=True)
except Exception as e:
get_publisher().record_error(exception=e, context='[workflow/attachments]', notify=True)
else:
if attachment:
complex_value = get_publisher().get_cached_complex_data(attachment)
if complex_value:
uploads.append(complex_value)
# 2. python expressions
if attachments:
global_eval_dict = get_publisher().get_global_eval_dict()
local_eval_dict = get_publisher().substitutions.get_context_variables()
for attachment in attachments:
if attachment.startswith('form_fbo') and '-' in attachment:
# detect varname-less backoffice fields that were set
# before #33366 was fixed, and fix them.
attachment = attachment.replace('-', '_')
try:
# execute any Python expression
# and magically convert string like 'form_var_*_raw' to a PicklableUpload
# noqa pylint: disable=eval-used
picklableupload = eval(attachment, global_eval_dict, local_eval_dict)
except Exception as e:
get_publisher().record_error(exception=e, context='[workflow/attachments]', notify=True)
continue
if not picklableupload:
continue
uploads.append(picklableupload)
# 3. convert any value to a PicklableUpload; this allows for
# dicts like those provided by qommon/evalutils:attachment()
for upload in uploads:
if not isinstance(upload, PicklableUpload):
try:
upload = FileField.convert_value_from_anything(upload)
except ValueError as e:
get_publisher().record_error(exception=e, context='[workflow/attachments]', notify=True)
continue
yield upload
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, self.id)
class WorkflowStatusJumpItem(WorkflowStatusItem):
status = None
endpoint = False
set_marker_on_status = False
category = 'status-change'
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
if 'status' in parameters:
destinations = [(x.id, x.name) for x in self.parent.parent.possible_status]
# look for existing jumps that are dropping a mark
workflow = self.parent.parent
statuses = getattr(workflow, 'possible_status') or []
global_actions = getattr(workflow, 'global_actions') or []
for status in statuses + global_actions:
for item in status.items:
if getattr(item, 'set_marker_on_status', False):
destinations.append(('_previous', _('Previously Marked Status')))
break
else:
continue
break
form.add(
SingleSelectWidget,
'%sstatus' % prefix,
title=_('Status'),
value=self.status,
options=[(None, '---')] + destinations,
)
if 'set_marker_on_status' in parameters:
form.add(
CheckboxWidget,
'%sset_marker_on_status' % prefix,
title=_('Set marker to jump back to current status'),
value=self.set_marker_on_status,
advanced=not (self.set_marker_on_status),
)
def handle_markers_stack(self, formdata):
if self.set_marker_on_status:
if formdata.workflow_data and '_markers_stack' in formdata.workflow_data:
markers_stack = formdata.workflow_data.get('_markers_stack')
else:
markers_stack = []
markers_stack.append({'status_id': formdata.status[3:]})
formdata.update_workflow_data({'_markers_stack': markers_stack})
def get_parameters(self):
return ('status', 'set_marker_on_status', 'condition')
def get_role_translation_label(workflow, role_id):
if role_id == logged_users_role().id:
return logged_users_role().name
if role_id == '_submitter':
return C_('role|User')
if str(role_id).startswith('_'):
return workflow.roles.get(role_id)
else:
try:
return get_publisher().role_class.get(role_id).name
except KeyError:
return
def get_role_name(role_id, charset=None):
role_id = str(role_id)
if role_id.startswith('_') or role_id == 'logged-users':
return force_text(role_id, charset)
try:
return force_text(get_publisher().role_class.get(role_id).name, charset)
except KeyError:
return force_text(role_id, charset)
def render_list_of_roles(workflow, roles):
t = []
for r in roles:
role_label = get_role_translation_label(workflow, r)
if role_label:
t.append(role_label)
return ', '.join([str(x) for x in t])
item_classes = []
def register_item_class(klass):
if klass.key not in [x.key for x in item_classes]:
item_classes.append(klass)
klass.init()
class CommentableWorkflowStatusItem(WorkflowStatusItem):
description = _('Comment')
key = 'commentable'
category = 'interaction'
endpoint = False
waitpoint = True
ok_in_global_action = False
required = False
varname = None
label = None
button_label = 0 # hack to handle legacy commentable items
hint = None
by = []
backoffice_info_text = None
def get_line_details(self):
if self.by:
return _('by %s') % self.render_list_of_roles(self.by)
else:
return _('not completed')
def fill_form(self, form, formdata, user, **kwargs):
if 'comment' not in [x.name for x in form.widgets]:
if self.label is None:
title = _('Comment')
else:
title = self.label
form.add(
TextWidget, 'comment', title=title, required=self.required, cols=40, rows=10, hint=self.hint
)
form.get_widget('comment').attrs['class'] = 'comment'
if self.button_label == 0:
form.add_submit('button%s' % self.id, _('Add Comment'))
elif self.button_label:
form.add_submit('button%s' % self.id, self.button_label)
if form.get_widget('button%s' % self.id):
form.get_widget('button%s' % self.id).backoffice_info_text = self.backoffice_info_text
def submit_form(self, form, formdata, user, evo):
if form.get_widget('comment'):
evo.comment = form.get_widget('comment').parse()
if self.varname:
workflow_data = {'comment_%s' % self.varname: evo.comment}
formdata.update_workflow_data(workflow_data)
def submit_admin_form(self, form):
for f in self.get_parameters():
widget = form.get_widget(f)
setattr(self, f, widget.parse())
def fill_admin_form(self, form):
if self.by and not isinstance(self.by, list):
self.by = None
return super().fill_admin_form(form)
def get_parameters(self):
return (
'label',
'button_label',
'hint',
'by',
'varname',
'backoffice_info_text',
'required',
'condition',
)
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
if 'label' in parameters:
if self.label is None:
self.label = str(_('Comment'))
form.add(StringWidget, '%slabel' % prefix, size=40, title=_('Label'), value=self.label)
if 'button_label' in parameters:
if self.button_label == 0:
self.button_label = str(_('Add Comment'))
form.add(
StringWidget,
'%sbutton_label' % prefix,
title=_('Button Label'),
hint=_('(empty to disable the button)'),
value=self.button_label,
)
if 'hint' in parameters:
form.add(StringWidget, '%shint' % prefix, size=40, title=_('Hint'), value=self.hint)
if 'by' in parameters:
form.add(
WidgetList,
'%sby' % prefix,
title=_('By'),
element_type=SingleSelectWidget,
value=self.by,
add_element_label=self.get_add_role_label(),
element_kwargs={
'render_br': False,
'options': [(None, '---', None)] + self.get_list_of_roles(),
},
)
if 'varname' in parameters:
form.add(
VarnameWidget,
'%svarname' % prefix,
title=_('Identifier'),
value=self.varname,
hint=_('This will make the comment available in a variable named comment_ + identifier.'),
)
if 'required' in parameters:
form.add(CheckboxWidget, '%srequired' % prefix, title=_('Required'), value=self.required)
if 'backoffice_info_text' in parameters:
form.add(
WysiwygTextWidget,
'%sbackoffice_info_text' % prefix,
title=_('Information Text for Backoffice'),
value=self.backoffice_info_text,
)
def button_label_export_to_xml(self, xml_item, charset, include_id=False):
if self.button_label == 0:
pass
elif self.button_label is None:
# button_label being None is a special case meaning "no button", it
# should be handled differently than the "not filled" case
el = ET.SubElement(xml_item, 'button_label')
else:
el = ET.SubElement(xml_item, 'button_label')
el.text = self.button_label
def button_label_init_with_xml(self, element, charset, include_id=False, snapshot=False):
if element is None:
return
# this can be None if element is self-closing, <button_label />, which
# then maps to None, meaning "no button".
self.button_label = xml_node_text(element)
register_item_class(CommentableWorkflowStatusItem)
class ChoiceWorkflowStatusItem(WorkflowStatusJumpItem):
description = _('Manual Jump')
key = 'choice'
endpoint = False
waitpoint = True
ok_in_global_action = False
label = None
identifier = None
by = []
backoffice_info_text = None
require_confirmation = False
ignore_form_errors = False
def get_label(self):
expression = self.get_expression(self.label)
if expression['type'] == 'text':
return expression['value']
return _('computed label')
def get_line_details(self):
to_status = None
if self.status == '_previous':
to_status = WorkflowStatus(_('previously marked status'))
elif self.status:
try:
to_status = self.parent.parent.get_status(self.status)
except KeyError:
return _('broken, missing destination status')
if self.label and to_status:
more = ''
if self.set_marker_on_status:
more += ' ' + str(_('(and set marker)'))
if self.by:
return _('"%(label)s", to %(to)s, by %(by)s%(more)s') % {
'label': self.get_label(),
'to': to_status.name,
'by': self.render_list_of_roles(self.by),
'more': more,
}
else:
return _('"%(label)s", to %(to)s%(more)s') % {
'label': self.get_label(),
'to': to_status.name,
'more': more,
}
else:
return _('not completed')
def fill_form(self, form, formdata, user, **kwargs):
label = self.compute(self.label)
if not label:
return
widget = form.add_submit('button%s' % self.id, label)
if self.identifier:
widget.extra_css_class = 'button-%s' % self.identifier
if self.require_confirmation:
get_response().add_javascript(['jquery.js', '../../i18n.js', 'qommon.js'])
widget.attrs = {'data-ask-for-confirmation': 'true'}
widget.backoffice_info_text = self.backoffice_info_text
widget.ignore_form_errors = self.ignore_form_errors
if self.ignore_form_errors:
widget.attrs['formnovalidate'] = 'formnovalidate'
def submit_form(self, form, formdata, user, evo):
if form.get_submit() == 'button%s' % self.id:
wf_status = self.get_target_status(formdata)
if wf_status:
evo.status = 'wf-%s' % wf_status[0].id
self.handle_markers_stack(formdata)
form.clear_errors()
return True # get out of processing loop
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
if 'label' in parameters:
form.add(ComputedExpressionWidget, '%slabel' % prefix, title=_('Label'), value=self.label)
if 'by' in parameters:
form.add(
WidgetList,
'%sby' % prefix,
title=_('By'),
element_type=SingleSelectWidget,
value=self.by,
add_element_label=self.get_add_role_label(),
element_kwargs={
'render_br': False,
'options': [(None, '---', None)] + self.get_list_of_roles(),
},
)
if 'require_confirmation' in parameters:
form.add(
CheckboxWidget,
'%srequire_confirmation' % prefix,
title=_('Require confirmation'),
value=self.require_confirmation,
)
if 'backoffice_info_text' in parameters:
form.add(
WysiwygTextWidget,
'%sbackoffice_info_text' % prefix,
title=_('Information Text for Backoffice'),
value=self.backoffice_info_text,
)
if 'identifier' in parameters:
form.add(
VarnameWidget,
'%sidentifier' % prefix,
title=_('Identifier'),
value=self.identifier,
advanced=True,
)
if 'ignore_form_errors' in parameters:
form.add(
CheckboxWidget,
'%signore_form_errors' % prefix,
title=_('Ignore form errors'),
value=self.ignore_form_errors,
advanced=True,
)
def get_parameters(self):
return (
'label',
'by',
'status',
'require_confirmation',
'backoffice_info_text',
'ignore_form_errors',
'set_marker_on_status',
'condition',
'identifier',
)
register_item_class(ChoiceWorkflowStatusItem)
class JumpOnSubmitWorkflowStatusItem(WorkflowStatusJumpItem):
description = _('On Submit Jump')
key = 'jumponsubmit'
ok_in_global_action = False
def get_line_details(self):
if self.status:
if self.get_target_status():
return _('to %s') % self.get_target_status()[0].name
else:
return _('broken')
else:
return _('not completed')
def submit_form(self, form, formdata, user, evo):
if form.is_submitted() and not form.has_errors():
wf_status = self.get_target_status(formdata)
if wf_status:
evo.status = 'wf-%s' % wf_status[0].id
self.handle_markers_stack(formdata)
def get_parameters(self):
return ('status', 'set_marker_on_status', 'condition')
register_item_class(JumpOnSubmitWorkflowStatusItem)
class SendmailWorkflowStatusItem(WorkflowStatusItem):
description = _('Email')
key = 'sendmail'
category = 'interaction'
support_substitution_variables = True
to = []
subject = None
mail_template = None
body = None
custom_from = None
attachments = None
comment = None
def _get_role_id_from_xml(self, elem, charset, include_id=False):
# override to allow for destination set with computed values.
if elem is None:
return None
value = xml_node_text(elem)
if self.get_expression(value)['type'] != 'text' or '@' in value:
return value
return super()._get_role_id_from_xml(elem, charset, include_id=include_id)
def render_list_of_roles_or_emails(self, roles):
t = []
for r in roles:
expression = self.get_expression(r)
if expression['type'] in ('python', 'template'):
t.append(_('computed value'))
elif '@' in expression['value']:
t.append(expression['value'])
else:
role_label = get_role_translation_label(self.parent.parent, r)
if role_label:
t.append(role_label)
return ', '.join([str(x) for x in t])
def get_to_parameter_view_value(self):
return self.render_list_of_roles_or_emails(self.to)
def get_line_details(self):
if self.to:
return _('to %s') % self.render_list_of_roles_or_emails(self.to)
else:
return _('not completed')
def get_parameters(self):
return ('to', 'mail_template', 'subject', 'body', 'attachments', 'custom_from', 'condition')
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
subject_body_attrs = {}
if 'subject' in parameters or 'body' in parameters:
if get_publisher().has_site_option('mail-templates') and MailTemplate.count():
subject_body_attrs = {
'data-dynamic-display-value': '',
'data-dynamic-display-child-of': '%smail_template' % prefix,
}
if 'to' in parameters:
form.add(
WidgetList,
'%sto' % prefix,
title=_('To'),
element_type=SingleSelectWidgetWithOther,
value=self.to,
add_element_label=self.get_add_role_label(),
element_kwargs={
'render_br': False,
'options': [(None, '---', None)] + self.get_list_of_roles(include_logged_in_users=False),
},
)
if 'subject' in parameters:
form.add(
StringWidget,
'%ssubject' % prefix,
title=_('Subject'),
validation_function=ComputedExpressionWidget.validate_template,
value=self.subject,
size=40,
attrs=subject_body_attrs,
)
if (
'mail_template' in parameters
and get_publisher().has_site_option('mail-templates')
and MailTemplate.count()
):
form.add(
SingleSelectWidget,
'%smail_template' % prefix,
title=_('Mail Template'),
value=self.mail_template,
options=[(None, '', '')] + MailTemplate.get_as_options_list(),
attrs={'data-dynamic-display-parent': 'true'},
)
if 'body' in parameters:
form.add(
TextWidget,
'%sbody' % prefix,
title=_('Body'),
value=self.body,
cols=80,
rows=10,
validation_function=ComputedExpressionWidget.validate_template,
attrs=subject_body_attrs,
)
if 'custom_from' in parameters:
form.add(
ComputedExpressionWidget,
'%scustom_from' % prefix,
title=_('Custom From Address'),
value=self.custom_from,
advanced=not (bool(self.custom_from)),
)
def get_body_parameter_view_value(self):
return htmltext('<pre class="wrapping-pre">%s</pre>') % self.body
def perform(self, formdata):
if not self.to:
return
if not (self.subject and self.body) and not self.mail_template:
return
url = formdata.get_url()
body = self.body
subject = self.subject
extra_attachments = None
if self.mail_template:
mail_template = MailTemplate.get_by_slug(self.mail_template)
if mail_template:
body = mail_template.body
subject = mail_template.subject
extra_attachments = mail_template.attachments
else:
message = _('reference to invalid mail template %(mail_template)s in status %(status)s') % {
'status': self.parent.name,
'mail_template': self.mail_template,
}
get_publisher().record_error(message, formdata=formdata, status_item=self)
return
try:
mail_body = template_on_formdata(
formdata, self.compute(body, render=False), autoescape=body.startswith('<')
)
except TemplateError as e:
get_logger().error(
'error in template for email body [%s], ' 'mail could not be generated: %s' % (url, str(e))
)
return
try:
mail_subject = template_on_formdata(
formdata, self.compute(subject, render=False), autoescape=False
)
except TemplateError as e:
get_logger().error(
'error in template for email subject [%s], ' 'mail could not be generated: %s' % (url, str(e))
)
return
# this works around the fact that parametric workflows only support
# string values, so if we get set a string, we convert it here to an
# array.
if isinstance(self.to, str):
self.to = [self.to]
addresses = []
for dest in self.to:
try:
dest = self.compute(dest, raises=True)
except Exception:
continue
if not dest:
continue
if isinstance(dest, list):
addresses.extend(dest)
continue
if isinstance(dest, str) and ',' in dest:
# if the email contains a comma consider it as a serie of
# emails
addresses.extend([x.strip() for x in dest.split(',')])
continue
if '@' in str(dest):
addresses.append(dest)
continue
if dest == '_submitter':
submitter_email = formdata.formdef.get_submitter_email(formdata)
if submitter_email:
addresses.append(submitter_email)
continue
for real_dest in formdata.get_function_roles(dest):
try:
role = get_publisher().role_class.get(real_dest)
except KeyError:
continue
addresses.extend(role.get_emails())
if not addresses:
return
email_from = None
if self.custom_from:
email_from = self.compute(self.custom_from)
attachments = self.convert_attachments_to_uploads(extra_attachments)
if len(addresses) > 1:
emails.email(
mail_subject,
mail_body,
email_rcpt=None,
bcc=addresses,
email_from=email_from,
attachments=attachments,
fire_and_forget=True,
)
else:
emails.email(
mail_subject,
mail_body,
email_rcpt=addresses,
email_from=email_from,
attachments=attachments,
fire_and_forget=True,
)
register_item_class(SendmailWorkflowStatusItem)
def get_formdata_template_context(formdata=None):
ctx = get_publisher().substitutions.get_context_variables('lazy')
if formdata:
ctx['url'] = formdata.get_url()
ctx['url_status'] = '%sstatus' % formdata.get_url()
ctx['details'] = formdata.formdef.get_detailed_email_form(formdata, ctx['url'])
ctx['name'] = formdata.formdef.name
ctx['number'] = formdata.id
if formdata.evolution and formdata.evolution[-1].comment:
ctx['comment'] = formdata.evolution[-1].comment
else:
ctx['comment'] = ''
ctx.update(formdata.get_as_dict())
# compatibility vars
ctx['before'] = ctx.get('form_previous_status')
ctx['after'] = ctx.get('form_status')
ctx['evolution'] = ctx.get('form_evolution')
return ctx
def template_on_html_string(template):
return template_on_formdata(None, template, ezt_format=ezt.FORMAT_HTML)
def template_on_formdata(formdata=None, template=None, **kwargs):
assert template is not None
if not Template.is_template_string(template):
# no tags, no variables: don't even process formdata
return template
context = get_formdata_template_context(formdata)
return template_on_context(context, template, **kwargs)
def template_on_context(context=None, template=None, **kwargs):
assert template is not None
if not Template.is_template_string(template):
return template
return Template(template, **kwargs).render(context)
class SendSMSWorkflowStatusItem(WorkflowStatusItem):
description = _('SMS')
key = 'sendsms'
category = 'interaction'
support_substitution_variables = True
to = []
body = None
# don't use roles (de)serializer for "to" field
to_export_to_xml = None
to_init_with_xml = None
@classmethod
def is_available(cls, workflow=None):
sms_cfg = get_cfg('sms', {})
return bool(sms_cfg.get('sender') and sms_cfg.get('passerelle_url'))
def get_parameters(self):
return ('to', 'body', 'condition')
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
if 'to' in parameters:
form.add(
WidgetList,
'%sto' % prefix,
title=_('To'),
element_type=ComputedExpressionWidget,
value=self.to,
add_element_label=_('Add Number'),
element_kwargs={'render_br': False},
)
if 'body' in parameters:
form.add(TextWidget, '%sbody' % prefix, title=_('Body'), value=self.body, cols=80, rows=10)
def perform(self, formdata):
if not self.is_available():
return
if not self.to:
return
if not self.body:
return
destinations = [self.compute(x) for x in self.to]
destinations = [x for x in destinations if x] # ignore empty elements
if not destinations:
return
try:
sms_body = template_on_formdata(formdata, self.compute(self.body, render=False))
except TemplateError as e:
get_logger().error('error in template for sms [%s], sms could not be generated' % str(e))
return
from .qommon import sms
sms_cfg = get_cfg('sms', {})
sender = sms_cfg.get('sender', 'AuQuotidien')[:11]
try:
sms.SMS.get_sms_class().send(sender, destinations, sms_body)
except errors.SMSError as e:
get_logger().error(e)
register_item_class(SendSMSWorkflowStatusItem)
class DisplayMessageWorkflowStatusItem(WorkflowStatusItem):
description = _('Alert')
key = 'displaymsg'
category = 'interaction'
support_substitution_variables = True
ok_in_global_action = False
to = None
position = 'top'
level = None
message = None
def get_line_details(self):
parts = []
if self.position == 'top':
parts.append(_('top of page'))
elif self.position == 'bottom':
parts.append(_('bottom of page'))
elif self.position == 'actions':
parts.append(_('with actions'))
if self.to:
parts.append(_('for %s') % self.render_list_of_roles(self.to))
return ', '.join([str(x) for x in parts])
def get_message(self, filled, position='top'):
if not (self.message and self.position == position and filled.is_for_current_user(self.to)):
return ''
dict = copy.copy(get_publisher().substitutions.get_context_variables('lazy'))
dict['date'] = misc.localstrftime(filled.receipt_time)
dict['number'] = filled.id
handling_role = filled.get_handling_role()
if handling_role and handling_role.details:
dict['receiver'] = handling_role.details.replace('\n', '<br />')
message = self.message
if self.level:
message = '<div class="%snotice">%s</div>' % (self.level, message)
return Template(message, ezt_format=ezt.FORMAT_HTML).render(dict)
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
if 'message' in parameters:
form.add(
TextWidget,
'%smessage' % prefix,
title=_('Message'),
value=self.message,
cols=80,
rows=10,
validation_function=ComputedExpressionWidget.validate_template,
)
if 'level' in parameters:
form.add(
SingleSelectWidget,
'%slevel' % prefix,
title=_('Level'),
value=self.level,
options=[
(None, ''),
('success', _('Success')),
('info', _('Information')),
('warning', _('Warning')),
('error', _('Error')),
],
)
if 'position' in parameters:
form.add(
SingleSelectWidget,
'%sposition' % prefix,
title=_('Position'),
value=self.position,
options=[
('top', _('Top of page')),
('bottom', _('Bottom of page')),
# ('actions', _('With actions')) "too complicated"
],
)
if 'to' in parameters:
form.add(
WidgetList,
'%sto' % prefix,
title=_('To'),
element_type=SingleSelectWidget,
value=self.to or [],
add_element_label=self.get_add_role_label(),
element_kwargs={
'render_br': False,
'options': [(None, '---', None)] + self.get_list_of_roles(include_logged_in_users=False),
},
)
def get_parameters(self):
return ('message', 'level', 'position', 'to', 'condition')
def get_message_parameter_view_value(self):
if self.message.startswith('<'):
return htmltext(self.message)
return htmltext('<pre>%s</pre>') % self.message
register_item_class(DisplayMessageWorkflowStatusItem)
class RedirectToStatusWorkflowStatusItem(WorkflowStatusItem):
description = _('Status Page Redirection')
key = 'redirectstatus'
ok_in_global_action = False
backoffice = False
def perform(self, formdata):
return formdata.get_url(self.backoffice)
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
if 'backoffice' in parameters:
form.add(
CheckboxWidget,
'%sbackoffice' % prefix,
title=_('Redirect to backoffice page'),
value=self.backoffice,
)
def get_parameters(self):
return ('backoffice',)
# RedirectToStatusWorkflowStatusItem is not registered as the class kept for
# backward compatibility only and should not be exposed to the user. (#3031)
class EditableWorkflowStatusItem(WorkflowStatusItem):
description = _('Edition')
key = 'editable'
category = 'formdata-action'
endpoint = False
waitpoint = True
ok_in_global_action = False
by = []
status = None
label = None
backoffice_info_text = None
def get_line_details(self):
if self.by:
return _('by %s') % self.render_list_of_roles(self.by)
else:
return _('not completed')
def fill_form(self, form, formdata, user, **kwargs):
label = self.label
if not label:
label = _('Edit Form')
form.add_submit('button%s' % self.id, label)
form.get_widget('button%s' % self.id).backoffice_info_text = self.backoffice_info_text
def submit_form(self, form, formdata, user, evo):
if form.get_submit() == 'button%s' % self.id:
return formdata.get_url(backoffice=get_request().is_in_backoffice()) + 'wfedit-%s' % self.id
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs):
super().add_parameters_widgets(form, parameters, prefix=prefix, formdef=formdef, **kwargs)
if 'by' in parameters:
form.add(
WidgetList,
'%sby' % prefix,
title=_('By'),
element_type=SingleSelectWidget,
value=self.by,
add_element_label=self.get_add_role_label(),
element_kwargs={
'render_br': False,
'options': [(None, '---', None)] + self.get_list_of_roles(),
},
)
if 'status' in parameters:
form.add(
SingleSelectWidget,
'%sstatus' % prefix,
title=_('Status After Edit'),
value=self.status,
hint=_("Don't select any if you don't want status change processing"),
options=[(None, '---')] + [(x.id, x.name) for x in self.parent.parent.possible_status],
)
if 'label' in parameters:
form.add(StringWidget, '%slabel' % prefix, title=_('Button Label'), value=self.label)
if 'backoffice_info_text' in parameters:
form.add(
WysiwygTextWidget,
'%sbackoffice_info_text' % prefix,
title=_('Information Text for Backoffice'),
value=self.backoffice_info_text,
)
def get_parameters(self):
return ('by', 'status', 'label', 'backoffice_info_text', 'condition')
register_item_class(EditableWorkflowStatusItem)
def load_extra():
from .wf import aggregation_email # noqa pylint: disable=unused-import
from .wf import anonymise # noqa pylint: disable=unused-import
from .wf import attachment # noqa pylint: disable=unused-import
from .wf import backoffice_fields # noqa pylint: disable=unused-import
from .wf import create_carddata # noqa pylint: disable=unused-import
from .wf import create_formdata # noqa pylint: disable=unused-import
from .wf import criticality # noqa pylint: disable=unused-import
from .wf import dispatch # noqa pylint: disable=unused-import
from .wf import edit_carddata # noqa pylint: disable=unused-import
from .wf import export_to_model # noqa pylint: disable=unused-import
from .wf import external_workflow # noqa pylint: disable=unused-import
from .wf import form # noqa pylint: disable=unused-import
from .wf import geolocate # noqa pylint: disable=unused-import
from .wf import jump # noqa pylint: disable=unused-import
from .wf import notification # noqa pylint: disable=unused-import
from .wf import profile # noqa pylint: disable=unused-import
from .wf import redirect_to_url # noqa pylint: disable=unused-import
from .wf import register_comment # noqa pylint: disable=unused-import
from .wf import remove # noqa pylint: disable=unused-import
from .wf import resubmit # noqa pylint: disable=unused-import
from .wf import roles # noqa pylint: disable=unused-import
from .wf import timeout_jump # noqa pylint: disable=unused-import
from .wf import wscall # noqa pylint: disable=unused-import
aggregation_email.register_cronjob()
jump.register_cronjob()
from .wf.export_to_model import ExportToModel # noqa pylint: disable=unused-import,wrong-import-position