wcs/wcs/admin/workflows.py

1960 lines
74 KiB
Python

# -*- coding: utf-8 -*-
#
# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import io
import textwrap
import time
import xml.etree.ElementTree as ET
from subprocess import PIPE, Popen
from django.utils.encoding import force_bytes
from quixote import get_publisher, get_request, get_response, get_session, redirect
from quixote.directory import Directory
from quixote.html import TemplateIO, htmltext
from wcs.backoffice.snapshots import SnapshotsDirectory
from wcs.backoffice.studio import StudioDirectory
from wcs.carddef import CardDef
from wcs.formdata import Evolution
from wcs.formdef import FormDef
from wcs.qommon import N_, _, errors, force_str, get_logger, misc, template
from wcs.qommon.admin.menu import command_icon
from wcs.qommon.backoffice.menu import html_top
from wcs.qommon.form import (
CheckboxWidget,
ColourWidget,
CompositeWidget,
FileWidget,
Form,
HtmlWidget,
SingleSelectWidget,
StringWidget,
UrlWidget,
VarnameWidget,
WysiwygTextWidget,
)
from wcs.workflows import (
DuplicateGlobalActionNameError,
DuplicateStatusNameError,
Workflow,
WorkflowBackofficeFieldsFormDef,
WorkflowCriticalityLevel,
WorkflowImportError,
WorkflowVariablesFieldsFormDef,
item_classes,
)
from . import utils
from .data_sources import NamedDataSourcesDirectory
from .fields import FieldDefPage, FieldsDirectory
from .logged_errors import LoggedErrorsDirectory
from .mail_templates import MailTemplatesDirectory
def svg(tag):
return '{http://www.w3.org/2000/svg}%s' % tag
def xlink(tag):
return '{http://www.w3.org/1999/xlink}%s' % tag
TITLE = svg('title')
POLYGON = svg('polygon')
XLINK_TITLE = xlink('title')
def remove_tag(node, tag):
for child in node:
if child.tag == tag:
node.remove(child)
def remove_attribute(node, att):
if att in node.attrib:
del node.attrib[att]
def adjust_style(node, top, colours, white_text=False, colour_class=None):
remove_tag(node, TITLE)
if node.get('class') and node.get('class').startswith('node '):
colour_class = node.get('class').split()[-1]
if (node.get('fill'), node.get('stroke')) in (('white', 'white'), ('white', 'none')):
# this is the general white background, reduce it to a dot
node.attrib['points'] = '0,0 0,0 0,0 0,0'
if node.tag == svg('text') and white_text:
node.attrib['fill'] = 'white'
for child in node:
remove_attribute(child, XLINK_TITLE)
if child.tag == '{http://www.w3.org/2000/svg}polygon' and colour_class:
# for compatibility with graphviz >= 2.40 replace fill attribute
# with the original colour name.
child.attrib['fill'] = colour_class
if child.get('fill') in colours:
matching_hexa = colours.get(child.get('fill'))
child.attrib['fill'] = '#' + matching_hexa
del child.attrib['stroke']
if misc.get_foreground_colour(matching_hexa) == 'white':
white_text = True
if child.get('font-family'):
del child.attrib['font-family']
if child.get('font-size'):
child.attrib['font-size'] = str(float(child.attrib['font-size']) * 0.8)
remove_attribute(child, 'style')
adjust_style(child, top, colours, white_text=white_text, colour_class=colour_class)
def graphviz_post_treatment(content, colours, include=False):
"""Remove all svg:title and top-level svg:polygon nodes, remove style
attributes and xlink:title attributes.
If a color style is set to a name matching class-\\w+, set the second
part on as class selector on the top level svg:g element.
"""
tree = ET.fromstring(content)
if not include:
style = ET.SubElement(tree, svg('style'))
style.attrib['type'] = 'text/css'
css_url = '%s%s%s' % (
get_publisher().get_root_url(),
get_publisher().qommon_static_dir,
get_publisher().qommon_admin_css,
)
style.text = '@import url(%s);' % css_url
for root in tree:
remove_tag(root, TITLE)
for child in root:
adjust_style(child, child, colours)
return force_str(ET.tostring(tree))
def graphviz(workflow, url_prefix='', select=None, svg=True, include=False):
out = io.StringIO()
# a list of colours known to graphviz, they will serve as key to get back
# to the colours defined in wcs, they are used as color attributes in
# graphviz (<= 2.38) then as class attribute on node elements for 2.40 and
# later.
graphviz_colours = [
'aliceblue',
'antiquewhite',
'aqua',
'aquamarine',
'azure',
'beige',
'bisque',
'black',
'blanchedalmond',
'blue',
'blueviolet',
'brown',
'burlywood',
'cadetblue',
'chartreuse',
'chocolate',
'coral',
'cornflowerblue',
'cornsilk',
'crimson',
'cyan',
'darkblue',
'darkcyan',
'darkgoldenrod',
'darkgray',
'darkgreen',
'darkgrey',
'darkkhaki',
'darkmagenta',
'darkolivegreen',
'darkorange',
'darkorchid',
'darkred',
'darksalmon',
'darkseagreen',
'darkslateblue',
'darkslategray',
'darkslategrey',
'darkturquoise',
'darkviolet',
'deeppink',
'deepskyblue',
'dimgray',
'dimgrey',
'dodgerblue',
'firebrick',
'floralwhite',
'forestgreen',
'fuchsia',
'gainsboro',
'ghostwhite',
'gold',
'goldenrod',
'gray',
'grey',
'green',
'greenyellow',
'honeydew',
'hotpink',
'indianred',
'indigo',
'ivory',
'khaki',
'lavender',
'lavenderblush',
'lawngreen',
'lemonchiffon',
'lightblue',
'lightcoral',
'lightcyan',
'lightgoldenrodyellow',
'lightgray',
'lightgrey',
'lightpink',
]
colours = {}
revert_colours = {}
print('digraph main {', file=out)
# print >>out, 'graph [ rankdir=LR ];'
print('node [shape=box,style=filled];', file=out)
print('edge [];', file=out)
for status in workflow.possible_status:
i = status.id
print('status%s' % i, end=' ', file=out)
print('[label="%s"' % status.name.replace('"', "'"), end=' ', file=out)
if select == str(i):
print(',id=current_status', file=out)
if status.colour:
if status.colour not in colours:
colours[status.colour] = graphviz_colours.pop()
revert_colours[colours[status.colour]] = status.colour
print(',color=%s' % colours[status.colour], file=out)
print(',class=%s' % colours[status.colour], file=out)
print(' URL="%sstatus/%s/"];' % (url_prefix, i), file=out)
for status in workflow.possible_status:
i = status.id
for item in status.items:
next_status_ids = [x.id for x in item.get_target_status() if x.id]
if not next_status_ids:
next_status_ids = [status.id]
done = {}
url = 'status/%s/items/%s/' % (i, item.id)
for next_id in next_status_ids:
if next_id in done:
# don't display multiple arrows for same action and target
# status
continue
print('status%s -> status%s' % (i, next_id), file=out)
done[next_id] = True
label = item.get_jump_label(target_id=next_id)
label = label.replace('"', '\\"')
label = textwrap.fill(label, 20, break_long_words=False)
label = label.replace('\n', '\\n')
print('[label="%s"' % label, end=' ', file=out)
print(',URL="%s%s"]' % (url_prefix, url), file=out)
print('}', file=out)
out = out.getvalue()
if svg:
try:
with Popen(['dot', '-Tsvg'], stdin=PIPE, stdout=PIPE) as process:
out = process.communicate(force_bytes(out))[0]
if process.returncode != 0:
return ''
except OSError:
return ''
out = graphviz_post_treatment(out, revert_colours, include=include)
if include:
# It seems webkit refuse to accept SVG when using its proper namespace,
# and xlink namespace prefix must be xlink: to be acceptable
out = out.replace('ns0:', '')
out = out.replace('xmlns:ns0', 'xmlns:svg')
out = out.replace('ns1:', 'xlink:')
out = out.replace(':ns1', ':xlink')
out = out.replace('<title>main</title>', '<title>%s</title>' % workflow.name)
return out
class WorkflowUI:
def __init__(self, workflow):
self.workflow = workflow
def form_new(self):
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Workflow Name'), required=True, size=30)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
return form
def form_edit(self):
form = Form(enctype='multipart/form-data')
form.add_hidden('id', value=self.workflow.id)
form.add(
StringWidget, 'name', title=_('Workflow Name'), required=True, size=30, value=self.workflow.name
)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
return form
def submit_form(self, form):
if self.workflow:
workflow = self.workflow
else:
workflow = Workflow(name=form.get_widget('name').parse())
name = form.get_widget('name').parse()
workflows_name = [x.name for x in Workflow.select() if x.id != workflow.id]
if name in workflows_name:
form.get_widget('name').set_error(_('This name is already used'))
raise ValueError()
for f in ('name',):
setattr(workflow, f, form.get_widget(f).parse())
workflow.store()
return workflow
class WorkflowItemPage(Directory):
_q_exports = ['', 'delete']
def __init__(self, workflow, parent, component, html_top):
try:
self.item = [x for x in parent.items if x.id == component][0]
except (IndexError, ValueError):
raise errors.TraversalError()
self.workflow = workflow
self.parent = parent
self.html_top = html_top
get_response().breadcrumb.append(('items/%s/' % component, _(self.item.description)))
def _q_index(self):
request = get_request()
if request.get_method() == 'GET' and request.form.get('file'):
value = getattr(self.item, request.form.get('file'), None)
if value:
return value.build_response()
form = Form(enctype='multipart/form-data')
self.item.fill_admin_form(form)
if not self.workflow.is_readonly():
submit_label = _('Submit')
if hasattr(self.item, 'submit_button_label'):
submit_label = _(self.item.submit_button_label)
form.add_submit('submit', submit_label)
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.get_submit() == 'submit' and not form.has_errors():
self.item.submit_admin_form(form)
if not form.has_errors():
self.workflow.store(
comment=_('Change in action "%(description)s" in status "%(status)s"')
% {
'description': self.item.render_as_line(),
'status': self.parent.name,
}
)
if hasattr(self.item, 'redirect_after_submit_url'):
return redirect(self.item.redirect_after_submit_url)
return redirect('..')
self.html_top('%s - %s' % (_('Workflow'), self.workflow.name))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _(self.item.description)
r += form.render()
if self.item.support_substitution_variables:
r += get_publisher().substitutions.get_substitution_html_table()
return r.getvalue()
def delete(self):
form = Form(enctype='multipart/form-data')
form.widgets.append(HtmlWidget('<p>%s</p>' % _('You are about to remove an item.')))
form.add_submit('delete', _('Delete'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('../../')
if not form.is_submitted() or form.has_errors():
get_response().breadcrumb.append(('delete', _('Delete')))
self.html_top(title=_('Delete Item'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Deleting Item')
r += form.render()
return r.getvalue()
else:
del self.parent.items[self.parent.items.index(self.item)]
self.workflow.store(
comment=_('Deletion of action "%(description)s" in status "%(status)s"')
% {
'description': self.item.render_as_line(),
'status': self.parent.name,
}
)
return redirect('../../')
def _q_lookup(self, component):
t = self.item.q_admin_lookup(self.workflow, self.parent, component, self.html_top)
if t:
return t
return Directory._q_lookup(self, component)
class GlobalActionTriggerPage(Directory):
_q_exports = ['', 'delete']
def __init__(self, workflow, action, component, html_top):
try:
self.trigger = [x for x in action.triggers if x.id == component][0]
except (IndexError, ValueError):
raise errors.TraversalError()
self.workflow = workflow
self.action = action
self.status = action
self.html_top = html_top
get_response().breadcrumb.append(('triggers/%s/' % component, _('Trigger')))
def _q_index(self):
form = self.trigger.form(self.workflow)
form.add_submit('submit', _('Save'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.get_submit() == 'submit' and not form.has_errors():
self.trigger.submit_admin_form(form)
if not form.has_errors():
self.workflow.store(comment=_('Change in global action trigger'))
return redirect('../../')
self.html_top('%s - %s' % (_('Workflow'), self.workflow.name))
r = TemplateIO(html=True)
r += htmltext('<h2>%s - %s</h2>') % (self.workflow.name, self.action.name)
r += form.render()
return r.getvalue()
def delete(self):
form = Form(enctype='multipart/form-data')
form.widgets.append(HtmlWidget('<p>%s</p>' % _('You are about to remove a trigger.')))
form.add_submit('delete', _('Delete'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('../../')
if not form.is_submitted() or form.has_errors():
get_response().breadcrumb.append(('delete', _('Delete')))
self.html_top(title=_('Delete Trigger'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Deleting Trigger')
r += form.render()
return r.getvalue()
else:
del self.action.triggers[self.action.triggers.index(self.trigger)]
self.workflow.store(comment=_('Deletion of global action trigger'))
return redirect('../../')
class ToChildDirectory(Directory):
_q_exports = ['']
klass = None
def __init__(self, workflow, status, html_top):
self.workflow = workflow
self.status = status
self.html_top = html_top
def _q_lookup(self, component):
return self.klass(self.workflow, self.status, component, self.html_top)
def _q_index(self):
return redirect('..')
class WorkflowItemsDir(ToChildDirectory):
klass = WorkflowItemPage
class GlobalActionTriggersDir(ToChildDirectory):
klass = GlobalActionTriggerPage
class GlobalActionItemsDir(ToChildDirectory):
klass = WorkflowItemPage
class WorkflowStatusPage(Directory):
_q_exports = [
'',
'delete',
'newitem',
('items', 'items_dir'),
'update_order',
'edit',
'reassign',
'endpoint',
'display',
('backoffice-info-text', 'backoffice_info_text'),
('schema.svg', 'svg'),
'svg',
]
do_not_call_in_templates = True
def __init__(self, workflow, status_id, html_top):
self.html_top = html_top
self.workflow = workflow
try:
self.status = [x for x in self.workflow.possible_status if x.id == status_id][0]
except IndexError:
raise errors.TraversalError()
self.items_dir = WorkflowItemsDir(workflow, self.status, html_top)
get_response().breadcrumb.append(('status/%s/' % status_id, self.status.name))
def _q_index(self):
self.html_top('%s - %s' % (_('Workflow'), self.workflow.name))
get_response().add_javascript(
[
'jquery.js',
'jquery-ui.js',
'biglist.js',
'svg-pan-zoom.js',
'qommon.wysiwyg.js',
'popup.js',
'jquery.colourpicker.js',
]
)
return template.QommonTemplateResponse(
templates=['wcs/backoffice/workflow-status.html'],
context={'view': self, 'workflow': self.workflow, 'status': self.status, 'has_sidebar': True},
is_django_native=True,
)
def get_source_statuses(self):
statuses = []
for status in self.workflow.possible_status:
if status is self.status:
continue
for item in status.items:
if self.status in item.get_target_status():
statuses.append(status)
break
return statuses
def graphviz(self):
return graphviz(self.workflow, url_prefix='../../', include=True, select='%s' % self.status.id)
def svg(self):
response = get_response()
response.set_content_type('image/svg+xml')
root_url = get_publisher().get_application_static_files_root_url()
css = root_url + get_publisher().qommon_static_dir + get_publisher().qommon_admin_css
return graphviz(
self.workflow, url_prefix='../../', include=False, select='%s' % self.status.id
).replace('?>', '?>\n<?xml-stylesheet href="%s" type="text/css"?>\n' % css)
def is_item_available(self, item):
return item.is_available(workflow=self.workflow)
def get_new_item_form(self):
form = Form(enctype='multipart/form-data', action='newitem', id='new-action-form')
categories = [
('status-change', _('Change Status')),
('interaction', _('Interact')),
('formdata-action', _('Act on a Form/Card')),
('user-action', _('Act on User')),
]
available_items = [x for x in item_classes if self.is_item_available(x)]
available_items.sort(key=lambda x: misc.simplify(_(x.description)))
for category, category_label in categories:
options = [(x.key, _(x.description)) for x in available_items if x.category == category]
form.add(
SingleSelectWidget,
'action-%s' % category,
title=category_label,
required=False,
options=[(None, '')] + options,
)
form.add_submit('submit', _('Add'))
return form
def update_order(self):
request = get_request()
new_order = request.form['order'].strip(';').split(';')
self.status.items = [[x for x in self.status.items if x.id == y][0] for y in new_order]
self.workflow.store(comment=_('Change in action order'))
return 'ok'
def newitem(self):
form = self.get_new_item_form()
if not form.is_submitted() or form.has_errors():
get_session().message = ('error', _('Submitted form was not filled properly.'))
return redirect('.')
for category in ('status-change', 'interaction', 'formdata-action', 'user-action'):
action_type = form.get_widget('action-%s' % category).parse()
if action_type:
self.status.append_item(action_type)
self.workflow.store(
comment=_('New action "%(description)s" in status "%(status)s"')
% {
'description': self.status.items[-1].description,
'status': self.status.name,
}
)
return redirect('.')
get_session().message = ('error', _('Submitted form was not filled properly.'))
return redirect('.')
def delete(self):
form = Form(enctype="multipart/form-data")
form.widgets.append(HtmlWidget('<p>%s</p>' % _("You are about to remove a status.")))
form.add_submit('delete', _('Delete'))
form.add_submit("cancel", _("Cancel"))
if form.get_widget('cancel').parse():
return redirect('.')
if not form.is_submitted() or form.has_errors():
get_response().breadcrumb.append(('delete', _('Delete')))
self.html_top(title=_('Delete Status'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s %s</h2>') % (_('Deleting Status:'), self.status.name)
r += form.render()
return r.getvalue()
else:
# Before removing the status, scan formdata to know if it's in use.
for formdef in FormDef.select():
if formdef.workflow_id != self.workflow.id:
continue
if any(formdef.data_class().get_with_indexed_value(str('status'), 'wf-%s' % self.status.id)):
return redirect('reassign')
del self.workflow.possible_status[self.workflow.possible_status.index(self.status)]
self.workflow.store(comment=_('Deletion of status %s') % self.status.name)
return redirect('../../')
def edit(self):
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Status Name'), required=True, size=30, value=self.status.name)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('.')
if form.is_submitted() and not form.has_errors():
new_name = str(form.get_widget('name').parse())
if [x for x in self.workflow.possible_status if x.name == new_name]:
form.get_widget('name').set_error(_('There is already a status with that name.'))
else:
self.status.name = new_name
self.workflow.store(comment=_('Change name of status %s') % new_name)
return redirect('.')
self.html_top(title=_('Edit Workflow Status'))
get_response().breadcrumb.append(('edit', _('Edit')))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Edit Workflow Status')
r += form.render()
return r.getvalue()
def reassign(self):
options = [(None, _('Do nothing')), ('remove', _('Remove these forms'))]
for status in self.workflow.get_waitpoint_status():
if status.id == self.status.id:
continue
options.append(('reassign-%s' % status.id, _('Change these forms status to "%s"') % status.name))
form = Form(enctype='multipart/form-data')
form.add(SingleSelectWidget, 'action', title=_('Pick an Action'), options=options)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_submit() == 'cancel':
return redirect('.')
if form.is_submitted() and not form.get_widget('action').parse():
return redirect('.')
if not form.is_submitted() or form.has_errors():
get_response().breadcrumb.append(('reassign', _('Delete / Reassign')))
self.html_top(title=_('Delete Status / Reassign'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s %s</h2>') % (_('Deleting Status:'), self.status.name)
r += htmltext('<p>')
r += _(
'''There are forms set to this status, they need to be changed before
this status can be deleted.'''
)
r += htmltext('</p>')
r += htmltext('<ul>')
for formdef in FormDef.select():
if formdef.workflow_id != self.workflow.id:
continue
items = list(
formdef.data_class().get_with_indexed_value(str('status'), 'wf-%s' % self.status.id)
)
r += htmltext('<li>%s: %s</li>') % (formdef.name, _('%s items') % len(items))
r += htmltext('</ul>')
r += form.render()
return r.getvalue()
else:
self.submit_reassign(form)
del self.workflow.possible_status[self.workflow.possible_status.index(self.status)]
self.workflow.store(comment=_('Removal of status %s') % self.status.name)
return redirect('../..')
def submit_reassign(self, form):
nb_forms = 0
action = form.get_widget('action').parse()
if action.startswith(str('reassign-')):
new_status = 'wf-%s' % str(action)[9:]
for formdef in FormDef.select():
if formdef.workflow_id != self.workflow.id:
continue
for item in formdef.data_class().get_with_indexed_value(str('status'), 'wf-%s' % self.status.id):
nb_forms += 1
if action == 'remove':
item.remove_self()
else:
item.status = new_status
evo = Evolution()
evo.time = time.localtime()
evo.status = new_status
evo.comment = _('Administrator reassigned status')
if not item.evolution:
item.evolution = []
item.evolution.append(evo)
item.store()
# delete all (old) status references in evolutions
for item in formdef.data_class().select():
if item.evolution:
modified = False
for evo in item.evolution:
if evo.status == self.status:
evo.status = None
modified = True
if modified:
item.store()
if action == 'remove':
get_logger().info(
'admin - delete status "%s" in workflow "%s": %d forms deleted'
% (self.status.name, self.workflow.name, nb_forms)
)
def display(self):
form = Form(enctype='multipart/form-data')
form.add(
CheckboxWidget,
'hide_status_from_user',
title=_('Hide status from user'),
value=bool(self.status.get_visibility_restricted_roles()),
)
form.add(ColourWidget, 'colour', title=_('Colour in backoffice'), value=self.status.colour)
form.add(
StringWidget,
'extra_css_class',
title=_('Extra CSS for frontoffice style'),
value=self.status.extra_css_class,
)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('.')
if form.is_submitted() and not form.has_errors():
hide_status = form.get_widget('hide_status_from_user').parse()
if hide_status:
self.status.visibility = list(self.workflow.roles.keys())
else:
self.status.visibility = None
self.status.colour = form.get_widget('colour').parse() or 'ffffff'
self.status.extra_css_class = form.get_widget('extra_css_class').parse()
self.workflow.store(comment=_('Change in display options'))
return redirect('.')
self.html_top(title=_('Change Display Settings'))
get_response().breadcrumb.append(('display', _('Display Settings')))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Change Display Settings')
r += form.render()
return r.getvalue()
def endpoint(self):
form = Form(enctype='multipart/form-data')
form.add(
CheckboxWidget,
'force_terminal_status',
title=_('Force Terminal Status'),
value=(self.status.forced_endpoint is True),
)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('.')
if form.is_submitted() and not form.has_errors():
self.status.forced_endpoint = form.get_widget('force_terminal_status').parse()
self.workflow.store(comment=_('Change of terminal status option'))
return redirect('.')
self.html_top(title=_('Edit Terminal Status'))
get_response().breadcrumb.append(('endpoint', _('Terminal Status')))
return form.render()
def backoffice_info_text(self):
form = Form(enctype='multipart/form-data')
form.add(
WysiwygTextWidget,
'backoffice_info_text',
title=_('Information text for backoffice'),
value=self.status.backoffice_info_text,
)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('.')
if form.is_submitted() and not form.has_errors():
self.status.backoffice_info_text = form.get_widget('backoffice_info_text').parse()
self.workflow.store(comment=_('Change in backoffice info text'))
return redirect('.')
self.html_top(title=_('Edit Backoffice Information Text'))
get_response().breadcrumb.append(('backoffice_info_text', _('Backoffice Information Text')))
return form.render()
class WorkflowStatusDirectory(Directory):
_q_exports = ['']
def __init__(self, workflow, html_top):
self.workflow = workflow
self.html_top = html_top
def _q_lookup(self, component):
return WorkflowStatusPage(self.workflow, component, self.html_top)
def _q_index(self):
return redirect('..')
class WorkflowVariableWidget(CompositeWidget):
def __init__(self, name, value=None, workflow=None, **kwargs):
CompositeWidget.__init__(self, name, **kwargs)
if value and '*' in value:
varname = None
else:
varname = value
self.add(VarnameWidget, 'name', render_br=False, value=varname)
options = []
if workflow:
excluded_parameters = ['backoffice_info_text']
for status in workflow.possible_status:
for item in status.items:
prefix = '%s*%s*' % (status.id, item.id)
parameters = [
x
for x in item.get_parameters()
if not getattr(item, x) and x not in excluded_parameters
]
label = getattr(item, 'label', None) or _(item.description)
for parameter in parameters:
key = prefix + parameter
fake_form = Form()
item.add_parameters_widgets(fake_form, [parameter], orig='variable_widget')
if not fake_form.widgets:
continue
parameter_label = fake_form.widgets[0].title
option_value = '%s / %s / %s' % (status.name, label, parameter_label)
options.append((key, option_value, key))
if not options:
return
options = [('', '---', '')] + options
self.widgets.append(
HtmlWidget(_('or you can use this field to directly replace a workflow parameter:'))
)
self.add(
SingleSelectWidget,
'select',
options=options,
value=value,
hint=_('This takes priority over a variable name'),
)
def _parse(self, request):
super()._parse(request)
if self.get('select'):
self.value = self.get('select')
elif self.get('name'):
self.value = self.get('name')
class WorkflowVariablesFieldDefPage(FieldDefPage):
section = 'workflows'
blacklisted_attributes = ['condition']
def form(self):
form = super().form()
form.remove('varname')
form.add(
WorkflowVariableWidget,
'varname',
title=_('Variable'),
value=self.field.varname,
advanced=False,
required=True,
workflow=self.objectdef.workflow,
)
display_locations = form.get_widget('display_locations')
if display_locations:
del display_locations.widgets[0] # validation page
return form
class WorkflowBackofficeFieldDefPage(FieldDefPage):
section = 'workflows'
blacklisted_attributes = ['condition']
def form(self):
form = super().form()
form.remove('prefill')
display_locations = form.get_widget('display_locations')
if display_locations:
del display_locations.widgets[0] # validation page
return form
class WorkflowVariablesFieldsDirectory(FieldsDirectory):
_q_exports = ['', 'update_order', 'new']
section = 'workflows'
field_def_page_class = WorkflowVariablesFieldDefPage
support_import = False
blacklisted_types = ['page', 'blocks', 'computed']
field_var_prefix = 'form_option_'
readonly_message = N_('This workflow is readonly.')
def index_top(self):
r = TemplateIO(html=True)
r += htmltext('<h2>%s - %s - %s</h2>') % (_('Workflow'), self.objectdef.name, _('Variables'))
r += get_session().display_message()
if not self.objectdef.fields:
r += htmltext('<p>%s</p>') % _('There are not yet any variables.')
return r.getvalue()
def index_bottom(self):
pass
class WorkflowBackofficeFieldsDirectory(FieldsDirectory):
_q_exports = ['', 'update_order', 'new']
section = 'workflows'
field_def_page_class = WorkflowBackofficeFieldDefPage
support_import = False
blacklisted_types = ['page', 'blocks', 'computed']
blacklisted_attributes = ['condition']
field_var_prefix = 'form_var_'
readonly_message = N_('This workflow is readonly.')
def index_top(self):
r = TemplateIO(html=True)
r += htmltext('<h2>%s - %s - %s</h2>') % (_('Workflow'), self.objectdef.name, _('Backoffice Fields'))
r += get_session().display_message()
if not self.objectdef.fields:
r += htmltext('<p>%s</p>') % _('There are not yet any backoffice fields.')
return r.getvalue()
def index_bottom(self):
pass
class VariablesDirectory(Directory):
_q_exports = ['', 'fields']
def __init__(self, workflow):
self.workflow = workflow
def _q_index(self):
return redirect('fields/')
def _q_traverse(self, path):
get_response().breadcrumb.append(('variables/', _('Variables')))
self.fields = WorkflowVariablesFieldsDirectory(WorkflowVariablesFieldsFormDef(self.workflow))
return Directory._q_traverse(self, path)
class BackofficeFieldsDirectory(Directory):
_q_exports = ['', 'fields']
def __init__(self, workflow):
self.workflow = workflow
def _q_index(self):
return redirect('fields/')
def _q_traverse(self, path):
get_response().breadcrumb.append(('backoffice-fields/', _('Backoffice Fields')))
self.fields = WorkflowBackofficeFieldsDirectory(WorkflowBackofficeFieldsFormDef(self.workflow))
return Directory._q_traverse(self, path)
class FunctionsDirectory(Directory):
_q_exports = ['', 'new']
def __init__(self, workflow):
self.workflow = workflow
def _q_traverse(self, path):
get_response().breadcrumb.append(('functions/', _('Functions')))
return Directory._q_traverse(self, path)
def new(self):
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Name'), required=True, size=50)
form.add_submit('submit', _('Add'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.is_submitted() and not form.has_errors():
name = form.get_widget('name').parse()
base_slug = slug = '_%s' % misc.simplify(name)
base_idx = 2
while slug in self.workflow.roles:
slug = '%s-%s' % (base_slug, base_idx)
base_idx += 1
self.workflow.roles[slug] = name
# go over all existing status and update their visibility
# restrictions if necessary
for status in self.workflow.possible_status:
if status.get_visibility_restricted_roles():
status.visibility = list(self.workflow.roles.keys())
self.workflow.store(comment=_('New function "%s"') % name)
return redirect('..')
get_response().breadcrumb.append(('new', _('New Function')))
html_top('workflows', title=_('New Function'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('New Function')
r += form.render()
return r.getvalue()
def _q_lookup(self, component):
function = self.workflow.roles.get('_' + component)
if not function:
raise errors.TraversalError()
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Name'), required=True, size=50, value=function)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if component != 'receiver':
# do not allow removing the standard "receiver" function.
# TODO: do not display "delete" for functions that are currently in
# use.
form.add_submit('delete', _('Delete'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.get_submit() == 'delete':
slug = '_%s' % component
name = self.workflow.roles[slug]
del self.workflow.roles[slug]
self.workflow.store(comment=_('Deletion of function "%s"') % name)
return redirect('..')
if form.is_submitted() and not form.has_errors():
name = form.get_widget('name').parse()
slug = '_%s' % component
self.workflow.roles[slug] = name
self.workflow.store(comment=_('Rename of function "%s"') % name)
return redirect('..')
get_response().breadcrumb.append(('new', _('Edit Function')))
html_top('workflows', title=_('Edit Function'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Edit Function')
r += form.render()
return r.getvalue()
def _q_index(self):
return redirect('..')
class CriticalityLevelsDirectory(Directory):
_q_exports = ['', 'new']
def __init__(self, workflow):
self.workflow = workflow
def _q_traverse(self, path):
get_response().breadcrumb.append(('criticality-levels/', _('Criticality Levels')))
return Directory._q_traverse(self, path)
def new(self):
currentlevels = self.workflow.criticality_levels or []
default_colours = ['FFFFFF', 'FFFF00', 'FF9900', 'FF6600', 'FF0000']
try:
default_colour = default_colours[len(currentlevels)]
except IndexError:
default_colour = '000000'
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Name'), required=True, size=50)
form.add(ColourWidget, 'colour', title=_('Colour'), required=False, value=default_colour)
form.add_submit('submit', _('Add'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.is_submitted() and not form.has_errors():
if not self.workflow.criticality_levels:
self.workflow.criticality_levels = []
level = WorkflowCriticalityLevel()
level.name = form.get_widget('name').parse()
level.colour = form.get_widget('colour').parse()
self.workflow.criticality_levels.append(level)
self.workflow.store(comment=_('New criticality level'))
return redirect('..')
get_response().breadcrumb.append(('new', _('New Criticality Level')))
html_top('workflows', title=_('New Criticality level'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('New Criticality Level')
r += form.render()
return r.getvalue()
def _q_lookup(self, component):
for level in self.workflow.criticality_levels or []:
if level.id == component:
break
else:
raise errors.TraversalError()
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Name'), required=True, size=50, value=level.name)
form.add(ColourWidget, 'colour', title=_('Colour'), required=False, value=level.colour)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
form.add_submit('delete-level', _('Delete'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.get_submit() == 'delete-level':
self.workflow.criticality_levels.remove(level)
self.workflow.store(comment=_('Deletion of criticality level'))
return redirect('..')
if form.is_submitted() and not form.has_errors():
level.name = form.get_widget('name').parse()
level.colour = form.get_widget('colour').parse()
self.workflow.store(comment=_('Change of name of criticality level'))
return redirect('..')
get_response().breadcrumb.append(('new', _('Edit Criticality Level')))
html_top('workflows', title=_('Edit Criticality Level'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Edit Criticality Level')
r += form.render()
return r.getvalue()
def _q_index(self):
return redirect('..')
class GlobalActionPage(WorkflowStatusPage):
_q_exports = [
'',
'new',
'delete',
'newitem',
('items', 'items_dir'),
'update_order',
'edit',
'newtrigger',
('triggers', 'triggers_dir'),
'update_triggers_order',
('backoffice-info-text', 'backoffice_info_text'),
]
def __init__(self, workflow, action_id, html_top):
self.html_top = html_top
self.workflow = workflow
try:
self.action = [x for x in self.workflow.global_actions if x.id == action_id][0]
except IndexError:
raise errors.TraversalError()
self.status = self.action
self.items_dir = GlobalActionItemsDir(workflow, self.action, html_top)
self.triggers_dir = GlobalActionTriggersDir(workflow, self.action, html_top)
def _q_traverse(self, path):
get_response().breadcrumb.append(
('global-actions/%s/' % self.action.id, _('Global Action: %s') % self.action.name)
)
return Directory._q_traverse(self, path)
def is_item_available(self, item):
return item.is_available(self.workflow) and item.ok_in_global_action
def _q_index(self):
self.html_top('%s - %s' % (_('Workflow'), self.workflow.name))
r = TemplateIO(html=True)
get_response().add_javascript(['jquery.js', 'jquery-ui.js', 'biglist.js', 'qommon.wysiwyg.js'])
r += htmltext('<h2>%s</h2>') % self.action.name
r += get_session().display_message()
r += htmltext('<div class="bo-block">')
r += htmltext('<h2>%s</h2>') % _('Actions')
if not self.action.items:
r += htmltext('<p>%s</p>') % _('There are not yet any items in this action.')
else:
if self.workflow.is_readonly():
r += htmltext('<ul id="items-list" class="biglist">')
else:
r += htmltext('<p class="items">')
r += _('Use drag and drop with the handles to reorder items.')
r += htmltext('</p>')
r += htmltext('<ul id="items-list" class="biglist sortable">')
for item in self.action.items:
r += htmltext('<li class="biglistitem" id="itemId_%s">') % item.id
if self.workflow.is_readonly():
r += item.render_as_line()
else:
if hasattr(item, 'fill_admin_form'):
r += htmltext('<a href="items/%s/">%s</a>') % (item.id, item.render_as_line())
else:
r += item.render_as_line()
r += htmltext('<p class="commands">')
if hasattr(item, 'fill_admin_form'):
r += command_icon('items/%s/' % item.id, 'edit')
r += command_icon('items/%s/delete' % item.id, 'remove', popup=True)
r += htmltext('</p>')
r += htmltext('</li>')
r += htmltext('</ul>')
r += htmltext('</div>') # bo-block
r += htmltext('<div class="bo-block">')
r += htmltext('<h2>%s</h2>') % _('Triggers')
r += htmltext(
'<ul id="items-list" class="biglist sortable" data-order-function="update_triggers_order">'
)
for trigger in self.action.triggers:
r += htmltext('<li class="biglistitem" id="trigId_%s">') % trigger.id
r += htmltext('<a rel="popup" href="triggers/%s/">%s</a>') % (
trigger.id,
trigger.render_as_line(),
)
r += htmltext('<p class="commands">')
r += command_icon('triggers/%s/' % trigger.id, 'edit', popup=True)
r += command_icon('triggers/%s/delete' % trigger.id, 'remove', popup=True)
r += htmltext('</li>')
r += htmltext('</ul>')
r += htmltext('</div>') # bo-block
r += htmltext('<p><a href="../../">%s</a></p>') % _('Back to workflow main page')
get_response().filter['sidebar'] = self.get_sidebar()
return r.getvalue()
def delete(self):
form = Form(enctype='multipart/form-data')
form.widgets.append(HtmlWidget('<p>%s</p>' % _('You are about to remove an action.')))
form.add_submit('delete', _('Delete'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('.')
if not form.is_submitted() or form.has_errors():
get_response().breadcrumb.append(('delete', _('Delete')))
self.html_top(title=_('Delete Action'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s %s</h2>') % (_('Deleting Action:'), self.action.name)
r += form.render()
return r.getvalue()
del self.workflow.global_actions[self.workflow.global_actions.index(self.action)]
self.workflow.store(comment=_('Deletion of global action'))
return redirect('../../')
def edit(self):
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Action Name'), required=True, size=30, value=self.action.name)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('.')
if form.is_submitted() and not form.has_errors():
new_name = str(form.get_widget('name').parse())
if [x for x in self.workflow.global_actions if x.name == new_name]:
form.get_widget('name').set_error(_('There is already an action with that name.'))
else:
self.action.name = new_name
self.workflow.store(comment=_('Change in global action'))
return redirect('.')
self.html_top(title=_('Edit Action Name'))
get_response().breadcrumb.append(('edit', _('Edit')))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Edit Action Name')
r += form.render()
return r.getvalue()
def get_sidebar(self):
get_response().add_javascript(['popup.js', 'jquery.colourpicker.js'])
r = TemplateIO(html=True)
if self.workflow.is_default():
r += htmltext('<p>')
r += _(
'''This is the default workflow, you cannot edit it but you can
duplicate it to base your own workflow on it.'''
)
r += htmltext('</p>')
else:
r += htmltext('<ul id="sidebar-actions">')
r += htmltext('<li><a href="edit" rel="popup">%s</a></li>') % _('Change Action Name')
r += htmltext('<li><a href="backoffice-info-text" rel="popup">%s</a></li>') % _(
'Change Backoffice Information Text'
)
r += htmltext('<li><a href="delete" rel="popup">%s</a></li>') % _('Delete')
r += htmltext('</ul>')
r += htmltext('<div id="new-field">')
r += htmltext('<h3>%s</h3>') % _('New Item')
r += self.get_new_item_form().render()
r += htmltext('</div>')
r += htmltext('<div id="new-trigger">')
r += htmltext('<h3>%s</h3>') % _('New Trigger')
r += self.get_new_trigger_form().render()
r += htmltext('</div>')
return r.getvalue()
def update_triggers_order(self):
request = get_request()
new_order = request.form['order'].strip(';').split(';')
self.action.triggers = [[x for x in self.action.triggers if x.id == y][0] for y in new_order]
self.workflow.store(comment=_('Change in trigger order'))
return 'ok'
def newtrigger(self):
form = self.get_new_trigger_form()
if not form.is_submitted() or form.has_errors():
get_session().message = ('error', _('Submitted form was not filled properly.'))
return redirect('.')
if form.get_widget('type').parse():
self.action.append_trigger(form.get_widget('type').parse())
else:
get_session().message = ('error', _('Submitted form was not filled properly.'))
return redirect('.')
self.workflow.store(comment=_('New global action trigger'))
return redirect('.')
def get_new_trigger_form(self):
form = Form(enctype='multipart/form-data', action='newtrigger')
available_triggers = [
('timeout', _('Automatic')),
('manual', _('Manual')),
('webservice', _('External call')),
]
form.add(SingleSelectWidget, 'type', title=_('Type'), required=True, options=available_triggers)
form.add_submit('submit', _('Add'))
return form
class GlobalActionsDirectory(Directory):
_q_exports = ['', 'new']
def __init__(self, workflow, html_top):
self.workflow = workflow
self.html_top = html_top
def _q_lookup(self, component):
return GlobalActionPage(self.workflow, component, self.html_top)
def _q_index(self):
return redirect('..')
def new(self):
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Name'), required=True, size=50)
form.add_submit('submit', _('Add'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.is_submitted() and not form.has_errors():
name = form.get_widget('name').parse()
try:
action = self.workflow.add_global_action(name)
except DuplicateGlobalActionNameError:
form.get_widget('name').set_error(_('There is already an action with that name.'))
else:
self.workflow.store(comment=_('New global action'))
return redirect('%s/' % action.id)
get_response().breadcrumb.append(('new', _('New Global Action')))
html_top('workflows', title=_('New Global Action'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('New Global Action')
r += form.render()
return r.getvalue()
class WorkflowPage(Directory):
_q_exports = [
'',
'edit',
'delete',
'newstatus',
('status', 'status_dir'),
'update_order',
'duplicate',
'export',
'svg',
('variables', 'variables_dir'),
'inspect',
('schema.svg', 'svg'),
('backoffice-fields', 'backoffice_fields_dir'),
'update_actions_order',
'update_criticality_levels_order',
('functions', 'functions_dir'),
('global-actions', 'global_actions_dir'),
('criticality-levels', 'criticality_levels_dir'),
('logged-errors', 'logged_errors_dir'),
('history', 'snapshots_dir'),
]
do_not_call_in_templates = True
def __init__(self, component, instance=None):
if instance:
self.workflow = instance
elif component == '_carddef_default':
self.workflow = CardDef.get_default_workflow()
else:
try:
self.workflow = Workflow.get(component)
except KeyError:
raise errors.TraversalError()
self.workflow_ui = WorkflowUI(self.workflow)
self.status_dir = WorkflowStatusDirectory(self.workflow, self.html_top)
self.variables_dir = VariablesDirectory(self.workflow)
self.backoffice_fields_dir = BackofficeFieldsDirectory(self.workflow)
self.functions_dir = FunctionsDirectory(self.workflow)
self.global_actions_dir = GlobalActionsDirectory(self.workflow, self.html_top)
self.criticality_levels_dir = CriticalityLevelsDirectory(self.workflow)
self.logged_errors_dir = LoggedErrorsDirectory(parent_dir=self, workflow_id=self.workflow.id)
self.snapshots_dir = SnapshotsDirectory(self.workflow)
get_response().breadcrumb.append((component + '/', self.workflow.name))
def html_top(self, title):
return html_top('workflows', title)
def last_modification_block(self):
return utils.last_modification_block(obj=self.workflow)
def graphviz(self):
return graphviz(self.workflow, include=True)
def get_workflow_roles(self):
workflow_roles = list((self.workflow.roles or {}).items())
workflow_roles.sort(key=lambda x: '' if x[0] == '_receiver' else misc.simplify(x[1]))
return workflow_roles
def _q_index(self):
self.html_top(title=_('Workflow - %s') % self.workflow.name)
get_response().filter['sidebar'] = self.get_sidebar()
get_response().add_javascript(
['jquery.js', 'jquery-ui.js', 'biglist.js', 'svg-pan-zoom.js', 'jquery.colourpicker.js']
)
return template.QommonTemplateResponse(
templates=['wcs/backoffice/workflow.html'],
context={'view': self, 'workflow': self.workflow},
)
def get_sidebar(self):
r = TemplateIO(html=True)
if self.workflow.is_default():
r += htmltext('<p>')
r += _(
'''This is the default workflow, you cannot edit it but you can
duplicate it to base your own workflow on it.'''
)
r += htmltext('</p>')
elif self.workflow.is_readonly():
r += htmltext('<div class="infonotice"><p>%s</p></div>') % _('This workflow is readonly.')
r += utils.snapshot_info_block(snapshot=self.workflow.snapshot_object)
return r.getvalue()
r += htmltext('<ul id="sidebar-actions">')
if not self.workflow.is_readonly():
r += htmltext('<li><a href="delete" rel="popup">%s</a></li>') % _('Delete')
r += htmltext('<li><a href="duplicate">%s</a></li>') % _('Duplicate')
r += htmltext('<li><a href="export">%s</a></li>') % _('Export')
if get_publisher().snapshot_class:
r += htmltext('<li><a rel="popup" href="history/save">%s</a></li>') % _('Save snapshot')
r += htmltext('<li><a href="history/">%s</a></li>') % _('History')
r += htmltext('</ul>')
if not self.workflow.is_readonly():
r += self.get_new_status_form()
r += LoggedErrorsDirectory.errors_block(workflow_id=self.workflow.id)
return r.getvalue()
def expand_workflow_formdef(self, formdef):
r = TemplateIO(html=True)
r += htmltext('<ul>')
for field in formdef.fields:
r += htmltext('<li>')
r += field.label
if getattr(field, 'required', False):
r += htmltext(' (%s)') % _('required')
r += htmltext(' (%s)') % field.get_type_label()
if field.varname:
r += htmltext(' (<tt>%s</tt>)') % field.varname
r += htmltext('</li>')
r += htmltext('</ul>')
return r.getvalue()
def inspect(self):
self.html_top('%s - %s' % (_('Workflow'), self.workflow.name))
r = TemplateIO(html=True)
get_response().breadcrumb.append(('inspect', _('Inspector')))
r += htmltext('<h2>%s</h2>') % _('Workflow Functions')
r += htmltext('<ul>')
for label in (self.workflow.roles or {}).values():
r += htmltext('<li>%s</li>') % label
r += htmltext('</ul>')
if self.workflow.variables_formdef:
r += htmltext('<h2>%s</h2>') % _('Workflow Variables')
r += self.expand_workflow_formdef(self.workflow.variables_formdef)
if self.workflow.backoffice_fields_formdef:
r += htmltext('<h2>%s</h2>') % _('Backoffice Fields')
r += self.expand_workflow_formdef(self.workflow.backoffice_fields_formdef)
if self.workflow.criticality_levels:
r += htmltext('<h2>%s</h2>') % _('Criticality Levels')
r += htmltext('<ul>')
for level in self.workflow.criticality_levels or []:
r += htmltext('<li>%s</li>') % level.name
r += htmltext('</ul>')
r += htmltext('<h2>%s</h2>') % _('Statuses')
r += htmltext('<div class="expanded-statuses">')
for status in self.workflow.possible_status or []:
r += htmltext(
'<div class="status" style="border-left-color: #%s;">'
% (getattr(status, 'colour', None) or 'fff')
)
r += htmltext('<h3 id="status-%s"><a href="status/%s/">%s</a></h3>') % (
status.id,
status.id,
status.name,
)
if status.backoffice_info_text:
r += htmltext('<div>')
r += htmltext(status.backoffice_info_text)
r += htmltext('</div>')
if not status.items:
r += htmltext('<p>%s</p>') % _('No actions in this status.')
for item in status.items or []:
r += htmltext('<h4>%s</h4>') % _(item.description)
r += item.get_parameters_view()
r += htmltext('</div>')
r += htmltext('</div>')
if self.workflow.global_actions:
r += htmltext('<h2>%s</h2>') % _('Global Actions')
r += htmltext('<div class="expanded-statuses">')
for action in self.workflow.global_actions:
r += htmltext('<div class="status">')
r += htmltext('<h3><a href="global-actions/%s/">%s</a></h3>') % (action.id, action.name)
r += htmltext('<ul>')
for trigger in action.triggers:
r += htmltext('<li>%s</li>') % trigger.render_as_line()
r += htmltext('</ul>')
r += htmltext('</h3>')
for item in action.items or []:
r += htmltext('<h4>%s</h4>') % _(item.description)
r += item.get_parameters_view()
r += htmltext('</div>')
r += htmltext('</div>')
return r.getvalue()
def svg(self):
response = get_response()
response.set_content_type('image/svg+xml')
root_url = get_publisher().get_application_static_files_root_url()
css = root_url + get_publisher().qommon_static_dir + get_publisher().qommon_admin_css
return graphviz(self.workflow, include=False).replace(
'?>', '?>\n<?xml-stylesheet href="%s" type="text/css"?>\n' % css
)
def export(self):
return misc.xml_response(
self.workflow,
filename='workflow-%s.wcs' % misc.simplify(self.workflow.name),
content_type='application/x-wcs-workflow',
)
def get_new_status_form(self):
r = TemplateIO(html=True)
r += htmltext('<div id="new-field">')
r += htmltext('<h3>%s</h3>') % _('New Status')
form = Form(enctype='multipart/form-data', action='newstatus')
form.add(StringWidget, 'name', title=_('Name'), required=True, size=50)
form.add_submit('submit', _('Add'))
r += form.render()
r += htmltext('</div>')
return r.getvalue()
def update_order(self):
request = get_request()
new_order = request.form['order'].strip(';').split(';')
self.workflow.possible_status = [
[x for x in self.workflow.possible_status if x.id == y][0] for y in new_order
]
self.workflow.store(comment=_('Change in status order'))
return 'ok'
def update_actions_order(self):
request = get_request()
new_order = request.form['order'].strip(';').split(';')
self.workflow.global_actions = [
[x for x in self.workflow.global_actions if x.id == y][0] for y in new_order
]
self.workflow.store(comment=_('Change in global actions order'))
return 'ok'
def update_criticality_levels_order(self):
request = get_request()
new_order = request.form['order'].strip(';').split(';')
self.workflow.criticality_levels = [
[x for x in self.workflow.criticality_levels if x.id == y][0] for y in new_order
]
self.workflow.store(comment=_('Change in criticality levels order'))
return 'ok'
def newstatus(self):
form = Form(enctype='multipart/form-data', action='newstatus')
form.add(StringWidget, 'name', title=_('Name'), size=50)
if not form.is_submitted() or form.has_errors():
get_session().message = ('error', _('Submitted form was not filled properly.'))
return redirect('.')
if form.get_widget('name').parse():
try:
self.workflow.add_status(form.get_widget('name').parse())
except DuplicateStatusNameError:
get_session().message = ('error', _('There is already a status with that name.'))
return redirect('.')
else:
get_session().message = ('error', _('Submitted form was not filled properly.'))
return redirect('.')
self.workflow.store(comment=_('New status "%s"') % form.get_widget('name').parse())
return redirect('.')
def edit(self, duplicate=False):
form = self.workflow_ui.form_edit()
if form.get_widget('cancel').parse():
return redirect('.')
if form.is_submitted() and not form.has_errors():
try:
self.workflow_ui.submit_form(form)
except ValueError:
pass
else:
return redirect('.')
self.html_top(title=_('Edit Workflow'))
r = TemplateIO(html=True)
if duplicate:
get_response().breadcrumb.append(('edit', _('Duplicate')))
r += htmltext('<h2>%s</h2>') % _('Duplicate Workflow')
else:
get_response().breadcrumb.append(('edit', _('Edit')))
r += htmltext('<h2>%s</h2>') % _('Edit Workflow')
r += form.render()
return r.getvalue()
def delete(self):
form = Form(enctype="multipart/form-data")
from itertools import chain
for objdef in chain(FormDef.select(), CardDef.select()):
if objdef.workflow_id == self.workflow.id:
form.widgets.append(
HtmlWidget('<p>%s</p>' % _("This workflow is currently in use, you cannot remove it."))
)
form.add_submit("cancel", _("Cancel"))
break
else:
form.widgets.append(
HtmlWidget('<p>%s</p>' % _("You are about to irrevocably delete this workflow."))
)
form.add_submit('delete', _('Delete'))
form.add_submit("cancel", _("Cancel"))
if form.get_widget('cancel').parse():
return redirect('.')
if not form.is_submitted() or form.has_errors():
get_response().breadcrumb.append(('delete', _('Delete')))
self.html_top(title=_('Delete Workflow'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s %s</h2>') % (_('Deleting Workflow:'), self.workflow.name)
r += form.render()
return r.getvalue()
else:
self.workflow.remove_self()
return redirect('..')
def duplicate(self):
self.workflow_ui.workflow.id = None
original_name = self.workflow_ui.workflow.name
self.workflow_ui.workflow.name = self.workflow_ui.workflow.name + _(' (copy)')
workflow_names = [x.name for x in Workflow.select()]
no = 2
while self.workflow_ui.workflow.name in workflow_names:
self.workflow_ui.workflow.name = _('%(name)s (copy %(no)d)') % {'name': original_name, 'no': no}
no += 1
self.workflow_ui.workflow.store()
return redirect('../%s/' % self.workflow_ui.workflow.id)
class NamedDataSourcesDirectoryInWorkflows(NamedDataSourcesDirectory):
pass
class WorkflowsDirectory(Directory):
_q_exports = [
'',
'new',
('import', 'p_import'),
('data-sources', 'data_sources'),
('mail-templates', 'mail_templates'),
]
data_sources = NamedDataSourcesDirectoryInWorkflows()
mail_templates = MailTemplatesDirectory()
def html_top(self, title):
return html_top('workflows', title)
def _q_traverse(self, path):
get_response().breadcrumb.append(('workflows/', _('Workflows')))
return super()._q_traverse(path)
def _q_index(self):
self.html_top(title=_('Workflows'))
r = TemplateIO(html=True)
r += htmltext('<div id="appbar">')
r += htmltext('<h2>%s</h2>') % _('Workflows')
r += htmltext('<span class="actions">')
if get_publisher().has_site_option('mail-templates'):
r += htmltext('<a href="mail-templates/">%s</a>') % _('Mail Templates')
r += htmltext('<a href="data-sources/">%s</a>') % _('Data sources')
r += htmltext('<a href="import" rel="popup">%s</a>') % _('Import')
r += htmltext('<a class="new-item" rel="popup" href="new">%s</a>') % _('New Workflow')
r += htmltext('</span>')
r += htmltext('</div>')
formdef_workflows = [Workflow.get_default_workflow()]
workflows_in_formdef_use = set(formdef_workflows[0].id)
for formdef in FormDef.select(lightweight=True):
workflows_in_formdef_use.add(str(formdef.workflow_id))
if StudioDirectory.is_visible():
carddef_workflows = [CardDef.get_default_workflow()]
workflows_in_carddef_use = set(carddef_workflows[0].id)
for carddef in CardDef.select(lightweight=True):
workflows_in_carddef_use.add(str(carddef.workflow_id))
else:
carddef_workflows = []
workflows_in_carddef_use = set()
shared_workflows = []
unused_workflows = []
workflows = formdef_workflows + carddef_workflows
for workflow in Workflow.select(order_by='name'):
if str(workflow.id) in workflows_in_formdef_use and str(workflow.id) in workflows_in_carddef_use:
shared_workflows.append(workflow)
elif str(workflow.id) in workflows_in_formdef_use:
formdef_workflows.append(workflow)
elif str(workflow.id) in workflows_in_carddef_use:
carddef_workflows.append(workflow)
if str(workflow.id) in workflows_in_formdef_use or str(workflow.id) in workflows_in_carddef_use:
workflows.append(workflow)
else:
unused_workflows.append(workflow)
def workflow_section(r, workflows):
r += htmltext('<div class="bo-block"><ul class="biglist">')
for workflow in workflows:
if workflow in shared_workflows:
css_class = 'shared-workflow'
usage_label = _('Forms and card models')
elif workflow in formdef_workflows:
css_class = 'formdef-workflow'
usage_label = _('Forms')
elif workflow in carddef_workflows:
css_class = 'carddef-workflow'
usage_label = _('Card models')
else:
css_class = 'unused-workflow'
usage_label = None
r += htmltext('<li class="%s">' % css_class)
r += htmltext('<strong class="label"><a href="%s/">%s</a></strong>') % (
workflow.id,
workflow.name,
)
if usage_label and carddef_workflows:
r += htmltext('<p class="details badge">%s</p>') % usage_label
r += htmltext('</li>')
r += htmltext('</ul></div>')
workflow_section(r, workflows)
if unused_workflows:
r += htmltext('<h2>%s</h2>') % _('Unused workflows')
workflow_section(r, unused_workflows)
return r.getvalue()
def new(self):
get_response().breadcrumb.append(('new', _('New')))
workflow_ui = WorkflowUI(None)
form = workflow_ui.form_new()
if form.get_widget('cancel').parse():
return redirect('.')
if form.is_submitted() and not form.has_errors():
try:
workflow = workflow_ui.submit_form(form)
except ValueError:
pass
else:
return redirect('%s/' % workflow.id)
self.html_top(title=_('New Workflow'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('New Workflow')
r += form.render()
return r.getvalue()
def _q_lookup(self, component):
return WorkflowPage(component)
def p_import(self):
form = Form(enctype='multipart/form-data')
form.add(FileWidget, 'file', title=_('File'), required=False)
form.add(UrlWidget, 'url', title=_('Address'), required=False, size=50)
form.add_submit('submit', _('Import Workflow'))
form.add_submit('cancel', _('Cancel'))
if form.get_submit() == 'cancel':
return redirect('.')
if form.is_submitted() and not form.has_errors():
try:
return self.import_submit(form)
except ValueError:
pass
get_response().breadcrumb.append(('import', _('Import')))
self.html_top(title=_('Import Workflow'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Import Workflow')
r += htmltext('<p>%s</p>') % _(
'You can install a new workflow by uploading a file ' 'or by pointing to the workflow URL.'
)
r += form.render()
return r.getvalue()
def import_submit(self, form):
if form.get_widget('file').parse():
fp = form.get_widget('file').parse().fp
elif form.get_widget('url').parse():
url = form.get_widget('url').parse()
try:
fp = misc.urlopen(url)
except misc.ConnectionError as e:
form.set_error('url', _('Error loading form (%s).') % str(e))
raise ValueError()
else:
form.set_error('file', _('You have to enter a file or a URL.'))
raise ValueError()
error, reason = False, None
try:
workflow = Workflow.import_from_xml(fp)
except WorkflowImportError as e:
error = True
reason = _(e.msg) % e.msg_args
if e.details:
reason += ' [%s]' % e.details
except ValueError:
error = True
if error:
if reason:
msg = _('Invalid File (%s)') % reason
else:
msg = _('Invalid File')
if form.get_widget('url').parse():
form.set_error('url', msg)
else:
form.set_error('file', msg)
raise ValueError()
initial_workflow_name = workflow.name
workflow_names = [x.name for x in Workflow.select()]
copy_no = 1
while workflow.name in workflow_names:
if copy_no == 1:
workflow.name = _('Copy of %s') % initial_workflow_name
else:
workflow.name = _('Copy of %(name)s (%(no)d)') % {
'name': initial_workflow_name,
'no': copy_no,
}
copy_no += 1
workflow.store()
get_session().message = ('info', _('This workflow has been successfully imported.'))
return redirect('%s/' % workflow.id)