wcs/wcs/wf/jump.py

310 lines
12 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2013 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import itertools
import json
import os
import sys
from django.utils import six
from quixote import get_publisher, get_request, redirect
from quixote.directory import Directory
from ..qommon import _, force_str
from ..qommon.humantime import *
from ..qommon.form import *
from ..qommon import errors
from ..qommon.publisher import get_publisher_class
from ..qommon.cron import CronJob
from wcs.workflows import Workflow, WorkflowStatusJumpItem, register_item_class
from wcs.api import get_user_from_api_query_string, is_url_signed
from wcs.conditions import Condition
def _jump_timeout_interval(raw_checks, default_checks=3):
    '''Return the number of minutes between two checks of timeout jumps.

    raw_checks is the requested number of checks per hour, as a string (or
    anything int() accepts).  The hour is divided evenly between the checks
    and the result is clamped to a minimum of one minute.

    A value that cannot be parsed, or a value of zero, falls back to
    default_checks: without this guard int() would raise ValueError and
    "60 // 0" would raise ZeroDivisionError while the module is imported.
    '''
    try:
        checks = int(raw_checks)
    except (TypeError, ValueError):
        checks = default_checks
    if checks == 0:
        checks = default_checks
    return max(60 // checks, 1)


# Number of minutes between two runs of the timeout jumps check; tunable with
# the WCS_JUMP_TIMEOUT_CHECKS environment variable (number of checks per hour).
JUMP_TIMEOUT_INTERVAL = _jump_timeout_interval(os.environ.get('WCS_JUMP_TIMEOUT_CHECKS', '3'))
def jump_and_perform(formdata, action, workflow_data=None):
    '''Make formdata jump to the status of action, then run its workflow.

    The markers stack is handled by the action first; when workflow_data is
    given (and not empty) it is merged into the formdata and stored before
    the status change.  Returns whatever formdata.perform_workflow() returns
    (callers in this module use it as an URL, possibly empty).
    '''
    action.handle_markers_stack(formdata)
    if workflow_data:
        # record the extra data before the status change
        formdata.update_workflow_data(workflow_data)
        formdata.store()
    formdata.jump_status(action.status)
    return formdata.perform_workflow()
class JumpDirectory(Directory):
    '''Directory attached to a jump action; it only exports "trigger".'''

    _q_exports = ['trigger']

    def __init__(self, formdata, wfstatusitem, wfstatus):
        # the trigger sub-directory works on the same three objects
        self.trigger = TriggerDirectory(formdata, wfstatusitem, wfstatus)
        self.formdata, self.wfstatusitem, self.wfstatus = formdata, wfstatusitem, wfstatus
class TriggerDirectory(Directory):
    '''Directory answering the calls made to <form_url>/jump/trigger/<identifier>.'''

    def __init__(self, formdata, wfstatusitem, wfstatus):
        self.formdata, self.wfstatusitem, self.wfstatus = formdata, wfstatusitem, wfstatus

    def _q_lookup(self, component):
        '''Execute the jump of the current status whose trigger is component.

        Only POST requests are accepted.  The caller must pass the check_auth()
        of the jump, unless the URL is signed and the jump has no "by" roles.
        JSON requests get a JSON answer, other requests are redirected.

        Raises AccessForbiddenError on a wrong method, failed authorization or
        (for non-JSON requests) unmet jump conditions, and TraversalError when
        no jump of the status has this trigger.
        '''
        # NOTE(review): get_response is not imported by name in this module,
        # presumably it comes from one of the star imports -- to be confirmed.
        if get_request().is_json():
            get_response().set_content_type('application/json')
        if get_request().get_method() != 'POST':
            raise errors.AccessForbiddenError()

        signed_request = is_url_signed()
        user = get_user_from_api_query_string() or get_request().user

        for item in self.wfstatus.items:
            if not (isinstance(item, JumpWorkflowStatusItem) and hasattr(item, 'trigger')):
                continue
            if component != item.trigger:
                continue

            # a signed request is enough when the jump is not limited to roles
            auth_required = not (signed_request and not item.by)
            if auth_required and not item.check_auth(self.formdata, user):
                raise errors.AccessForbiddenError()

            if not item.must_jump(self.formdata, trigger=item.trigger):
                if get_request().is_json():
                    get_response().status_code = 403
                    return json.dumps({'err': 1, 'err_desc': 'unmet condition'})
                raise errors.AccessForbiddenError()

            # the JSON payload of the request, if any, is added to the
            # workflow data of the formdata
            workflow_data = get_request().json if hasattr(get_request(), '_json') else None
            url = jump_and_perform(self.formdata, item, workflow_data=workflow_data)

            if get_request().is_json():
                return json.dumps({'err': 0, 'url': url})
            return redirect(url or self.formdata.get_url())

        # no trigger found
        raise errors.TraversalError()
class JumpWorkflowStatusItem(WorkflowStatusJumpItem):
    '''Workflow action jumping to another status.

    The jump can depend on a condition, on a call to a webservice trigger
    and on a timeout; must_jump() tells how they are combined.
    '''

    description = N_('Automatic Jump')
    key = 'jump'

    by = []
    condition = None
    trigger = None
    timeout = None
    # minimal duration announced in the hint of the timeout widget, it is
    # JUMP_TIMEOUT_INTERVAL (minutes) converted to seconds.
    _granularity = JUMP_TIMEOUT_INTERVAL * 60

    directory_name = 'jump'
    directory_class = JumpDirectory

    def timeout_init_with_xml(self, elem, charset, include_id=False):
        '''Set the timeout from the text of an XML element.

        A missing element or text gives None; a plain text value is converted
        to an integer and any other kind of expression is kept as a string.
        '''
        if elem is None or elem.text is None:
            self.timeout = None
            return
        raw_timeout = force_str(elem.text)
        is_plain_text = self.get_expression(raw_timeout)['type'] == 'text'
        self.timeout = int(raw_timeout) if is_plain_text else raw_timeout

    def migrate(self):
        '''Turn a condition stored as a string into a dictionary.

        Returns True when the condition was converted, otherwise the result
        of the parent migration.
        '''
        changed = super(JumpWorkflowStatusItem, self).migrate()
        if not isinstance(self.condition, six.string_types):
            return changed
        legacy_condition = self.condition
        if legacy_condition:
            self.condition = {'type': 'python', 'value': legacy_condition}
        else:
            self.condition = {}
        return True

    @property
    def waitpoint(self):
        '''True if the jump has a timeout or a trigger.'''
        return bool(self.timeout or self.trigger)

    def render_as_line(self):
        # override parent method to avoid mentioning the condition twice.
        label = _(self.description)
        details = self.get_line_details()
        return '%s (%s)' % (label, details)

    def get_line_details(self):
        '''Return the name of the target status, with the reasons of the jump
        (condition, trigger, timeout) if any, or a notice if the jump has no
        status or if its target status cannot be found.
        '''
        if not self.status:
            return _('not completed')
        wf_status = self.get_target_status()
        if not wf_status:
            return _('broken')

        reasons = []
        if self.condition and self.condition.get('value'):
            reasons.append(_('condition'))
        if self.trigger:
            reasons.append(_('trigger'))
        if self.timeout:
            reasons.append(_('timeout'))

        target_name = wf_status[0].name
        if not reasons:
            return target_name
        return _('to %(name)s, %(reasons)s') % {
            'name': target_name,
            'reasons': ', '.join(reasons)}

    def get_parameters(self):
        '''Return the names of the parameters of the action.'''
        return ('status', 'condition', 'trigger', 'by', 'timeout', 'set_marker_on_status')

    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
        '''Add to form the widgets of the listed parameters.

        The parent class is called first, the widgets for trigger, by and
        timeout are then added here.
        '''
        super(JumpWorkflowStatusItem, self).add_parameters_widgets(
                form, parameters, prefix, formdef)

        if 'condition' in parameters:
            form.get_widget('%scondition' % prefix).advanced = False

        if 'trigger' in parameters:
            form.add(
                StringWidget, '%strigger' % prefix,
                title=_('Identifier for webservice'),
                hint=_('This jump will be triggered by an authorized call '
                       'to <form_url>/jump/trigger/<identifier>.'),
                value=self.trigger,
                size=40)

        if 'by' in parameters:
            role_options = [(None, '---', None)] + self.get_list_of_roles(
                    include_logged_in_users=False)
            form.add(
                WidgetList, '%sby' % prefix,
                title=_('Roles allowed to trigger'),
                element_type=SingleSelectWidget,
                value=self.by,
                add_element_label=_('Add Role'),
                element_kwargs={'render_br': False, 'options': role_options})

        if 'timeout' in parameters:
            hint_template = htmltext(_(
                "ex.: 7 days 1 minute<br/> Usable units of time: %(variables)s. "
                '<br/><span class="warning">Minimal duration is %(granularity)s</span>'))
            timeout_hint = hint_template % {
                'variables': ', '.join(timewords()),
                'granularity': seconds2humanduration(self._granularity)}
            uses_expression = (
                not isinstance(self.timeout, int)
                and self.get_expression(self.timeout)['type'] != 'text')
            if uses_expression:
                # the expression is edited as is
                widget_class, widget_value = ComputedExpressionWidget, self.timeout
            else:
                # other values are displayed as a human duration
                widget_class, widget_value = StringWidget, seconds2humanduration(self.timeout)
            form.add(widget_class, '%stimeout' % prefix, title=_('Timeout'),
                     value=widget_value, hint=timeout_hint)

    def timeout_parse(self, value):
        '''Convert the value of the timeout widget.

        Empty values and expressions are returned unchanged, a human duration
        is converted by humanduration2seconds(), and None is returned if
        that conversion raises ValueError.
        '''
        if not value or self.get_expression(value)['type'] != 'text':
            return value
        try:
            return humanduration2seconds(value)
        except ValueError:
            return None

    def perform(self, formdata):
        '''Change the status of formdata if the jump must happen and its
        target status exists.'''
        if not self.status or not self.must_jump(formdata):
            return
        wf_status = self.get_target_status(formdata)
        if not wf_status:
            return
        self.handle_markers_stack(formdata)
        formdata.status = 'wf-%s' % wf_status[0].id

    def check_condition(self, formdata):
        # skip condition check here so it is not evaluated twice.
        return True

    def must_jump(self, formdata, trigger=None):
        '''Tell if the jump must happen for formdata.

        All the criteria set on the jump have to be met: the condition (a
        RuntimeError during its evaluation counts as unmet), the trigger (the
        given trigger must be the one of the jump) and the timeout (compared
        to the time elapsed since formdata.last_update_time, when there is
        one).
        '''
        verdict = True

        if self.condition:
            variables = {'formdata': formdata, 'status_item': self}
            try:
                verdict = Condition(self.condition, variables).evaluate()
            except RuntimeError:
                verdict = False

        if self.trigger:
            verdict = verdict and (trigger is not None and trigger == self.trigger)

        if self.timeout:
            delay = int(self.compute(self.timeout))
            last_update = formdata.last_update_time
            if last_update:
                elapsed = time.time() - time.mktime(last_update)
                verdict = (elapsed > delay) and verdict

        return verdict


register_item_class(JumpWorkflowStatusItem)
def workflows_with_timeout():
    '''Return the items with a timeout, grouped by workflow and status.

    The result is a dictionary mapping workflow ids to dictionaries mapping
    status identifiers ('wf-<status id>') to lists of items having both a
    status and a timeout.  Items whose target status cannot be found are left
    out of the lists, the entries of their workflow and status are created
    nonetheless.
    '''
    wfs_status = {}
    for workflow_id in Workflow.keys():
        workflow = Workflow.get(workflow_id, ignore_errors=True)
        if not workflow:
            continue
        for status in workflow.possible_status:
            status_key = 'wf-%s' % status.id
            for item in status.items:
                if not (hasattr(item, 'status') and hasattr(item, 'timeout')):
                    continue
                if not (item.status and item.timeout):
                    continue
                timeout_items = wfs_status.setdefault(workflow_id, {}).setdefault(status_key, [])
                # this will catch status being a removed status
                if item.get_target_status():
                    timeout_items.append(item)
    return wfs_status
def _apply_timeouts(publisher):
    '''Traverse all filled forms and cards and apply expired timeout jumps if needed

    For each formdata the first item that must jump is executed and the
    remaining items are skipped.  The publisher parameter is not used, the
    publisher is obtained from get_publisher().
    '''
    from ..formdef import FormDef
    from ..carddef import CardDef

    wfs_status = workflows_with_timeout()
    all_formdefs = itertools.chain(
        FormDef.select(ignore_errors=True),
        CardDef.select(ignore_errors=True))

    for formdef in all_formdefs:
        status_ids = wfs_status.get(str(formdef.workflow_id))
        if not status_ids:
            # no timeout in the workflow of this formdef
            continue
        formdata_class = formdef.data_class()
        for status_id in status_ids:
            formdatas = formdata_class.get_with_indexed_value(
                'status', status_id, ignore_errors=True)
            for formdata in formdatas:
                # the items are looked up with the status held by the formdata
                for jump_item in status_ids[formdata.status]:
                    # set substitution variables anew before the evaluation
                    get_publisher().substitutions.reset()
                    for source in (get_publisher(), formdef, formdata):
                        get_publisher().substitutions.feed(source)
                    if jump_item.must_jump(formdata):
                        jump_and_perform(formdata, jump_item)
                        break
# Registration happens when the module is imported, and only if
# get_publisher_class() returns a publisher class.
if get_publisher_class():
    # every JUMP_TIMEOUT_INTERVAL minutes check for expired status jump
    # timeouts (the job is scheduled for every hour, on the minutes that
    # are multiples of JUMP_TIMEOUT_INTERVAL).
    get_publisher_class().register_cronjob(
        CronJob(_apply_timeouts,
            name='evaluate_jumps',
            hours=range(24), minutes=range(0, 60, JUMP_TIMEOUT_INTERVAL)))