a2_rbac: add helpers on PermissionQueryset
gitea/authentic/pipeline/head There was a failure building this commit Details

This commit is contained in:
Benjamin Dauvergne 2024-02-29 17:20:51 +01:00
parent 35e0425386
commit 3fd42c87a7
2 changed files with 38 additions and 33 deletions

View File

@ -22,6 +22,7 @@ from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.db import connection, models
from django.db.models import query
from django.db.models.base import ModelBase
from django.db.models.query import Prefetch, Q
from django.db.transaction import atomic
@ -101,6 +102,39 @@ class PermissionQueryset(query.QuerySet):
roles = a2_models.Role.objects.for_user(user=user)
return self.filter(roles__in=roles)
@classmethod
def permission_kwargs(
cls, target: Model | ModelBase, operation: str | Operation, ou: OrganizationalUnit = None
):
if isinstance(taget, models.Model):
target_ct = ContentType.objects.get_for_model(target)
target_id = target.pk
elif issubclass(target, models.Model):
target_ct = ContentType.objects.get_for_model(ContentType)
target_id = ContentType.objects.get_for_model(target).pk
else:
raise ValueError('invalid target must be Model type or instance')
if isinstance(operation, str):
operation = Operation.objects.get(slug=operation)
else:
operation = get_operation(operation)
return {'operation': operation, 'target_ct': target_ct, 'target_id': target_id, 'ou': ou}
def filter_simple(
self, target: Model | ModelBase, operation: str | Operation, ou: OrganizationalUnit = None
):
return self.filter(
**self.permission_kwargs(operation=operation, target_ct=target_ct, target_id=target_id, ou=ou)
)
def get_or_create(
self, target: Model | ModelBase, operation: str | Operation, ou: OrganizationalUnit = None
):
return self.get_or_create(
**self.permission_kwargs(operation=operation, target_ct=target_ct, target_id=target_id, ou=ou)
)
def cleanup(self):
count = 0
for p in self:

View File

@ -522,42 +522,13 @@ class Role(AbstractBase):
return self.slug.startswith('_')
def add_permission(self, model_or_instance, operation_tpl, ou=None):
if isinstance(model_or_instance, models.Model):
target_ct = ContentType.objects.get_for_model(model_or_instance)
target_id = model_or_instance.pk
elif isinstance(model_or_instance, type) and issubclass(model_or_instance, models.Model):
target_ct = ContentType.objects.get_for_model(ContentType)
target_id = ContentType.objects.get_for_model(model_or_instance).pk
else:
raise ValueError('invalid model_or_instance')
if isinstance(operation_tpl, str):
operation = Operation.objects.get(slug=operation_tpl)
else:
operation = utils.get_operation(operation_tpl)
permission, _ = Permission.objects.get_or_create(
operation=operation, target_ct=target_ct, target_id=target_id, ou=ou
)
permission, _ = Permission.objects.get_or_create(model_or_instance, operation_tpl, ou=ou)
self.permissions.add(permission)
def remove_permission(self, model_or_instance, operation_tpl, ou=None):
if isinstance(model_or_instance, models.Model):
target_ct = ContentType.objects.get_for_model(model_or_instance)
target_id = model_or_instance.pk
elif isinstance(model_or_instance, type) and issubclass(model_or_instance, models.Model):
target_ct = ContentType.objects.get_for_model(ContentType)
target_id = ContentType.objects.get_for_model(model_or_instance).pk
else:
raise ValueError('invalid model_or_instance')
if isinstance(operation_tpl, str):
operation = Operation.objects.get(slug=operation_tpl)
else:
operation = utils.get_operation(operation_tpl)
qs = Permission.objects.filter(target_ct=target_ct, target_id=target_id, operation=operation)
if ou:
qs = qs.filter(ou=ou)
else:
qs = qs.filter(ou__isnull=True)
self.permissions.through.objects.filter(permission__in=qs).delete()
self.permissions.remove(
Permision.objects.filter_simple(model_or_instance, operation_tpl, ou=ou).delete()
)
objects = managers.RoleManager()