wcs/wcs/wf/jump.py

288 lines
11 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2013 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import json
import os
import sys
from quixote import get_publisher, get_request, redirect
from quixote.directory import Directory
from qommon import _
from qommon.humantime import *
from qommon.form import *
from qommon import errors
from qommon.publisher import get_publisher_class
from qommon.cron import CronJob
from wcs.workflows import Workflow, WorkflowStatusJumpItem, register_item_class
from wcs.api import get_user_from_api_query_string, is_url_signed
JUMP_TIMEOUT_INTERVAL = max((60 // int(os.environ.get('WCS_JUMP_TIMEOUT_CHECKS', '3')), 1))
def jump_and_perform(formdata, status, workflow_data=None):
if workflow_data:
formdata.update_workflow_data(workflow_data)
formdata.store()
formdata.jump_status(status)
url = formdata.perform_workflow()
return url
class JumpDirectory(Directory):
_q_exports = ['trigger']
def __init__(self, formdata, wfstatusitem, wfstatus):
self.formdata = formdata
self.wfstatusitem = wfstatusitem
self.wfstatus = wfstatus
self.trigger = TriggerDirectory(formdata, wfstatusitem, wfstatus)
class TriggerDirectory(Directory):
def __init__(self, formdata, wfstatusitem, wfstatus):
self.formdata = formdata
self.wfstatusitem = wfstatusitem
self.wfstatus = wfstatus
def _q_lookup(self, component):
if get_request().is_json():
get_response().set_content_type('application/json')
if not get_request().get_method() == 'POST':
raise errors.AccessForbiddenError()
signed_request = is_url_signed()
user = get_user_from_api_query_string() or get_request().user
for item in self.wfstatus.items:
if not isinstance(item, JumpWorkflowStatusItem):
continue
if not hasattr(item, 'trigger'):
continue
if component == item.trigger:
if signed_request and not item.by:
pass
elif not item.check_auth(self.formdata, user):
raise errors.AccessForbiddenError()
get_request().trigger_name = component
workflow_data = None
if hasattr(get_request(), 'json'):
workflow_data = get_request().json
url = jump_and_perform(self.formdata, item.status,
workflow_data=workflow_data)
if get_request().is_json():
return json.dumps({'err': 0, 'url': url})
elif url:
return redirect(url)
else:
return redirect(self.formdata.get_url())
# no trigger found
raise errors.TraversalError()
class JumpWorkflowStatusItem(WorkflowStatusJumpItem):
description = N_('Change Status Automatically')
key = 'jump'
by = []
condition = None
trigger = None
timeout = None
_granularity = JUMP_TIMEOUT_INTERVAL * 60
directory_name = 'jump'
directory_class = JumpDirectory
def timeout_init_with_xml(self, elem, charset, include_id=False):
if elem is None or elem.text is None:
self.timeout = None
else:
timeout = elem.text.encode(charset)
if timeout.startswith('='):
self.timeout = timeout
else:
self.timeout = int(timeout)
@property
def waitpoint(self):
if self.timeout or self.trigger:
return True
return False
def render_as_line(self):
if not self.status:
return _('Change Status Automatically (not completed)')
wf_status = self.get_target_status()
if not wf_status:
return _('Change Status Automatically (broken)')
reasons = []
if self.condition:
reasons.append(_('condition'))
if self.trigger:
reasons.append(_('trigger'))
if self.timeout:
reasons.append(_('timeout'))
if reasons:
return _('Change Status Automatically (to %(name)s) (%(reasons)s)') % {
'name': wf_status[0].name,
'reasons': ', '.join(reasons)}
else:
return _('Change Status Automatically (to %s)') % wf_status[0].name
def get_parameters(self):
return ('status', 'set_marker_on_status', 'condition', 'trigger', 'by', 'timeout')
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
WorkflowStatusJumpItem.add_parameters_widgets(self, form, parameters, prefix, formdef)
if 'condition' in parameters:
form.add(StringWidget, '%scondition' % prefix, title=_('Condition (formula)'),
value=self.condition, size=40)
if 'trigger' in parameters:
form.add(StringWidget, '%strigger' % prefix, title=_('Trigger (string)'),
value=self.trigger, size=40)
if 'by' in parameters:
form.add(WidgetList, '%sby' % prefix, title=_('Roles allowed to trigger'),
element_type=SingleSelectWidget,
value=self.by,
add_element_label=_('Add Role'),
element_kwargs={'render_br': False,
'options': [(None, '---', None)] +
self.get_list_of_roles(include_logged_in_users=False)})
if 'timeout' in parameters:
_hint = htmltext(_("ex.: 7 days 1 minute<br/> Usable units of time: %(variables)s. " \
'<br/><span class="warning">Minimal duration is %(granularity)s</span>')) % {
'variables': ', '.join(timewords()),
'granularity': seconds2humanduration(self._granularity)}
if str(self.timeout).startswith('='):
form.add(ComputedExpressionWidget, '%stimeout' % prefix, title=_('Timeout'),
value=self.timeout, hint=_hint)
else:
form.add(StringWidget, '%stimeout' % prefix, title=_('Timeout'),
value=seconds2humanduration(self.timeout), hint=_hint)
def timeout_parse(self, value):
if not value:
return value
if value.startswith('='):
return value
try:
return humanduration2seconds(value)
except ValueError:
return None
def perform(self, formdata):
if not self.status:
return
if self.must_jump(formdata):
wf_status = self.get_target_status(formdata)
if wf_status:
formdata.status = 'wf-%s' % wf_status[0].id
self.handle_markers_stack(formdata)
def must_jump(self, formdata):
must_jump = True
if self.condition:
variables = get_publisher().substitutions.get_context_variables()
try:
must_jump = eval(self.condition, get_publisher().get_global_eval_dict(), variables)
except:
# get the variables in the locals() namespace so they are
# displayed within the trace.
condition = self.condition
global_variables = get_publisher().get_global_eval_dict()
get_publisher().notify_of_exception(sys.exc_info())
must_jump = False
if self.trigger:
triggered = (hasattr(get_request(), 'trigger_name') and
get_request().trigger_name == self.trigger)
must_jump = must_jump and triggered
if self.timeout:
timeout = int(self.compute(self.timeout))
if formdata.evolution:
last = formdata.evolution[-1].time
else:
last = formdata.receipt_time
if last:
diff = time.time() - time.mktime(last)
must_jump = (diff > timeout) and must_jump
return must_jump
register_item_class(JumpWorkflowStatusItem)
def workflows_with_timeout():
'''Return a list of workflow objects with at least a status with a timeout object
'''
wfs_status = {}
for id in Workflow.keys():
workflow = Workflow.get(id, ignore_errors = True)
if not workflow:
continue
for status in workflow.possible_status:
status_str = 'wf-%s' % status.id
for item in status.items:
if hasattr(item, 'status') and hasattr(item, 'timeout') and (
item.status and item.timeout):
if not wfs_status.has_key(id):
wfs_status[id] = {}
if not wfs_status[id].has_key(status_str):
wfs_status[id][status_str] = []
if not item.get_target_status():
# this will catch status being a removed status
continue
wfs_status[id][status_str].append(item)
return wfs_status
def _apply_timeouts(publisher):
'''Traverse all filled form and apply expired timeout jumps if needed'''
from formdef import FormDef
wfs_status = workflows_with_timeout()
for formdef_id in FormDef.keys():
formdef = FormDef.get(formdef_id, ignore_errors = True)
if not formdef:
continue
if str(formdef.workflow_id) in wfs_status.keys():
for formdata_id in formdef.data_class().keys():
formdata = formdef.data_class().get(formdata_id, ignore_errors = True)
if not formdata:
continue
if formdata.status in wfs_status[str(formdef.workflow_id)]:
for x in wfs_status[str(formdef.workflow_id)][formdata.status]:
get_publisher().substitutions.reset()
get_publisher().substitutions.feed(get_publisher())
get_publisher().substitutions.feed(formdef)
get_publisher().substitutions.feed(formdata)
if x.must_jump(formdata):
jump_and_perform(formdata, x.status)
break
if get_publisher_class():
# every JUMP_TIMEOUT_INTERVAL minutes check for expired status jump
# timeouts.
get_publisher_class().register_cronjob(
CronJob(_apply_timeouts,
hours=range(24), minutes=range(0, 60, JUMP_TIMEOUT_INTERVAL)))