a2_rbac: rename role's admin role on role's rename (#34774)

This commit is contained in:
Benjamin Dauvergne 2019-07-12 10:03:06 +02:00
parent d6fbfaa0c8
commit 5732f219c5
3 changed files with 109 additions and 22 deletions

View File

@ -30,7 +30,7 @@ class OrganizationalUnitManager(AbstractBaseManager):
class RoleManager(BaseRoleManager):
def get_admin_role(self, instance, name, slug, ou=None, operation=ADMIN_OP,
update_name=False, update_slug=False, permissions=(),
self_administered=False):
self_administered=False, create=True):
'''Get or create the role of manager's of this object instance'''
kwargs = {}
if ou or getattr(instance, 'ou', None):
@ -40,14 +40,31 @@ class RoleManager(BaseRoleManager):
# find an operation matching the template
op = get_operation(operation)
Permission = rbac_utils.get_permission_model()
perm, created = Permission.objects.get_or_create(
operation=op,
target_ct=ContentType.objects.get_for_model(instance),
target_id=instance.pk,
**kwargs)
if create:
perm, created = Permission.objects.get_or_create(
operation=op,
target_ct=ContentType.objects.get_for_model(instance),
target_id=instance.pk,
**kwargs)
else:
try:
perm = Permission.objects.get(
operation=op,
target_ct=ContentType.objects.get_for_model(instance),
target_id=instance.pk,
**kwargs)
except Permission.DoesNotExist:
return None
created = False
admin_role = self.get_mirror_role(perm, name, slug, ou=ou,
update_name=update_name,
update_slug=update_slug)
update_slug=update_slug,
create=create)
if not admin_role:
return None
permissions = set(permissions)
permissions.add(perm)
if self_administered:
@ -60,7 +77,7 @@ class RoleManager(BaseRoleManager):
return admin_role
def get_mirror_role(self, instance, name, slug, ou=None,
update_name=False, update_slug=False):
update_name=False, update_slug=False, create=True):
'''Get or create a role which mirror another model, for example a
permission.
'''
@ -70,14 +87,25 @@ class RoleManager(BaseRoleManager):
kwargs['ou'] = ou or instance.ou
else:
kwargs['ou__isnull'] = True
role, created = self.prefetch_related('permissions').get_or_create(
admin_scope_ct=ct,
admin_scope_id=instance.pk,
defaults={
'name': name,
'slug': slug,
},
**kwargs)
if create:
role, created = self.prefetch_related('permissions').get_or_create(
admin_scope_ct=ct,
admin_scope_id=instance.pk,
defaults={
'name': name,
'slug': slug,
},
**kwargs)
else:
try:
role = self.prefetch_related('permissions').get(
admin_scope_ct=ct,
admin_scope_id=instance.pk,
**kwargs)
except self.model.DoesNotExist:
return None
created = False
if update_name and not created and role.name != name:
role.name = name
role.save()

View File

@ -203,7 +203,7 @@ class Role(RoleAbstractBase):
content_type_field='target_ct',
object_id_field='target_id')
def get_admin_role(self, ou=None):
def get_admin_role(self, ou=None, create=True):
from . import utils
admin_role = self.__class__.objects.get_admin_role(
self, ou=self.ou,
@ -212,7 +212,10 @@ class Role(RoleAbstractBase):
slug='_a2-managers-of-role-{role}'.format(
role=slugify(six.text_type(self))),
permissions=(utils.get_view_user_perm(),),
self_administered=True)
self_administered=True,
update_name=True,
update_slug=True,
create=create)
return admin_role
def validate_unique(self, exclude=None):
@ -237,7 +240,9 @@ class Role(RoleAbstractBase):
# Service roles can only be part of the same ou as the service
if self.service:
self.ou = self.service.ou
return super(Role, self).save(*args, **kwargs)
result = super(Role, self).save(*args, **kwargs)
self.get_admin_role(create=False)
return result
def has_self_administration(self, op=CHANGE_OP):
Permission = rbac_utils.get_permission_model()

View File

@ -69,14 +69,14 @@ def test_role_with_ou_export_json(db):
ou = OU.objects.create(name='ou', slug='ou')
role = Role.objects.create(name='some role', ou=ou)
role_dict = role.export_json()
assert role_dict['ou'] == {'uuid': ou.uuid, 'slug': ou.slug, 'name': ou.name}
assert role_dict['ou'] == {'uuid': ou.uuid, 'slug': ou.slug, 'name': ou.name}
def test_role_with_service_export_json(db):
service = Service.objects.create(name='service name', slug='service-name')
role = Role.objects.create(name='some role', service=service)
role_dict = role.export_json()
assert role_dict['service'] == {'slug': service.slug, 'ou': None}
assert role_dict['service'] == {'slug': service.slug, 'ou': None}
def test_role_with_service_with_ou_export_json(db):
@ -85,7 +85,7 @@ def test_role_with_service_with_ou_export_json(db):
role = Role.objects.create(name='some role', service=service)
role_dict = role.export_json()
assert role_dict['service'] == {
'slug': service.slug, 'ou': {'uuid': ou.uuid, 'slug': 'ou', 'name': 'ou'}}
'slug': service.slug, 'ou': {'uuid': ou.uuid, 'slug': 'ou', 'name': 'ou'}}
def test_role_with_attributes_export_json(db):
@ -218,6 +218,20 @@ def test_admin_cleanup(db):
assert Role.objects.filter(name__contains='r1', admin_scope_ct_id__isnull=False).count() == 0
def test_admin_cleanup_bulk_delete(db):
r1 = Role.objects.create(name='r1')
assert Role.objects.filter(name__contains='r1', admin_scope_ct_id__isnull=False).count() == 0
r1.get_admin_role()
assert Role.objects.filter(name__contains='r1', admin_scope_ct_id__isnull=False).count() == 1
Role.objects.filter(name='r1').delete()
assert Role.objects.filter(name__contains='r1', admin_scope_ct_id__isnull=False).count() == 0
def test_admin_cleanup_failure(db):
Role.objects.create(
name='manager of r1',
@ -242,3 +256,43 @@ def test_admin_cleanup_command(db):
call_command('cleanupauthentic')
assert Role.objects.filter(name__contains='r1', admin_scope_ct_id__isnull=False).count() == 0
def test_role_rename(db):
r1 = Role.objects.create(name='r1')
assert r1.slug == 'r1'
assert Role.objects.filter(name__contains='r1', admin_scope_ct_id__isnull=False).count() == 0
assert not r1.get_admin_role(create=False)
assert Role.objects.filter(name__contains='r1', admin_scope_ct_id__isnull=False).count() == 0
ar1 = r1.get_admin_role()
assert ar1
assert ar1.name == 'Managers of role "r1"'
assert ar1.slug == '_a2-managers-of-role-r1'
assert Role.objects.filter(name__contains='r1', admin_scope_ct_id__isnull=False).count() == 1
assert ar1.name == 'Managers of role "r1"'
Role.objects.filter(pk=r1.pk).update(name='r1bis')
r1.refresh_from_db()
ar1.refresh_from_db()
assert r1.name == 'r1bis'
assert ar1.name == 'Managers of role "r1"'
ar1 = r1.get_admin_role(create=False)
assert ar1.name == 'Managers of role "r1bis"'
assert ar1.slug == '_a2-managers-of-role-r1bis'
r1.name = 'r1ter'
r1.save()
ar1.refresh_from_db()
assert ar1.name == 'Managers of role "r1ter"'
assert ar1.slug == '_a2-managers-of-role-r1ter'