a2_rbac: do not break unicity when get-or-creating admin role (#42179)
This commit is contained in:
parent
45a93bb273
commit
7c4f725bfc
|
@ -20,7 +20,9 @@ from django.utils.text import slugify
|
|||
from django.contrib.contenttypes.models import ContentType
|
||||
|
||||
from django_rbac.models import ADMIN_OP
|
||||
from django_rbac.utils import get_role_model, get_ou_model
|
||||
from django_rbac.utils import get_ou_model
|
||||
from django_rbac.utils import get_permission_model
|
||||
from django_rbac.utils import get_role_model
|
||||
|
||||
from ..utils import get_fk_model
|
||||
from . import utils, app_settings
|
||||
|
@ -140,7 +142,9 @@ def update_content_types_roles():
|
|||
continue
|
||||
ct_admin_role = Role.objects.get_admin_role(instance=ct, name=name,
|
||||
slug=slug,
|
||||
update_name=True)
|
||||
update_name=True,
|
||||
update_slug=True,
|
||||
create=True)
|
||||
if MANAGED_CT[ct_tuple].get('must_view_user'):
|
||||
ct_admin_role.permissions.add(view_user_perm)
|
||||
if MANAGED_CT[ct_tuple].get('must_manage_authorizations_user'):
|
||||
|
@ -150,17 +154,22 @@ def update_content_types_roles():
|
|||
|
||||
|
||||
def update_user_admin_roles_permission():
|
||||
roles = get_role_model().objects.filter(slug__startswith='_a2-managers-of-role',
|
||||
permissions__operation__slug=ADMIN_OP.slug)
|
||||
for role in roles:
|
||||
old_perm = role.permissions.get(operation__slug=ADMIN_OP.slug)
|
||||
administered_role = old_perm.target
|
||||
role_ct = ContentType.objects.get_for_model(get_role_model())
|
||||
permissions = get_permission_model().objects.filter(target_ct=role_ct, operation__slug=ADMIN_OP.slug)
|
||||
for perm in permissions:
|
||||
administered_role = perm.target
|
||||
admin_role = administered_role.get_admin_role()
|
||||
new_perm = admin_role.permissions.get(operation__slug=MANAGE_MEMBERS_OP.slug,
|
||||
target_id=administered_role.pk)
|
||||
assert admin_role.slug.startswith('_a2-managers-of-role')
|
||||
new_perm = admin_role.permissions.get(
|
||||
operation__slug=MANAGE_MEMBERS_OP.slug,
|
||||
target_ct=role_ct,
|
||||
target_id=administered_role.id)
|
||||
assert new_perm.ou is None
|
||||
admin_role.delete()
|
||||
assert len(perm.roles.all()) == 1
|
||||
role = perm.roles.first()
|
||||
role.admin_scope_id = new_perm.pk
|
||||
role.save()
|
||||
role.permissions.remove(old_perm)
|
||||
role.save(update_fields=['admin_scope_id'])
|
||||
role.permissions.remove(perm)
|
||||
role.permissions.add(new_perm)
|
||||
assert role.pk == administered_role.get_admin_role().pk
|
||||
assert role.pk == administered_role.get_admin_role(create=False).pk
|
||||
|
|
|
@ -33,18 +33,30 @@ class RoleManager(BaseRoleManager):
|
|||
self_administered=False, create=True):
|
||||
'''Get or create the role of manager's of this object instance'''
|
||||
kwargs = {}
|
||||
if ou or getattr(instance, 'ou', None):
|
||||
ou = kwargs['ou'] = ou or instance.ou
|
||||
else:
|
||||
kwargs['ou__isnull'] = True
|
||||
assert not ou or isinstance(instance, ContentType), (
|
||||
'get_admin_role(ou=...) can only be used with ContentType instances: %s %s %s' % (name, ou, instance)
|
||||
)
|
||||
|
||||
# Does the permission need to be scoped by ou ? Yes if the target is a
|
||||
# ContentType and ou is given. It's a general administration right upon
|
||||
# all instance of a ContentType, eventually scoped to the given ou.
|
||||
defaults = {}
|
||||
if isinstance(instance, ContentType):
|
||||
if ou:
|
||||
kwargs['ou'] = ou
|
||||
else:
|
||||
kwargs['ou__isnull'] = True
|
||||
else: # for non ContentType instances, OU must be set to NULL, always.
|
||||
defaults['ou'] = None
|
||||
# find an operation matching the template
|
||||
op = get_operation(operation)
|
||||
Permission = rbac_utils.get_permission_model()
|
||||
if create:
|
||||
perm, created = Permission.objects.get_or_create(
|
||||
perm, _ = Permission.objects.update_or_create(
|
||||
operation=op,
|
||||
target_ct=ContentType.objects.get_for_model(instance),
|
||||
target_id=instance.pk,
|
||||
defaults=defaults,
|
||||
**kwargs)
|
||||
else:
|
||||
try:
|
||||
|
@ -55,9 +67,15 @@ class RoleManager(BaseRoleManager):
|
|||
**kwargs)
|
||||
except Permission.DoesNotExist:
|
||||
return None
|
||||
created = False
|
||||
|
||||
admin_role = self.get_mirror_role(perm, name, slug, ou=ou,
|
||||
# in which ou do we put the role ?
|
||||
if ou:
|
||||
mirror_role_ou = ou
|
||||
elif getattr(instance, 'ou', None):
|
||||
mirror_role_ou = instance.ou
|
||||
else:
|
||||
mirror_role_ou = None
|
||||
admin_role = self.get_mirror_role(perm, name, slug, ou=mirror_role_ou,
|
||||
update_name=update_name,
|
||||
update_slug=update_slug,
|
||||
create=create)
|
||||
|
@ -76,23 +94,26 @@ class RoleManager(BaseRoleManager):
|
|||
|
||||
def get_mirror_role(self, instance, name, slug, ou=None,
|
||||
update_name=False, update_slug=False, create=True):
|
||||
'''Get or create a role which mirror another model, for example a
|
||||
'''Get or create a role which mirrors another model, for example a
|
||||
permission.
|
||||
'''
|
||||
ct = ContentType.objects.get_for_model(instance)
|
||||
update_fields = {}
|
||||
kwargs = {}
|
||||
if ou or getattr(instance, 'ou', None):
|
||||
kwargs['ou'] = ou or instance.ou
|
||||
if ou:
|
||||
update_fields['ou'] = ou
|
||||
else:
|
||||
kwargs['ou__isnull'] = True
|
||||
update_fields['ou'] = None
|
||||
if update_name:
|
||||
update_fields['name'] = name
|
||||
if update_slug:
|
||||
update_fields['slug'] = slug
|
||||
|
||||
if create:
|
||||
role, created = self.prefetch_related('permissions').get_or_create(
|
||||
role, _ = self.prefetch_related('permissions').update_or_create(
|
||||
admin_scope_ct=ct,
|
||||
admin_scope_id=instance.pk,
|
||||
defaults={
|
||||
'name': name,
|
||||
'slug': slug,
|
||||
},
|
||||
defaults=update_fields,
|
||||
**kwargs)
|
||||
else:
|
||||
try:
|
||||
|
@ -102,14 +123,9 @@ class RoleManager(BaseRoleManager):
|
|||
**kwargs)
|
||||
except self.model.DoesNotExist:
|
||||
return None
|
||||
created = False
|
||||
|
||||
if update_name and not created and role.name != name:
|
||||
role.name = name
|
||||
role.save()
|
||||
if update_slug and not created and role.slug != slug:
|
||||
role.slug = slug
|
||||
role.save()
|
||||
for field, value in update_fields.items():
|
||||
setattr(role, field, value)
|
||||
role.save(update_fields=update_fields)
|
||||
return role
|
||||
|
||||
def get_by_natural_key(self, slug, ou_natural_key, service_natural_key):
|
||||
|
|
|
@ -150,7 +150,7 @@ class OrganizationalUnit(OrganizationalUnitAbstractBase):
|
|||
slug = '_a2-managers-of-{ou.slug}'.format(ou=self)
|
||||
return Role.objects.get_admin_role(
|
||||
instance=self, name=name, slug=slug, operation=VIEW_OP,
|
||||
update_name=True, update_slug=True)
|
||||
update_name=True, update_slug=True, create=True)
|
||||
|
||||
def delete(self, *args, **kwargs):
|
||||
Permission.objects.filter(ou=self).delete()
|
||||
|
@ -239,7 +239,7 @@ class Role(RoleAbstractBase):
|
|||
view_user_perm = utils.get_view_user_perm()
|
||||
|
||||
admin_role = self.__class__.objects.get_admin_role(
|
||||
self, ou=self.ou,
|
||||
self,
|
||||
name=_('Managers of role "{role}"').format(
|
||||
role=six.text_type(self)),
|
||||
slug='_a2-managers-of-role-{role}'.format(
|
||||
|
|
|
@ -110,7 +110,7 @@ def test_admin_roles_update_slug(db):
|
|||
user = User.objects.create(username='john.doe')
|
||||
name1 = 'Can manage john.doe'
|
||||
slug1 = 'can-manage-john-doe'
|
||||
admin_role1 = Role.objects.get_admin_role(user, name1, slug1)
|
||||
admin_role1 = Role.objects.get_admin_role(user, name1, slug1, update_name=True, update_slug=True)
|
||||
assert admin_role1.name == name1
|
||||
assert admin_role1.slug == slug1
|
||||
name2 = 'Should manage john.doe'
|
||||
|
@ -405,7 +405,7 @@ def test_role_rename(db):
|
|||
assert ar1.slug == '_a2-managers-of-role-r1ter'
|
||||
|
||||
|
||||
def test_admin_role_user_view(settings, app, admin, simple_user, ou1, user_ou1, role_ou1):
|
||||
def test_admin_role_user_view(db, settings, app, admin, simple_user, ou1, user_ou1, role_ou1):
|
||||
role_ou1.get_admin_role().members.add(simple_user)
|
||||
|
||||
# Default: all users are visible
|
||||
|
@ -495,7 +495,7 @@ def test_manager_roles_multi_ou(db, ou1):
|
|||
@pytest.mark.parametrize(
|
||||
'alert,deletion', [(-1, 31), (31, -1), (0, 31), (31, 0), (None, 31), (31, None), (32, 31)]
|
||||
)
|
||||
def test_unused_account_settings_validation(ou1, alert, deletion):
|
||||
def test_unused_account_settings_validation(db, ou1, alert, deletion):
|
||||
ou1.clean_unused_accounts_alert = alert
|
||||
ou1.clean_unused_accounts_deletion = deletion
|
||||
with pytest.raises(ValidationError):
|
||||
|
|
Loading…
Reference in New Issue