workflows: allow changes to "workflow functions" (#8256)

This commit is contained in:
Frédéric Péters 2015-09-12 14:20:07 +02:00
parent 1eb04fb441
commit 20d84a0cda
5 changed files with 242 additions and 15 deletions

View File

@ -1321,6 +1321,88 @@ def test_workflows_variables_edit():
assert Workflow.get(1).variables_formdef.fields[0].key == 'string'
assert Workflow.get(1).variables_formdef.fields[0].varname == '1*1*message'
def test_workflows_functions():
create_superuser()
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
workflow.store()
app = login(get_app(pub))
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
resp = resp.click('add function')
resp = resp.forms[0].submit('cancel')
assert Workflow.get(workflow.id).roles.keys() == ['_receiver']
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
resp = resp.click('add function')
resp.forms[0]['name'] = 'Other Function'
resp = resp.forms[0].submit('submit')
assert set(Workflow.get(workflow.id).roles.keys()) == set(['_receiver', '_other-function'])
assert Workflow.get(workflow.id).roles['_other-function'] == 'Other Function'
# test rename
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
resp = resp.click('Other Function')
resp = resp.forms[0].submit('cancel')
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
resp = resp.click('Other Function')
resp.forms[0]['name'] = 'Other Renamed Function'
resp = resp.forms[0].submit('submit')
assert set(Workflow.get(workflow.id).roles.keys()) == set(['_receiver', '_other-function'])
assert Workflow.get(workflow.id).roles['_other-function'] == 'Other Renamed Function'
# test removal
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
resp = resp.click('Other Renamed Function')
resp = resp.forms[0].submit('delete')
assert set(Workflow.get(workflow.id).roles.keys()) == set(['_receiver'])
# make sure it's not possible to remove the "_receiver" key
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
resp = resp.click('Recipient')
assert not 'delete' in resp.forms[0].fields
def test_workflows_functions_vs_visibility():
create_superuser()
create_role()
Workflow.wipe()
workflow = Workflow(name='foo')
workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
workflow.store()
# restrict visibility
app = login(get_app(pub))
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
resp = resp.click('Just Submitted')
resp = resp.click('Change Status Visibility')
resp.forms[0]['hide_status_from_user'].checked = True
resp = resp.forms[0].submit()
assert Workflow.get(workflow.id).possible_status[0].visibility == ['_receiver']
# add function, make sure visibility follows
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
resp = resp.click('add function')
resp.forms[0]['name'] = 'Other Function'
resp = resp.forms[0].submit('submit')
assert set(Workflow.get(workflow.id).roles.keys()) == set(['_receiver', '_other-function'])
assert Workflow.get(workflow.id).roles['_other-function'] == 'Other Function'
assert set(Workflow.get(workflow.id).possible_status[0].visibility) == set(
['_receiver', '_other-function'])
# restrict visibility in a different status, check it gets all the
# functions
resp = app.get('/backoffice/workflows/%s/' % workflow.id)
resp = resp.click('Rejected')
resp = resp.click('Change Status Visibility')
resp.forms[0]['hide_status_from_user'].checked = True
resp = resp.forms[0].submit()
assert set(Workflow.get(workflow.id).possible_status[2].visibility) == set(
['_receiver', '_other-function'])
def test_users():
create_superuser()
app = login(get_app(pub))

View File

@ -726,3 +726,49 @@ def test_workflow_display_message(pub):
# makes sure the string is correctly escaped for HTML
display_message.message = '[foo]'
assert display_message.get_message(formdata) == '1 < 3'
def test_workflow_roles(pub):
pub.substitutions.feed(MockSubstitutionVariables())
user = pub.user_class(name='foo')
user.email = 'zorg@localhost'
user.store()
Role.wipe()
role1 = Role(name='foo')
role1.emails = ['foo@localhost']
role1.store()
role2 = Role(name='bar')
role2.emails = ['bar@localhost', 'baz@localhost']
role2.store()
workflow = Workflow(name='wf roles')
st1 = workflow.add_status('Status1', 'st1')
item = SendmailWorkflowStatusItem()
item.to = ['_receiver', '_other']
item.subject = 'Foobar'
item.body = 'Hello'
st1.items.append(item)
item.parent = st1
workflow.roles['_other'] = 'Other Function'
workflow.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.fields = []
formdef.workflow_roles = {'_receiver': role1.id, '_other': role2.id}
formdef.workflow_id = workflow.id
formdef.store()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
emails.empty()
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('Foobar')
assert set(emails.get('Foobar')['email_rcpt']) == set(
['foo@localhost', 'bar@localhost', 'baz@localhost'])

View File

@ -346,7 +346,7 @@ class WorkflowStatusPage(Directory):
r += htmltext('%s</h3>') % self.status.name
r += htmltext('</div>')
if self.status.visibility == ['_receiver']:
if self.status.get_visibility_restricted_roles():
r += htmltext('<div class="bo-block">')
r += _('This status is hidden from the user.')
if not str(self.workflow.id).startswith(str('_')):
@ -621,7 +621,7 @@ class WorkflowStatusPage(Directory):
form = Form(enctype = 'multipart/form-data')
form.add(CheckboxWidget, 'hide_status_from_user',
title=_('Hide status from user'),
value=(self.status.visibility == ['_receiver']))
value=bool(self.status.get_visibility_restricted_roles()))
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
@ -630,7 +630,7 @@ class WorkflowStatusPage(Directory):
if form.is_submitted() and not form.has_errors():
hide_status = form.get_widget('hide_status_from_user').parse()
if hide_status:
self.status.visibility = [str('_receiver')]
self.status.visibility = self.workflow.roles.keys()
else:
self.status.visibility = None
self.workflow.store()
@ -806,9 +806,92 @@ class VariablesDirectory(Directory):
return Directory._q_traverse(self, path)
class FunctionsDirectory(Directory):
_q_exports = ['', 'new']
def __init__(self, workflow):
self.workflow = workflow
def _q_traverse(self, path):
get_response().breadcrumb.append(('functions/', _('Functions')))
return Directory._q_traverse(self, path)
def new(self):
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Name'), required=True, size=50)
form.add_submit('submit', _('Add'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.is_submitted() and not form.has_errors():
name = form.get_widget('name').parse()
slug = '_%s' % misc.simplify(name)
self.workflow.roles[slug] = name
# go over all existing status and update their visibility
# restrictions if necessary
for status in self.workflow.possible_status:
if status.get_visibility_restricted_roles():
status.visibility = self.workflow.roles.keys()
self.workflow.store()
return redirect('..')
get_response().breadcrumb.append(('new', _('New Function')))
html_top('workflows', title=_('New Function'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('New Function')
r += form.render()
return r.getvalue()
def _q_lookup(self, component):
function = self.workflow.roles.get('_' + component)
if not function:
raise errors.TraversalError()
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Name'), required=True, size=50,
value=function)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if component != 'receiver':
# do not allow removing the standard "receiver" function.
# TODO: do not display "delete" for functions that are currently in
# use.
form.add_submit('delete', _('Delete'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.get_submit() == 'delete':
slug = '_%s' % component
del self.workflow.roles[slug]
self.workflow.store()
return redirect('..')
if form.is_submitted() and not form.has_errors():
name = form.get_widget('name').parse()
slug = '_%s' % component
self.workflow.roles[slug] = name
self.workflow.store()
return redirect('..')
get_response().breadcrumb.append(('new', _('Edit Function')))
html_top('workflows', title=_('Edit Function'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Edit Function')
r += form.render()
return r.getvalue()
def _q_index(self):
return redirect('..')
class WorkflowPage(Directory):
_q_exports = ['', 'edit', 'delete', 'newstatus', ('status', 'status_dir'), 'update_order',
'duplicate', 'export', 'svg', ('variables', 'variables_dir')]
'duplicate', 'export', 'svg', ('variables', 'variables_dir'),
('functions', 'functions_dir')]
def __init__(self, component, html_top):
try:
@ -819,6 +902,7 @@ class WorkflowPage(Directory):
self.workflow_ui = WorkflowUI(self.workflow)
self.status_dir = WorkflowStatusDirectory(self.workflow, html_top)
self.variables_dir = VariablesDirectory(self.workflow)
self.functions_dir = FunctionsDirectory(self.workflow)
get_response().breadcrumb.append((component + '/', self.workflow.name))
def _q_index(self):
@ -868,7 +952,7 @@ class WorkflowPage(Directory):
r += htmltext('<ul id="status-list" class="biglist">')
for status in self.workflow.possible_status:
klass = ['biglistitem']
if status.visibility == ['_receiver']:
if status.get_visibility_restricted_roles():
klass.append('hidden-status')
r += htmltext('<li class="%s" id="itemId_%s">') % (
' '.join(klass), status.id)
@ -881,11 +965,17 @@ class WorkflowPage(Directory):
r += htmltext('<div class="splitcontent-right">')
r += htmltext('<div class="bo-block">')
r += htmltext('<h3>%s</h3>') % _('Workflow Roles')
r += htmltext('<h3>%s') % _('Workflow Functions')
if not str(self.workflow.id).startswith('_'):
r += htmltext(' <span class="change">(<a rel="popup" href="functions/new">%s</a>)</span>') % _('add function')
r += htmltext('</h3>')
r += htmltext('<ul id="roles-list" class="biglist">')
for key, label in (self.workflow.roles or {}).items():
r += htmltext('<li class="bliglistitem">')
r += htmltext('<a href="#roles/%s">%s</a>') % (key, label)
r += htmltext('<li class="biglistitem">')
if not str(self.workflow.id).startswith('_'):
r += htmltext('<a rel="popup" href="functions/%s">%s</a>') % (key[1:], label)
else:
r += htmltext('<a>%s</a>') % label
r += htmltext('</li>')
r += htmltext('</ul>')
r += htmltext('</div>')

View File

@ -487,12 +487,14 @@ class FormData(StorableObject):
status_action_roles = set()
# make sure the person defined as handling role always gets access
# to the formdata, till the very end (where it may be that there is
# no workflow status item at all).
handling_role_id = self.get_handling_role_id()
if handling_role_id:
status_action_roles.add(handling_role_id)
# make sure the handling roles always gets access to the formdata, till
# the very end (where it may be that there is no workflow status item
# at all).
from wcs.workflows import get_role_translation
for function_key in self.formdef.workflow.roles.keys():
handling_role = get_role_translation(self, function_key)
if handling_role:
status_action_roles.add(handling_role)
wf_status = self.get_status()
if not wf_status:
@ -501,7 +503,6 @@ class FormData(StorableObject):
# people that can act on a workflow status item are considered
# 'concerned' by the formdata
from wcs.workflows import get_role_translation
for item in wf_status.items or []:
if not hasattr(item, 'by') or not item.by:
continue

View File

@ -176,6 +176,7 @@ class Workflow(StorableObject):
StorableObject.__init__(self)
self.name = name
self.possible_status = []
self.roles = {'_receiver': _('Recipient')}
def migrate(self):
changed = False
@ -688,6 +689,11 @@ class WorkflowStatus(object):
item.directory_class(formdata, item, self)))
return subdirectories
def get_visibility_restricted_roles(self):
if not self.visibility: # no restriction -> visible
return []
return self.visibility
def is_visible(self, formdata, user):
if not self.visibility: # no restriction -> visible
return True
@ -1065,6 +1071,8 @@ def get_role_translation(formdata, role_name):
role_id = formdata.workflow_roles.get(role_name)
if not role_id and formdata.formdef.workflow_roles:
role_id = formdata.formdef.workflow_roles.get(role_name)
if role_id is None:
return role_id
return str(role_id)
else:
return str(role_name)