utils: add DjangoRBACPermission DRF's permission class (#62013)
This commit is contained in:
parent
fdf7374113
commit
b2ae197378
|
@ -16,7 +16,7 @@
|
|||
|
||||
|
||||
from django.db import models
|
||||
from rest_framework import exceptions, serializers
|
||||
from rest_framework import exceptions, permissions, serializers
|
||||
|
||||
|
||||
class NaturalKeyRelatedField(serializers.RelatedField):
|
||||
|
@ -77,3 +77,76 @@ class NaturalKeyRelatedField(serializers.RelatedField):
|
|||
except model.MultipleObjectsReturned:
|
||||
raise exceptions.ValidationError('multiple objects returned')
|
||||
raise exceptions.ValidationError('object not found')
|
||||
|
||||
|
||||
class DjangoRBACPermission(permissions.BasePermission):
|
||||
perms_map = {
|
||||
'GET': [],
|
||||
'OPTIONS': [],
|
||||
'HEAD': [],
|
||||
'POST': ['add'],
|
||||
'PUT': ['change'],
|
||||
'PATCH': ['change'],
|
||||
'DELETE': ['delete'],
|
||||
}
|
||||
object_perms_map = {
|
||||
'GET': ['view'],
|
||||
}
|
||||
|
||||
def __init__(self, perms_map=None, object_perms_map=None):
|
||||
self.perms_map = perms_map or dict(self.perms_map)
|
||||
if object_perms_map:
|
||||
self.object_perms_map = object_perms_map
|
||||
else:
|
||||
self.object_perms_map = dict(self.object_perms_map)
|
||||
for k, v in self.perms_map.items():
|
||||
if v:
|
||||
self.object_perms_map[k] = v
|
||||
|
||||
def _get_queryset(self, view):
|
||||
assert hasattr(view, 'get_queryset') or getattr(view, 'queryset', None) is not None, (
|
||||
'Cannot apply {} on a view that does not set ' '`.queryset` or have a `.get_queryset()` method.'
|
||||
).format(self.__class__.__name__)
|
||||
|
||||
if hasattr(view, 'get_queryset'):
|
||||
queryset = view.get_queryset()
|
||||
assert queryset is not None, f'{view.__class__.__name__}.get_queryset() returned None'
|
||||
return queryset
|
||||
return view.queryset
|
||||
|
||||
def _get_required_permissions(self, method, model_cls, perms_map):
|
||||
"""
|
||||
Given a model and an HTTP method, return the list of permission
|
||||
codes that the user is required to have.
|
||||
"""
|
||||
app_label = model_cls._meta.app_label
|
||||
model_name = model_cls._meta.model_name
|
||||
|
||||
if method not in perms_map:
|
||||
raise exceptions.MethodNotAllowed(method)
|
||||
|
||||
return [f'{app_label}.{perm}_{model_name}' if '.' not in perm else perm for perm in perms_map[method]]
|
||||
|
||||
def has_permission(self, request, view):
|
||||
if not request.user or not request.user.is_authenticated:
|
||||
return False
|
||||
|
||||
queryset = self._get_queryset(view)
|
||||
perms = self._get_required_permissions(request.method, queryset.model, self.perms_map)
|
||||
|
||||
return request.user.has_perms(perms)
|
||||
|
||||
def has_object_permission(self, request, view, obj):
|
||||
if not request.user or not request.user.is_authenticated:
|
||||
return False
|
||||
|
||||
queryset = self._get_queryset(view)
|
||||
perms = self._get_required_permissions(request.method, queryset.model, self.object_perms_map)
|
||||
|
||||
return request.user.has_perms(perms, obj=obj)
|
||||
|
||||
def __call__(self):
|
||||
return self
|
||||
|
||||
def __repr__(self):
|
||||
return f'<DjangoRBACPermission perms_map={self.perms_map} object_perms_map={self.object_perms_map}>'
|
||||
|
|
|
@ -14,13 +14,15 @@
|
|||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from rest_framework.exceptions import ValidationError
|
||||
from rest_framework.exceptions import MethodNotAllowed, ValidationError
|
||||
|
||||
from authentic2.a2_rbac.models import OrganizationalUnit as OU
|
||||
from authentic2.a2_rbac.models import Role
|
||||
from authentic2.models import Service
|
||||
from authentic2.utils.api import NaturalKeyRelatedField
|
||||
from authentic2.utils.api import DjangoRBACPermission, NaturalKeyRelatedField
|
||||
from tests.utils import scoped_db_fixture
|
||||
|
||||
|
||||
|
@ -132,3 +134,88 @@ class TestNaturalKeyRelatedField:
|
|||
assert (
|
||||
NaturalKeyRelatedField(queryset=Role.objects.all()).to_internal_value(value) == fixture.role
|
||||
)
|
||||
|
||||
|
||||
class TestDjangoRBACPermission:
|
||||
@pytest.fixture
|
||||
def permission(self):
|
||||
return DjangoRBACPermission(
|
||||
perms_map={
|
||||
'GET': [],
|
||||
'POST': ['create'],
|
||||
'DELETE': [],
|
||||
},
|
||||
object_perms_map={
|
||||
'GET': [],
|
||||
'DELETE': ['delete'],
|
||||
},
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def view(self):
|
||||
view = mock.Mock()
|
||||
view.get_queryset.return_value = Role.objects.all()
|
||||
return view
|
||||
|
||||
class TestHasPermission:
|
||||
def test_user_must_be_authenticated(self, permission):
|
||||
request = mock.Mock()
|
||||
request.user.is_authenticated = False
|
||||
assert not permission.has_permission(request=request, view=mock.Mock())
|
||||
|
||||
def test_method_is_not_allowed(self, rf, permission):
|
||||
request = mock.Mock()
|
||||
request.method = 'PATCH'
|
||||
request.user.is_authenticated = True
|
||||
|
||||
with pytest.raises(MethodNotAllowed):
|
||||
permission.has_permission(request=request, view=mock.Mock())
|
||||
|
||||
def test_method_post(self, permission, view):
|
||||
request = mock.Mock()
|
||||
request.method = 'POST'
|
||||
request.user.is_authenticated = True
|
||||
request.user.has_perms = lambda perms, obj=None: not obj and set(perms) <= {'a2_rbac.create_role'}
|
||||
assert permission.has_permission(request=request, view=view)
|
||||
|
||||
request.user.has_perms = lambda perms, obj=None: not obj and set(perms) <= set()
|
||||
assert not permission.has_permission(request=request, view=view)
|
||||
|
||||
def test_method_get(self, permission, view):
|
||||
request = mock.Mock()
|
||||
request.user.is_authenticated = True
|
||||
request.method = 'GET'
|
||||
request.user.has_perms = lambda perms, obj=None: not obj and (not perms or set(perms) <= set())
|
||||
assert permission.has_permission(request=request, view=view)
|
||||
|
||||
class TestHasObjectPermission:
|
||||
def test_user_must_be_authenticated(self, permission):
|
||||
request = mock.Mock()
|
||||
request.user.is_authenticated = False
|
||||
assert not permission.has_object_permission(request=request, view=mock.Mock(), obj=mock.Mock())
|
||||
|
||||
def test_method_is_not_allowed(self, rf, permission):
|
||||
request = mock.Mock()
|
||||
request.method = 'PATCH'
|
||||
request.user.is_authenticated = True
|
||||
|
||||
with pytest.raises(MethodNotAllowed):
|
||||
permission.has_object_permission(request=request, view=mock.Mock(), obj=mock.Mock())
|
||||
|
||||
def test_method_delete(self, permission, view):
|
||||
request = mock.Mock()
|
||||
request.method = 'DELETE'
|
||||
request.user.is_authenticated = True
|
||||
mock_obj = mock.Mock()
|
||||
request.user.has_perms = (
|
||||
lambda perms, obj=None: set(perms) <= {'a2_rbac.delete_role'} and obj is mock_obj
|
||||
)
|
||||
assert permission.has_object_permission(request=request, view=view, obj=mock_obj)
|
||||
|
||||
request.user.has_perms = mock.Mock(return_value=False)
|
||||
assert not permission.has_object_permission(request=request, view=view, obj=mock_obj)
|
||||
assert request.user.has_perms.call_args[1]['obj'] is mock_obj
|
||||
|
||||
request.method = 'GET'
|
||||
request.user.has_perms = lambda perms, obj=None: not obj and set(perms) <= {'a2_rbac.create_role'}
|
||||
assert permission.has_permission(request=request, view=view)
|
||||
|
|
Loading…
Reference in New Issue