api: role members direct definition (#36377)

This commit is contained in:
Paul Marillonnet 2019-09-27 11:24:13 +02:00
parent a93c66455f
commit 1cedef29c9
4 changed files with 232 additions and 4 deletions

View File

@ -22,8 +22,10 @@ urlpatterns = [
url(r'^register/$', api_views.register, name='a2-api-register'),
url(r'^password-change/$', api_views.password_change, name='a2-api-password-change'),
url(r'^user/$', api_views.user, name='a2-api-user'),
url(r'^roles/(?P<role_uuid>[\w+]*)/members/(?P<member_uuid>[^/]+)/$', api_views.role_memberships,
url(r'^roles/(?P<role_uuid>[\w+]*)/members/(?P<member_uuid>[^/]+)/$', api_views.role_membership,
name='a2-api-role-member'),
url(r'^roles/(?P<role_uuid>[\w+]*)/relationships/members/$', api_views.role_memberships,
name='a2-api-role-members'),
url(r'^check-password/$', api_views.check_password, name='a2-api-check-password'),
url(r'^validate-password/$', api_views.validate_password, name='a2-api-validate-password'),
]

View File

@ -37,7 +37,8 @@ from rest_framework.routers import SimpleRouter
from rest_framework.generics import GenericAPIView
from rest_framework.response import Response
from rest_framework import permissions, status, authentication
from rest_framework.exceptions import PermissionDenied, AuthenticationFailed
from rest_framework.exceptions import (PermissionDenied, AuthenticationFailed,
ValidationError)
from rest_framework.fields import CreateOnlyDefault
from rest_framework.decorators import list_route, detail_route
from rest_framework.authentication import SessionAuthentication
@ -731,11 +732,11 @@ class RolesAPI(api_mixins.GetOrCreateMixinView, ExceptionHandlerMixin, ModelView
super(RolesAPI, self).perform_destroy(instance)
class RoleMembershipsAPI(ExceptionHandlerMixin, APIView):
class RoleMembershipAPI(ExceptionHandlerMixin, APIView):
permission_classes = (permissions.IsAuthenticated,)
def initial(self, request, *args, **kwargs):
super(RoleMembershipsAPI, self).initial(request, *args, **kwargs)
super(RoleMembershipAPI, self).initial(request, *args, **kwargs)
Role = get_role_model()
User = get_user_model()
self.role = get_object_or_404(Role, uuid=kwargs['role_uuid'])
@ -756,6 +757,72 @@ class RoleMembershipsAPI(ExceptionHandlerMixin, APIView):
return Response({'result': 1, 'detail': _('User successfully removed from role')},
status=status.HTTP_200_OK)
role_membership = RoleMembershipAPI.as_view()
class RoleMembershipsAPI(ExceptionHandlerMixin, APIView):
permission_classes = (permissions.IsAuthenticated,)
http_method_names = ['post', 'put', 'patch', 'delete']
def initial(self, request, *args, **kwargs):
super(RoleMembershipsAPI, self).initial(request, *args, **kwargs)
Role = get_role_model()
User = get_user_model()
self.role = get_object_or_404(Role, uuid=kwargs['role_uuid'])
self.members = []
perm = 'a2_rbac.change_role'
authorized = request.user.has_perm(perm, obj=self.role)
if not authorized:
raise PermissionDenied(u'User not allowed to change role')
if not isinstance(request.data, dict):
raise ValidationError(_('Payload must be a dictionary'))
for entry in request.data.get('data', ()):
try:
self.members.append(User.objects.get(uuid=entry['uuid']))
except KeyError:
raise ValidationError(_("Missing 'uuid' key for dict entry %s "
"of the 'data' payload") % entry)
except User.DoesNotExist:
raise ValidationError(
_('No known user for UUID %s') % entry['uuid'])
if not len(self.members) and \
request.method.lower() in self.http_method_names:
raise ValidationError(_('No valid user UUID'))
def post(self, request, *args, **kwargs):
self.role.members.add(*self.members)
return Response(
{
'result': 1,
'detail': _('Users successfully added to role')
},
status=status.HTTP_201_CREATED)
def delete(self, request, *args, **kwargs):
self.role.members.remove(*self.members)
return Response(
{
'result': 1,
'detail': _('Users successfully removed from role')
},
status=status.HTTP_200_OK)
def patch(self, request, *args, **kwargs):
self.role.members.set(self.members)
return Response(
{
'result': 1,
'detail': _('Users successfully assigned to role')
},
status=status.HTTP_200_OK)
def put(self, request, *args, **kwargs):
return self.patch(request, *args, **kwargs)
role_memberships = RoleMembershipsAPI.as_view()

View File

@ -191,6 +191,12 @@ def member_rando(db, ou_rando):
ou=ou_rando)
@pytest.fixture
def member_rando2(db, ou_rando):
return create_user(username='test2', first_name='test2', last_name='test2', email='test2@test.org',
ou=ou_rando)
@pytest.fixture
def member_fake():
return type('user', (object,), {'username': 'fake', 'uuid': 'fake_uuid'})

View File

@ -663,6 +663,159 @@ def test_api_role_remove_member(app, api_user, role, member):
assert resp.json['errors'] == 'User not allowed to change role'
def test_api_role_add_members(app, api_user, role, member, member_rando2):
app.authorization = ('Basic', (api_user.username, api_user.username))
authorized = api_user.has_perm('a2_rbac.change_role', role)
if role.name == 'fake':
status = 404
elif not authorized:
status = 403
elif member.username == 'fake':
status = 400
else:
status = 201
payload = {
"data": []
}
for m in [member, member_rando2, member_rando2]: # test no duplicate
payload['data'].append({"uuid": m.uuid})
resp = app.post_json(
'/api/roles/{}/relationships/members/'.format(role.uuid),
params=payload, status=status)
if status in (400, 404):
pass
elif authorized:
assert resp.json['result'] == 1
assert resp.json['detail'] == 'Users successfully added to role'
for m in [member, member_rando2]:
assert m in role.members.all()
else:
assert resp.json['result'] == 0
assert resp.json['errors'] == 'User not allowed to change role'
def test_api_role_remove_members(app, api_user, role, member, member_rando2):
app.authorization = ('Basic', (api_user.username, api_user.username))
authorized = api_user.has_perm('a2_rbac.change_role', role)
if role.name == 'fake':
status = 404
elif not authorized:
status = 403
elif member.username == 'fake':
status = 400
else:
status = 200
payload = {
"data": []
}
for m in [member, member_rando2, member_rando2]: # test no duplicate
payload['data'].append({"uuid": m.uuid})
resp = app.delete_json(
'/api/roles/{}/relationships/members/'.format(role.uuid),
params=payload, status=status)
if status in (400, 404):
pass
elif authorized:
assert resp.json['result'] == 1
assert resp.json['detail'] == 'Users successfully removed from role'
for m in [member, member_rando2]:
assert m not in role.members.all()
else:
assert resp.json['result'] == 0
assert resp.json['errors'] == 'User not allowed to change role'
def test_api_role_set_members(app, api_user, role, member, member_rando2):
app.authorization = ('Basic', (api_user.username, api_user.username))
authorized = api_user.has_perm('a2_rbac.change_role', role)
if role.name == 'fake':
status = 404
elif not authorized:
status = 403
elif member.username == 'fake':
status = 400
else:
status = 200
payload = {
"data": []
}
for m in [member, member_rando2, member_rando2]: # test no duplicate
payload['data'].append({"uuid": m.uuid})
resp = app.put_json(
'/api/roles/{}/relationships/members/'.format(role.uuid),
params=payload, status=status)
if status in (400, 404):
pass
elif authorized:
assert resp.json['result'] == 1
assert resp.json['detail'] == 'Users successfully assigned to role'
assert len(role.members.all()) == 2
for m in [member, member_rando2]:
assert m in role.members.all()
else:
assert resp.json['result'] == 0
assert resp.json['errors'] == 'User not allowed to change role'
def test_api_role_get_members(app, api_user, role):
app.authorization = ('Basic', (api_user.username, api_user.username))
authorized = api_user.has_perm('a2_rbac.change_role', role)
status = 405 if authorized else 403
app.get('/api/roles/{}/relationships/members/'.format(role.uuid), status=status)
def test_api_role_members_payload_missing(app, api_user, role):
app.authorization = ('Basic', (api_user.username, api_user.username))
authorized = api_user.has_perm('a2_rbac.change_role', role)
status = 400 if authorized else 403
app.post_json(
'/api/roles/{}/relationships/members/'.format(role.uuid),
status=status)
app.delete_json(
'/api/roles/{}/relationships/members/'.format(role.uuid),
status=status)
app.put_json(
'/api/roles/{}/relationships/members/'.format(role.uuid),
status=status)
def test_api_role_members_wrong_payload_type(app, superuser, role_random, member_rando2):
app.authorization = ('Basic', (superuser.username, superuser.username))
payload = [{
"data": [{'uuid': member_rando2.uuid}]
}]
resp = app.post_json(
'/api/roles/{}/relationships/members/'.format(role_random.uuid),
params=payload, status=400)
assert resp.json['result'] == 0
assert resp.json['errors'][0] == 'Payload must be a dictionary'
def test_register_no_email_validation(app, admin, django_user_model):
User = django_user_model
password = '12XYab'