wcs/wcs/wf/external_workflow.py

183 lines
7.5 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2020 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from quixote import get_publisher
from wcs.qommon import _
from wcs.qommon.form import SingleSelectWidget
from wcs.logged_errors import LoggedError
from wcs.workflows import WorkflowStatusItem, perform_items, register_item_class
from wcs.workflows import WorkflowGlobalActionWebserviceTrigger, Workflow
from wcs.wf.create_formdata import LinkedFormdataEvolutionPart
from wcs.carddef import CardDef
from wcs.formdef import FormDef
class ExternalWorkflowGlobalAction(WorkflowStatusItem):
description = _('External workflow')
key = 'external_workflow_global_action'
category = 'formdata-action'
slug = None
trigger_id = None
@classmethod
def is_available(cls, workflow=None):
return get_publisher().has_site_option('external-workflow')
def get_workflow_webservice_triggers(self, workflow):
for action in workflow.global_actions or []:
for trigger in action.triggers or []:
if isinstance(trigger, WorkflowGlobalActionWebserviceTrigger) and trigger.identifier:
yield trigger
def get_object_def(self, object_slug=None):
slug = object_slug or self.slug
object_type, slug = slug.split(':')
if object_type == 'formdef':
object_class = FormDef
elif object_type == 'carddef':
object_class = CardDef
try:
return object_class.get_by_urlname(slug)
except KeyError:
pass
def get_trigger(self, workflow):
try:
trigger_type, trigger_id = self.trigger_id.split(':', 1)
except ValueError:
return
for trigger in self.get_workflow_webservice_triggers(workflow):
if trigger.identifier == trigger_id:
return trigger
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
super(ExternalWorkflowGlobalAction, self).add_parameters_widgets(
form, parameters, prefix=prefix, formdef=formdef)
if 'slug' in parameters:
objects = [(None, '---', '')]
for wf in Workflow.select():
if any(self.get_workflow_webservice_triggers(wf)):
for objectdef in wf.formdefs(lightweight=True) + wf.carddefs(lightweight=True):
object_slug = '%s:%s' % (objectdef.__class__.__name__.lower(),
objectdef.url_name)
objects.append((object_slug, objectdef.name, object_slug))
if len(objects) == 1:
form.add_global_errors([_('No workflow with external triggerable global action.')])
return
objects.sort(key=lambda x: x[1])
form.add(SingleSelectWidget, '%sslug' % prefix,
title=_('Form/Card'),
value=self.slug,
required=True,
options=objects)
if 'trigger_id' in parameters and form.get('%sslug' % prefix):
object_def = self.get_object_def(form.get('%sslug' % prefix))
if not object_def:
return
triggers_names = [(None, '---', '')]
for trigger in self.get_workflow_webservice_triggers(object_def.workflow):
trigger_id = 'action:%s' % trigger.identifier
triggers_names.append((trigger_id, trigger.parent.name, trigger_id))
form.add(SingleSelectWidget, '%strigger_id' % prefix,
title=_('Action'),
value=self.trigger_id,
required=True,
options=triggers_names)
def get_line_details(self):
if self.slug and self.trigger_id:
objectdef = self.get_object_def()
if objectdef:
trigger = self.get_trigger(objectdef.workflow)
if trigger:
return _('action "%(trigger_name)s" on %(object_name)s') % {
'trigger_name': trigger.parent.name,
'object_name': objectdef.name}
return _('not completed')
def iter_target_datas(self, formdata, objectdef):
parent = formdata.get_parent()
if parent:
parent_identifier = '%s:%s' % (parent.formdef.xml_root_node,
parent.formdef.url_name)
if parent_identifier == self.slug:
yield parent
data_ids = []
# search linked objects in data sources
for field in formdata.get_formdef().get_all_fields():
if getattr(field, 'data_source', None) and field.data_source['type'] == self.slug:
linked_id = formdata.data.get(field.id)
if linked_id:
data_ids.append(linked_id)
# search in evolution
for part in formdata.iter_evolution_parts():
if isinstance(part, LinkedFormdataEvolutionPart):
part_identifier = '%s:%s' % (part.formdef.xml_root_node, part.formdef.url_name)
if part_identifier == self.slug:
data_ids.append(part.formdata_id)
for target_id in data_ids:
try:
yield objectdef.data_class().get(target_id)
except KeyError as e:
# use custom error message depending on target type
LoggedError.record(_('Could not find linked "%(object_name)s" object by id %(object_id)s') % {
'object_name': objectdef.name, 'object_id': target_id},
formdata=formdata, status_item=self, exception=e)
def get_parameters(self):
return ('slug', 'trigger_id', 'condition')
def perform(self, formdata):
objectdef = self.get_object_def()
if not objectdef:
return
trigger = self.get_trigger(objectdef.workflow)
if not trigger:
LoggedError.record(_('No trigger with id "%s" found in workflow') % self.trigger_id,
formdata=formdata, status_item=self)
return
class CallerSource:
def __init__(self, formdata):
self.formdata = formdata
def get_substitution_variables(self):
return {'caller_form': self.formdata.get_substitution_variables(minimal=True)['form']}
caller_source = CallerSource(formdata)
for target_data in self.iter_target_datas(formdata, objectdef):
with get_publisher().substitutions.temporary_feed(target_data):
get_publisher().substitutions.reset()
get_publisher().substitutions.feed(get_publisher())
get_publisher().substitutions.feed(target_data.formdef)
get_publisher().substitutions.feed(target_data)
get_publisher().substitutions.feed(caller_source)
perform_items(trigger.parent.items, target_data)
register_item_class(ExternalWorkflowGlobalAction)