add role-creation API (#20706)
This commit is contained in:
parent
eef27f8314
commit
85da1be801
|
@ -444,6 +444,50 @@ class BaseUserSerializer(serializers.ModelSerializer):
|
|||
exclude = ('date_joined', 'user_permissions', 'groups', 'last_login')
|
||||
|
||||
|
||||
class RoleSerializer(serializers.ModelSerializer):
|
||||
ou = serializers.SlugRelatedField(
|
||||
many=False,
|
||||
required=False,
|
||||
default=get_default_ou,
|
||||
queryset=get_ou_model().objects.all(),
|
||||
slug_field='slug')
|
||||
|
||||
@property
|
||||
def user(self):
|
||||
return self.context['request'].user
|
||||
|
||||
def __init__(self, instance=None, **kwargs):
|
||||
super(RoleSerializer, self).__init__(instance, **kwargs)
|
||||
if self.instance:
|
||||
self.fields['ou'].read_only = True
|
||||
|
||||
def create(self, validated_data):
|
||||
ou = validated_data.get('ou')
|
||||
# Creating roles also means being allowed to within the OU:
|
||||
if not self.user.has_ou_perm('a2_rbac.add_role', ou):
|
||||
raise PermissionDenied(u'User %s can\'t create role in OU %s' % (self.user, ou))
|
||||
return super(RoleSerializer, self).create(validated_data)
|
||||
|
||||
def update(self, instance, validated_data):
|
||||
# Check role-updating permissions:
|
||||
if not self.user.has_perm('a2_rbac.change_role', obj=instance):
|
||||
raise PermissionDenied(u'User %s can\'t change role %s' % (self.user, instance))
|
||||
super(RoleSerializer, self).update(instance, validated_data)
|
||||
return instance
|
||||
|
||||
def partial_update(self, instance, validated_data):
|
||||
# Check role-updating permissions:
|
||||
if not self.user.has_perm('a2_rbac.change_role', obj=instance):
|
||||
raise PermissionDenied(u'User %s can\'t change role %s' % (self.user, instance))
|
||||
super(RoleSerializer, self).partial_update(instance, validated_data)
|
||||
return instance
|
||||
|
||||
class Meta:
|
||||
model = get_role_model()
|
||||
fields = ('uuid', 'name', 'slug', 'description', 'ou',)
|
||||
extra_kwargs = {'uuid': {'read_only': True}}
|
||||
|
||||
|
||||
class UsersFilter(FilterSet):
|
||||
class Meta:
|
||||
model = get_user_model()
|
||||
|
@ -576,6 +620,20 @@ class UsersAPI(HookMixin, ExceptionHandlerMixin, ModelViewSet):
|
|||
return Response({'result': 1})
|
||||
|
||||
|
||||
class RolesAPI(ExceptionHandlerMixin, ModelViewSet):
|
||||
permission_classes = (permissions.IsAuthenticated,)
|
||||
serializer_class = RoleSerializer
|
||||
lookup_field = 'slug'
|
||||
|
||||
def get_queryset(self):
|
||||
return self.request.user.filter_by_perm('a2_rbac.view_role', get_role_model().objects.all())
|
||||
|
||||
def perform_destroy(self, instance):
|
||||
if not self.request.user.has_perm(perm='a2_rbac.delete_role', obj=instance):
|
||||
raise PermissionDenied(u'User %s can\'t create role %s' % (request.user, instance))
|
||||
super(RolesAPI, self).perform_destroy(instance)
|
||||
|
||||
|
||||
class RoleMembershipsAPI(ExceptionHandlerMixin, APIView):
|
||||
permission_classes = (permissions.IsAuthenticated,)
|
||||
|
||||
|
@ -620,6 +678,7 @@ class OrganizationalUnitAPI(ExceptionHandlerMixin, ModelViewSet):
|
|||
router = SimpleRouter()
|
||||
router.register(r'users', UsersAPI, base_name='a2-api-users')
|
||||
router.register(r'ous', OrganizationalUnitAPI, base_name='a2-api-ous')
|
||||
router.register(r'roles', RolesAPI, base_name='a2-api-roles')
|
||||
|
||||
|
||||
class CheckPasswordSerializer(serializers.Serializer):
|
||||
|
|
|
@ -668,3 +668,159 @@ def test_users_email(app, ou1, admin, user_ou1, mailoutbox):
|
|||
|
||||
assert mail.to[0] == new_email
|
||||
assert 'http://testserver/accounts/change-email/verify/' in mail.body
|
||||
|
||||
|
||||
def test_api_delete_role(app, admin_ou1, role_ou1):
|
||||
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
|
||||
Role = get_role_model()
|
||||
|
||||
app.delete('/api/roles/role_ou1/')
|
||||
assert not len(Role.objects.filter(slug='role_ou1'))
|
||||
|
||||
|
||||
def test_api_delete_role_unauthorized(app, simple_user, role_ou1):
|
||||
app.authorization = ('Basic', (simple_user.username, simple_user.username))
|
||||
Role = get_role_model()
|
||||
|
||||
app.delete('/api/roles/role_ou1/', status=404)
|
||||
assert len(Role.objects.filter(slug='role_ou1'))
|
||||
|
||||
|
||||
def test_api_patch_role(app, admin_ou1, role_ou1):
|
||||
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
|
||||
Role = get_role_model()
|
||||
|
||||
role_data = {
|
||||
'slug': 'updated-role',
|
||||
}
|
||||
resp = app.patch_json('/api/roles/role_ou1/', params=role_data)
|
||||
assert not len(Role.objects.filter(slug='role_ou1'))
|
||||
assert len(Role.objects.filter(slug='updated-role'))
|
||||
|
||||
# The role API won't change the organizational unit attribute:
|
||||
role = Role.objects.get(slug='updated-role')
|
||||
assert role.ou.slug == get_default_ou().slug
|
||||
assert role.ou.slug != 'ou1'
|
||||
|
||||
|
||||
def test_api_patch_role_unauthorized(app, simple_user, role_ou1):
|
||||
app.authorization = ('Basic', (simple_user.username, simple_user.username))
|
||||
Role = get_role_model()
|
||||
|
||||
role_data = {
|
||||
'slug': 'updated-role',
|
||||
}
|
||||
resp = app.patch_json('/api/roles/role_ou1/', params=role_data, status=404)
|
||||
assert len(Role.objects.filter(slug='role_ou1'))
|
||||
assert not len(Role.objects.filter(slug='updated-role'))
|
||||
|
||||
|
||||
def test_api_put_role(app, admin_ou1, role_ou1, ou1):
|
||||
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
|
||||
Role = get_role_model()
|
||||
|
||||
role_data = {
|
||||
'name': 'updated-role',
|
||||
'slug': 'updated-role',
|
||||
'ou': 'ou1'
|
||||
}
|
||||
resp = app.put_json('/api/roles/role_ou1/', params=role_data)
|
||||
assert not len(Role.objects.filter(slug='role_ou1'))
|
||||
assert len(Role.objects.filter(slug='updated-role'))
|
||||
|
||||
# The role API won't change the organizational unit attribute:
|
||||
role = Role.objects.get(slug='updated-role')
|
||||
assert role.ou.slug == get_default_ou().slug
|
||||
assert role.ou.slug != 'ou1'
|
||||
|
||||
|
||||
def test_api_put_role_unauthorized(app, simple_user, role_ou1, ou1):
|
||||
app.authorization = ('Basic', (simple_user.username, simple_user.username))
|
||||
Role = get_role_model()
|
||||
|
||||
role_data = {
|
||||
'name': 'updated-role',
|
||||
'slug': 'updated-role',
|
||||
'ou': 'ou1'
|
||||
}
|
||||
resp = app.put_json('/api/roles/role_ou1/', params=role_data, status=404)
|
||||
assert len(Role.objects.filter(slug='role_ou1'))
|
||||
assert not len(Role.objects.filter(slug='updated-role'))
|
||||
|
||||
|
||||
def test_api_post_role(app, admin_ou1, ou1):
|
||||
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
|
||||
|
||||
role_data = {
|
||||
'slug': 'coffee-manager',
|
||||
'name': 'Coffee Manager',
|
||||
'ou': 'ou1'
|
||||
}
|
||||
resp = app.post_json('/api/roles/', params=role_data)
|
||||
assert isinstance(resp.json, dict)
|
||||
Role = get_role_model()
|
||||
|
||||
# Check attribute values against the server's response:
|
||||
for key, value in role_data.items():
|
||||
assert key in resp.json.keys()
|
||||
assert value in resp.json.values()
|
||||
|
||||
# Check attributes values against the DB:
|
||||
posted_role = Role.objects.get(slug='coffee-manager')
|
||||
assert posted_role.slug == role_data['slug']
|
||||
assert posted_role.name == role_data['name']
|
||||
assert posted_role.ou.slug == 'ou1'
|
||||
|
||||
|
||||
def test_api_post_role_no_ou(app, superuser):
|
||||
app.authorization = ('Basic', (superuser.username, superuser.username))
|
||||
Role = get_role_model()
|
||||
|
||||
role_data = {
|
||||
'slug': 'tea-manager',
|
||||
'name': 'Tea Manager',
|
||||
}
|
||||
resp = app.post_json('/api/roles/', params=role_data)
|
||||
new_role = Role.objects.get(slug='tea-manager')
|
||||
default_ou = get_default_ou()
|
||||
assert new_role.ou == default_ou
|
||||
|
||||
|
||||
def test_api_post_role_unauthorized(app, simple_user, ou1):
|
||||
app.authorization = ('Basic', (simple_user.username, simple_user.username))
|
||||
Role = get_role_model()
|
||||
|
||||
role_data = {
|
||||
'slug': 'mocca-manager',
|
||||
'name': 'Mocca Manager',
|
||||
'ou': 'ou1'
|
||||
}
|
||||
|
||||
resp = app.post_json('/api/roles/', params=role_data, status=403)
|
||||
assert not len(Role.objects.filter(slug='mocca-manager'))
|
||||
|
||||
|
||||
def test_api_get_role_description(app, admin_rando_role, role_random):
|
||||
app.authorization = ('Basic', (admin_rando_role.username, admin_rando_role.username))
|
||||
resp = app.get('/api/roles/rando/')
|
||||
|
||||
assert resp.json['slug'] == 'rando'
|
||||
assert resp.json['ou'] == 'ou_rando'
|
||||
|
||||
|
||||
def test_api_get_role_not_found(app, superuser):
|
||||
app.authorization = ('Basic', (superuser.username, superuser.username))
|
||||
resp = app.get('/api/roles/thisisnotavalidroleslug/', status=404)
|
||||
|
||||
|
||||
def test_api_get_role_list(app, admin_ou1, role_ou1, role_random):
|
||||
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
|
||||
resp = app.get('/api/roles/')
|
||||
|
||||
role_fields = ['slug', 'uuid', 'name', 'ou']
|
||||
|
||||
assert len(resp.json['results'])
|
||||
|
||||
for role_dict in resp.json['results']:
|
||||
for field in role_fields:
|
||||
assert field in role_dict
|
||||
|
|
Loading…
Reference in New Issue