workflows: add possibility to add/remove a computed role (#27313)

This commit is contained in:
Frédéric Péters 2018-10-14 10:03:02 +02:00
parent 14c51926a2
commit a636338d40
4 changed files with 88 additions and 37 deletions

View File

@ -508,7 +508,7 @@ def test_dispatch_computed(pub, caplog):
item.role_id = '="foobar"'
item.perform(formdata)
assert not formdata.workflow_roles
assert caplog.records[-1].message == 'error in dispatch, missing role foobar (="foobar")'
assert caplog.records[-1].message == 'error in dispatch, missing role (="foobar")'
def test_roles(pub):
user = pub.user_class()
@ -551,6 +551,52 @@ def test_roles(pub):
item.perform(formdata)
assert pub.user_class.get(user.id).roles == ['2']
def test_add_remove_computed_roles(pub):
user = pub.user_class()
user.store()
formdef = FormDef()
formdef.name = 'baz'
formdef.store()
formdata = formdef.data_class()()
formdata.user_id = user.id
role = Role(name='plop')
role.store()
role2 = Role(name='xxx')
role2.store()
item = AddRoleWorkflowStatusItem()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
item.role_id = role.name
item.perform(formdata)
assert pub.user_class.get(user.id).roles == [role.id]
user.roles = None
user.store()
item = RemoveRoleWorkflowStatusItem()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
item.role_id = role.name
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
user.roles = [role.id]
user.store()
item.perform(formdata)
assert not pub.user_class.get(user.id).roles
user.roles = [role2.id, role.id]
user.store()
item.perform(formdata)
assert pub.user_class.get(user.id).roles == [role2.id]
def test_roles_idp(pub):
pub.cfg['sp'] = {'idp-manage-user-attributes': True}
pub.cfg['idp'] = {'xxx': {'metadata_url': 'http://idp.example.net/idp/saml2/metadata'}}

View File

@ -185,22 +185,9 @@ class DispatchWorkflowStatusItem(WorkflowStatusItem):
if self.dispatch_type == 'manual' or not self.dispatch_type:
if not (self.role_id and self.role_key):
return
new_role_id = self.compute(self.role_id)
if not Role.has_key(new_role_id):
# computed value, not an id, try to get role by slug
new_role = Role.get_on_index(new_role_id, 'slug', ignore_errors=True)
if not new_role:
# fallback to role label
for role in Role.select():
if role.name == new_role_id:
new_role = role
break
if new_role:
new_role_id = new_role.id
else:
get_logger().error('error in dispatch, missing role %s (%s)' % (
new_role_id, self.role_id))
new_role_id = None
new_role_id = self.get_computed_role_id(self.role_id)
if not new_role_id:
get_logger().error('error in dispatch, missing role (%s)' % self.role_id)
elif self.dispatch_type == 'automatic':
if not (self.role_key and self.variable and self.rules):
return

View File

@ -59,8 +59,8 @@ class AddRoleWorkflowStatusItem(WorkflowStatusItem):
super(AddRoleWorkflowStatusItem, self).add_parameters_widgets(
form, parameters, prefix=prefix, formdef=formdef)
if 'role_id' in parameters:
form.add(SingleSelectWidget, '%srole_id' % prefix,
title=_('Role to Add'), value=str(self.role_id),
form.add(SingleSelectWidgetWithOther, '%srole_id' % prefix,
title=_('Role to Add'), value=str(self.role_id) if self.role_id else None,
options=[(None, '----', None)] + get_user_roles())
def role_id_export_to_xml(self, item, charset, include_id=False):
@ -74,20 +74,22 @@ class AddRoleWorkflowStatusItem(WorkflowStatusItem):
def perform(self, formdata):
if not self.role_id:
return
self.role_id = str(self.role_id)
role_id = self.get_computed_role_id(self.role_id)
if not role_id:
return
if not formdata.user_id:
# we can't work on anonymous forms
return
user = get_publisher().user_class.get(formdata.user_id)
self.perform_local(user, formdata)
self.perform_local(user, formdata, role_id)
if user.name_identifiers and is_idp_managing_user_attributes():
self.perform_idp(user, formdata)
self.perform_idp(user, formdata, role_id)
def perform_local(self, user, formdata):
def perform_local(self, user, formdata, role_id):
if not user.roles:
user.roles = []
if not self.role_id in user.roles:
user.roles.append(self.role_id)
if not role_id in user.roles:
user.roles.append(role_id)
user.store()
request = get_request()
if request and request.user and request.user.id == user.id:
@ -95,8 +97,8 @@ class AddRoleWorkflowStatusItem(WorkflowStatusItem):
# changes.
request.user = user
def perform_idp(self, user, formdata):
role = Role.get(self.role_id)
def perform_idp(self, user, formdata, role_id):
role = Role.get(role_id)
role_uuid = role.uuid or role.slug
user_uuid = user.name_identifiers[0]
try:
@ -135,25 +137,27 @@ class RemoveRoleWorkflowStatusItem(WorkflowStatusItem):
super(RemoveRoleWorkflowStatusItem, self).add_parameters_widgets(
form, parameters, prefix=prefix, formdef=formdef)
if 'role_id' in parameters:
form.add(SingleSelectWidget, '%srole_id' % prefix,
title=_('Role to Remove'), value=self.role_id,
form.add(SingleSelectWidgetWithOther, '%srole_id' % prefix,
title=_('Role to Remove'), value=str(self.role_id) if self.role_id else None,
options=[(None, '----', None)] + get_user_roles())
def perform(self, formdata):
if not self.role_id:
return
self.role_id = str(self.role_id)
role_id = self.get_computed_role_id(self.role_id)
if not role_id:
return
if not formdata.user_id:
# we can't work on anonymous forms
return
user = get_publisher().user_class.get(formdata.user_id)
self.perform_local(user, formdata)
self.perform_local(user, formdata, role_id)
if user.name_identifiers and is_idp_managing_user_attributes():
self.perform_idp(user, formdata)
self.perform_idp(user, formdata, role_id)
def perform_local(self, user, formdata):
if user.roles and self.role_id in user.roles:
user.roles.remove(self.role_id)
def perform_local(self, user, formdata, role_id):
if user.roles and role_id in user.roles:
user.roles.remove(role_id)
user.store()
request = get_request()
if request and request.user and request.user.id == user.id:
@ -161,8 +165,8 @@ class RemoveRoleWorkflowStatusItem(WorkflowStatusItem):
# with the changes.
request.user = user
def perform_idp(self, user, formdata):
role = Role.get(self.role_id)
def perform_idp(self, user, formdata, role_id):
role = Role.get(role_id)
role_uuid = role.uuid or role.slug
user_uuid = user.name_identifiers[0]
try:

View File

@ -1729,6 +1729,20 @@ class WorkflowStatusItem(XmlSerialisable):
raise
return var
def get_computed_role_id(self, role_id):
new_role_id = self.compute(str(role_id))
if Role.has_key(new_role_id):
return new_role_id
# computed value, not an id, try to get role by slug
new_role = Role.get_on_index(new_role_id, 'slug', ignore_errors=True)
if new_role:
return new_role.id
# fallback to role label
for role in Role.select():
if role.name == new_role_id:
return role.id
return None
def get_substitution_variables(self, formdata):
return {}