store IdP role's uuid in w.c.s. Role objects (#14207)
- added Role.uuid = None, make it indexed - modify hobo_notify to store provisionnined roles' uuid in Role.uuid, and roles' slug in Role.slug - factorize role lookup in hobo_notify and implement those rules for lookup: - Role.uuid matches uuid OR - Role.id matches uuid (retro-compatibility) OR - Role.slug matches uuid (retro-compatibility) OR - Role.slug matches slug (NEW) OR - Role.name matches name (NEW) - search users's roles by name too - update roles provisionning through SAML to use the new .uuid field - update workflow actions to add/remove roles to use the new .uuid field
This commit is contained in:
parent
e432c776ff
commit
03123e9742
|
@ -117,13 +117,16 @@ def test_process_notification_role(pub):
|
|||
assert Role.count() == 2
|
||||
old_role = Role.get(existing_role_id)
|
||||
assert old_role.name == 'Service état civil'
|
||||
assert old_role.slug == 'xyz'
|
||||
assert old_role.uuid == 'xyz'
|
||||
assert old_role.slug == 'service-ett-civil'
|
||||
assert old_role.details == "Rôle du service état civil"
|
||||
assert old_role.emails == ['etat-civil@example.com']
|
||||
assert old_role.emails_to_members is True
|
||||
new_role = Role.get('12345')
|
||||
new_role = Role.get_on_index('12345', 'uuid')
|
||||
assert new_role.id == '12345'
|
||||
assert new_role.name == 'Service enfance'
|
||||
assert new_role.slug == '12345'
|
||||
assert new_role.slug == 'service-enfance'
|
||||
assert new_role.uuid == '12345'
|
||||
assert new_role.details == "Rôle du service petite enfance"
|
||||
assert new_role.emails == ['petite-enfance@example.com']
|
||||
assert new_role.emails_to_members is False
|
||||
|
@ -149,8 +152,9 @@ def test_process_notification_role(pub):
|
|||
CmdHoboNotify.process_notification(notification)
|
||||
assert Role.count() == 1
|
||||
assert Role.select()[0].id == new_role.id
|
||||
assert Role.select()[0].uuid == '12345'
|
||||
assert Role.select()[0].name == 'Service enfance'
|
||||
assert Role.select()[0].slug == '12345'
|
||||
assert Role.select()[0].slug == 'service-enfance'
|
||||
assert Role.select()[0].details is None
|
||||
assert Role.select()[0].emails == ['petite-enfance@example.com']
|
||||
assert Role.select()[0].emails_to_members is True
|
||||
|
@ -196,13 +200,15 @@ def test_process_notification_role_description(pub):
|
|||
assert Role.count() == 2
|
||||
old_role = Role.get(existing_role_id)
|
||||
assert old_role.name == 'Service état civil'
|
||||
assert old_role.slug == 'xyz'
|
||||
assert old_role.slug == 'service-ett-civil'
|
||||
assert old_role.uuid == 'xyz'
|
||||
assert old_role.details is None
|
||||
assert old_role.emails == ['etat-civil@example.com']
|
||||
assert old_role.emails_to_members is True
|
||||
new_role = Role.get('12345')
|
||||
new_role = Role.get_on_index('12345', 'uuid')
|
||||
assert new_role.name == 'Service enfance'
|
||||
assert new_role.slug == '12345'
|
||||
assert new_role.slug == 'service-enfance'
|
||||
assert new_role.uuid == '12345'
|
||||
assert new_role.details is None
|
||||
assert new_role.emails == ['petite-enfance@example.com']
|
||||
assert new_role.emails_to_members is False
|
||||
|
@ -229,7 +235,8 @@ def test_process_notification_role_description(pub):
|
|||
assert Role.count() == 1
|
||||
assert Role.select()[0].id == new_role.id
|
||||
assert Role.select()[0].name == 'Service enfance'
|
||||
assert Role.select()[0].slug == '12345'
|
||||
assert Role.select()[0].uuid == '12345'
|
||||
assert Role.select()[0].slug == 'service-enfance'
|
||||
assert Role.select()[0].details is None
|
||||
assert Role.select()[0].emails == ['petite-enfance@example.com']
|
||||
assert Role.select()[0].emails_to_members is True
|
||||
|
@ -269,7 +276,7 @@ def test_process_notification_role_deprovision(pub):
|
|||
assert Role.select()[0].slug == 'bar'
|
||||
|
||||
r = Role(name='Service étt civil')
|
||||
r.slug = 'xyz'
|
||||
r.uuid = 'xyz'
|
||||
r.store()
|
||||
assert Role.count() == 2
|
||||
CmdHoboNotify.process_notification(notification)
|
||||
|
@ -449,13 +456,15 @@ def test_process_notification_user_provision(pub):
|
|||
assert Role.count() == 2
|
||||
old_role = Role.get(existing_role_id)
|
||||
assert old_role.name == 'Service état civil'
|
||||
assert old_role.slug == 'xyz'
|
||||
assert old_role.uuid == 'xyz'
|
||||
assert old_role.slug == 'service-ett-civil'
|
||||
assert old_role.details is None
|
||||
assert old_role.emails == ['etat-civil@example.com']
|
||||
assert old_role.emails_to_members is True
|
||||
new_role = Role.get('12345')
|
||||
new_role = Role.get_on_index('12345', 'uuid')
|
||||
assert new_role.name == 'Service enfance'
|
||||
assert new_role.slug == '12345'
|
||||
assert new_role.slug == 'service-enfance'
|
||||
assert new_role.uuid == '12345'
|
||||
assert new_role.details is None
|
||||
assert new_role.emails == ['petite-enfance@example.com']
|
||||
assert new_role.emails_to_members is False
|
||||
|
@ -502,7 +511,7 @@ def test_process_notification_user_provision(pub):
|
|||
assert user.form_data['_birthdate'] is None
|
||||
assert user.name_identifiers == ['a'*32]
|
||||
assert user.is_admin is False
|
||||
assert set(user.roles) == set(['12345', old_role.id])
|
||||
assert set(user.roles) == set([new_role.id, old_role.id])
|
||||
|
||||
notification = {
|
||||
u'@type': u'provision',
|
||||
|
|
|
@ -110,6 +110,7 @@ class CmdHoboNotify(Command):
|
|||
uuid = o['uuid'].encode(publisher.site_charset)
|
||||
uuids.add(uuid)
|
||||
slug = None
|
||||
name = None
|
||||
if action == 'provision':
|
||||
if not cls.check_valid_role(o):
|
||||
raise ValueError('invalid role')
|
||||
|
@ -119,28 +120,16 @@ class CmdHoboNotify(Command):
|
|||
emails = [email.encode(publisher.site_charset) for email in o['emails']]
|
||||
emails_to_members = o['emails_to_members']
|
||||
# Find existing role
|
||||
role = None
|
||||
try:
|
||||
role = Role.get(uuid)
|
||||
except KeyError:
|
||||
for slug_id in (slug, uuid):
|
||||
if not slug_id:
|
||||
continue
|
||||
try:
|
||||
role = Role.get_on_index(slug_id, 'slug')
|
||||
break
|
||||
except KeyError:
|
||||
continue
|
||||
else:
|
||||
# New role
|
||||
if action != 'provision':
|
||||
continue
|
||||
role = Role()
|
||||
role.id = uuid
|
||||
role = Role.resolve(uuid, slug, name)
|
||||
if not role:
|
||||
if action != 'provision':
|
||||
continue
|
||||
role = Role(id=uuid)
|
||||
if action == 'provision':
|
||||
# Provision/rename
|
||||
role.name = name
|
||||
role.slug = uuid
|
||||
role.uuid = uuid
|
||||
role.slug = slug
|
||||
role.emails = emails
|
||||
role.details = details
|
||||
role.emails_to_members = emails_to_members
|
||||
|
@ -151,7 +140,7 @@ class CmdHoboNotify(Command):
|
|||
# All roles have been sent
|
||||
if full and action == 'provision':
|
||||
for role in Role.select():
|
||||
if role.slug not in uuids:
|
||||
if role.uuid not in uuids:
|
||||
role.remove_self()
|
||||
|
||||
@classmethod
|
||||
|
@ -193,19 +182,12 @@ class CmdHoboNotify(Command):
|
|||
field_value = field.convert_value_from_anything(field_value)
|
||||
user.form_data[field.id] = field_value
|
||||
user.name_identifiers = [uuid]
|
||||
role_uuids = [role['uuid'] for role in o['roles']]
|
||||
# reset roles
|
||||
user.is_admin = o.get('is_superuser', False)
|
||||
user.roles = []
|
||||
for role_uuid in role_uuids:
|
||||
try:
|
||||
role = Role.get(role_uuid)
|
||||
except KeyError:
|
||||
try:
|
||||
role = Role.get_on_index(role_uuid, 'slug')
|
||||
except KeyError:
|
||||
continue
|
||||
if role.id not in user.roles:
|
||||
for role_ref in o.get('roles', []):
|
||||
role = Role.resolve(role_ref['uuid'], name=role_ref.get('name'))
|
||||
if role and role.id not in user.roles:
|
||||
user.add_roles([role.id])
|
||||
user.set_attributes_from_formdata(user.form_data)
|
||||
user.store()
|
||||
|
|
|
@ -504,8 +504,11 @@ class Saml2Directory(Directory):
|
|||
if 'role-slug' in m:
|
||||
role_ids = []
|
||||
names = []
|
||||
for slug in m['role-slug']:
|
||||
role = Role.get_on_index(slug, 'slug', ignore_errors=True)
|
||||
# uuid are in a role-slug attribute, it's historical, as at some
|
||||
# point roles in authentic where provisionned from w.c.s. and join
|
||||
# was done on the slug field.
|
||||
for uuid in m['role-slug']:
|
||||
role = Role.resolve(uuid)
|
||||
if not role:
|
||||
logger.warn('role slug %s is unknown', slug)
|
||||
continue
|
||||
|
|
32
wcs/roles.py
32
wcs/roles.py
|
@ -22,17 +22,18 @@ import qommon.misc
|
|||
|
||||
class Role(StorableObject):
|
||||
_names = 'roles'
|
||||
_indexes = ['slug']
|
||||
_indexes = ['uuid', 'slug']
|
||||
|
||||
name = None
|
||||
uuid = None
|
||||
slug = None
|
||||
details = None
|
||||
emails = None
|
||||
emails_to_members = False
|
||||
allows_backoffice_access = True
|
||||
|
||||
def __init__(self, name = None):
|
||||
StorableObject.__init__(self)
|
||||
def __init__(self, name=None, id=None):
|
||||
StorableObject.__init__(self, id=id)
|
||||
self.name = name
|
||||
|
||||
def migrate(self):
|
||||
|
@ -92,6 +93,31 @@ class Role(StorableObject):
|
|||
'slug': unicode(self.slug, charset),
|
||||
'id': self.id}
|
||||
|
||||
@classmethod
|
||||
def resolve(cls, uuid, slug=None, name=None):
|
||||
try:
|
||||
return cls.get_on_index(uuid, 'uuid')
|
||||
except KeyError:
|
||||
pass
|
||||
try:
|
||||
return cls.get(uuid)
|
||||
except KeyError:
|
||||
pass
|
||||
try:
|
||||
return cls.get_on_index(uuid, 'slug')
|
||||
except KeyError:
|
||||
pass
|
||||
if slug:
|
||||
try:
|
||||
return cls.get_on_index(slug, 'slug')
|
||||
except KeyError:
|
||||
pass
|
||||
if name:
|
||||
for role in cls.select():
|
||||
if role.name == name:
|
||||
return role
|
||||
return None
|
||||
|
||||
|
||||
def logged_users_role():
|
||||
volatile_role = Role.volatile()
|
||||
|
|
|
@ -94,7 +94,7 @@ class AddRoleWorkflowStatusItem(WorkflowStatusItem):
|
|||
|
||||
def perform_idp(self, user, formdata):
|
||||
role = Role.get(self.role_id)
|
||||
role_uuid = role.slug
|
||||
role_uuid = role.uuid or role.slug
|
||||
user_uuid = user.name_identifiers[0]
|
||||
try:
|
||||
url = roles_ws_url(role_uuid, user_uuid)
|
||||
|
@ -154,7 +154,7 @@ class RemoveRoleWorkflowStatusItem(WorkflowStatusItem):
|
|||
|
||||
def perform_idp(self, user, formdata):
|
||||
role = Role.get(self.role_id)
|
||||
role_uuid = role.slug
|
||||
role_uuid = role.uuid or role.slug
|
||||
user_uuid = user.name_identifiers[0]
|
||||
try:
|
||||
url = roles_ws_url(role_uuid, user_uuid)
|
||||
|
|
Loading…
Reference in New Issue