a2_rbac: do not break unicity when get-or-creating admin role (#42179)

This commit is contained in:
Paul Marillonnet 2020-04-28 14:01:19 +02:00 committed by Benjamin Dauvergne
parent 45a93bb273
commit 7c4f725bfc
4 changed files with 66 additions and 41 deletions

View File

@ -20,7 +20,9 @@ from django.utils.text import slugify
from django.contrib.contenttypes.models import ContentType
from django_rbac.models import ADMIN_OP
from django_rbac.utils import get_role_model, get_ou_model
from django_rbac.utils import get_ou_model
from django_rbac.utils import get_permission_model
from django_rbac.utils import get_role_model
from ..utils import get_fk_model
from . import utils, app_settings
@ -140,7 +142,9 @@ def update_content_types_roles():
continue
ct_admin_role = Role.objects.get_admin_role(instance=ct, name=name,
slug=slug,
update_name=True)
update_name=True,
update_slug=True,
create=True)
if MANAGED_CT[ct_tuple].get('must_view_user'):
ct_admin_role.permissions.add(view_user_perm)
if MANAGED_CT[ct_tuple].get('must_manage_authorizations_user'):
@ -150,17 +154,22 @@ def update_content_types_roles():
def update_user_admin_roles_permission():
roles = get_role_model().objects.filter(slug__startswith='_a2-managers-of-role',
permissions__operation__slug=ADMIN_OP.slug)
for role in roles:
old_perm = role.permissions.get(operation__slug=ADMIN_OP.slug)
administered_role = old_perm.target
role_ct = ContentType.objects.get_for_model(get_role_model())
permissions = get_permission_model().objects.filter(target_ct=role_ct, operation__slug=ADMIN_OP.slug)
for perm in permissions:
administered_role = perm.target
admin_role = administered_role.get_admin_role()
new_perm = admin_role.permissions.get(operation__slug=MANAGE_MEMBERS_OP.slug,
target_id=administered_role.pk)
assert admin_role.slug.startswith('_a2-managers-of-role')
new_perm = admin_role.permissions.get(
operation__slug=MANAGE_MEMBERS_OP.slug,
target_ct=role_ct,
target_id=administered_role.id)
assert new_perm.ou is None
admin_role.delete()
assert len(perm.roles.all()) == 1
role = perm.roles.first()
role.admin_scope_id = new_perm.pk
role.save()
role.permissions.remove(old_perm)
role.save(update_fields=['admin_scope_id'])
role.permissions.remove(perm)
role.permissions.add(new_perm)
assert role.pk == administered_role.get_admin_role().pk
assert role.pk == administered_role.get_admin_role(create=False).pk

View File

@ -33,18 +33,30 @@ class RoleManager(BaseRoleManager):
self_administered=False, create=True):
'''Get or create the role of manager's of this object instance'''
kwargs = {}
if ou or getattr(instance, 'ou', None):
ou = kwargs['ou'] = ou or instance.ou
else:
kwargs['ou__isnull'] = True
assert not ou or isinstance(instance, ContentType), (
'get_admin_role(ou=...) can only be used with ContentType instances: %s %s %s' % (name, ou, instance)
)
# Does the permission need to be scoped by ou ? Yes if the target is a
# ContentType and ou is given. It's a general administration right upon
# all instance of a ContentType, eventually scoped to the given ou.
defaults = {}
if isinstance(instance, ContentType):
if ou:
kwargs['ou'] = ou
else:
kwargs['ou__isnull'] = True
else: # for non ContentType instances, OU must be set to NULL, always.
defaults['ou'] = None
# find an operation matching the template
op = get_operation(operation)
Permission = rbac_utils.get_permission_model()
if create:
perm, created = Permission.objects.get_or_create(
perm, _ = Permission.objects.update_or_create(
operation=op,
target_ct=ContentType.objects.get_for_model(instance),
target_id=instance.pk,
defaults=defaults,
**kwargs)
else:
try:
@ -55,9 +67,15 @@ class RoleManager(BaseRoleManager):
**kwargs)
except Permission.DoesNotExist:
return None
created = False
admin_role = self.get_mirror_role(perm, name, slug, ou=ou,
# in which ou do we put the role ?
if ou:
mirror_role_ou = ou
elif getattr(instance, 'ou', None):
mirror_role_ou = instance.ou
else:
mirror_role_ou = None
admin_role = self.get_mirror_role(perm, name, slug, ou=mirror_role_ou,
update_name=update_name,
update_slug=update_slug,
create=create)
@ -76,23 +94,26 @@ class RoleManager(BaseRoleManager):
def get_mirror_role(self, instance, name, slug, ou=None,
update_name=False, update_slug=False, create=True):
'''Get or create a role which mirror another model, for example a
'''Get or create a role which mirrors another model, for example a
permission.
'''
ct = ContentType.objects.get_for_model(instance)
update_fields = {}
kwargs = {}
if ou or getattr(instance, 'ou', None):
kwargs['ou'] = ou or instance.ou
if ou:
update_fields['ou'] = ou
else:
kwargs['ou__isnull'] = True
update_fields['ou'] = None
if update_name:
update_fields['name'] = name
if update_slug:
update_fields['slug'] = slug
if create:
role, created = self.prefetch_related('permissions').get_or_create(
role, _ = self.prefetch_related('permissions').update_or_create(
admin_scope_ct=ct,
admin_scope_id=instance.pk,
defaults={
'name': name,
'slug': slug,
},
defaults=update_fields,
**kwargs)
else:
try:
@ -102,14 +123,9 @@ class RoleManager(BaseRoleManager):
**kwargs)
except self.model.DoesNotExist:
return None
created = False
if update_name and not created and role.name != name:
role.name = name
role.save()
if update_slug and not created and role.slug != slug:
role.slug = slug
role.save()
for field, value in update_fields.items():
setattr(role, field, value)
role.save(update_fields=update_fields)
return role
def get_by_natural_key(self, slug, ou_natural_key, service_natural_key):

View File

@ -150,7 +150,7 @@ class OrganizationalUnit(OrganizationalUnitAbstractBase):
slug = '_a2-managers-of-{ou.slug}'.format(ou=self)
return Role.objects.get_admin_role(
instance=self, name=name, slug=slug, operation=VIEW_OP,
update_name=True, update_slug=True)
update_name=True, update_slug=True, create=True)
def delete(self, *args, **kwargs):
Permission.objects.filter(ou=self).delete()
@ -239,7 +239,7 @@ class Role(RoleAbstractBase):
view_user_perm = utils.get_view_user_perm()
admin_role = self.__class__.objects.get_admin_role(
self, ou=self.ou,
self,
name=_('Managers of role "{role}"').format(
role=six.text_type(self)),
slug='_a2-managers-of-role-{role}'.format(

View File

@ -110,7 +110,7 @@ def test_admin_roles_update_slug(db):
user = User.objects.create(username='john.doe')
name1 = 'Can manage john.doe'
slug1 = 'can-manage-john-doe'
admin_role1 = Role.objects.get_admin_role(user, name1, slug1)
admin_role1 = Role.objects.get_admin_role(user, name1, slug1, update_name=True, update_slug=True)
assert admin_role1.name == name1
assert admin_role1.slug == slug1
name2 = 'Should manage john.doe'
@ -405,7 +405,7 @@ def test_role_rename(db):
assert ar1.slug == '_a2-managers-of-role-r1ter'
def test_admin_role_user_view(settings, app, admin, simple_user, ou1, user_ou1, role_ou1):
def test_admin_role_user_view(db, settings, app, admin, simple_user, ou1, user_ou1, role_ou1):
role_ou1.get_admin_role().members.add(simple_user)
# Default: all users are visible
@ -495,7 +495,7 @@ def test_manager_roles_multi_ou(db, ou1):
@pytest.mark.parametrize(
'alert,deletion', [(-1, 31), (31, -1), (0, 31), (31, 0), (None, 31), (31, None), (32, 31)]
)
def test_unused_account_settings_validation(ou1, alert, deletion):
def test_unused_account_settings_validation(db, ou1, alert, deletion):
ou1.clean_unused_accounts_alert = alert
ou1.clean_unused_accounts_deletion = deletion
with pytest.raises(ValidationError):