This commit is contained in:
parent
31dc47946d
commit
a55092a5ce
|
@ -321,6 +321,87 @@ def user(request):
|
|||
return request.user.to_json()
|
||||
|
||||
|
||||
class RoleSerializer(serializers.ModelSerializer):
|
||||
ou = serializers.SlugRelatedField(
|
||||
many=False,
|
||||
required=False,
|
||||
default=CreateOnlyDefault(get_default_ou),
|
||||
queryset=get_ou_model().objects.all(),
|
||||
slug_field='slug')
|
||||
|
||||
@property
|
||||
def user(self):
|
||||
return self.context['request'].user
|
||||
|
||||
def __init__(self, instance=None, **kwargs):
|
||||
super(RoleSerializer, self).__init__(instance, **kwargs)
|
||||
if self.instance:
|
||||
self.fields['ou'].read_only = True
|
||||
|
||||
def create(self, validated_data):
|
||||
ou = validated_data.get('ou')
|
||||
# Creating roles also means being allowed to within the OU:
|
||||
if not self.user.has_ou_perm('a2_rbac.add_role', ou):
|
||||
raise PermissionDenied(u'User %s can\'t create role in OU %s' % (self.user, ou))
|
||||
return super(RoleSerializer, self).create(validated_data)
|
||||
|
||||
def update(self, instance, validated_data):
|
||||
# Check role-updating permissions:
|
||||
if not self.user.has_perm('a2_rbac.change_role', obj=instance):
|
||||
raise PermissionDenied(u'User %s can\'t change role %s' % (self.user, instance))
|
||||
super(RoleSerializer, self).update(instance, validated_data)
|
||||
return instance
|
||||
|
||||
def partial_update(self, instance, validated_data):
|
||||
# Check role-updating permissions:
|
||||
if not self.user.has_perm('a2_rbac.change_role', obj=instance):
|
||||
raise PermissionDenied(u'User %s can\'t change role %s' % (self.user, instance))
|
||||
super(RoleSerializer, self).partial_update(instance, validated_data)
|
||||
return instance
|
||||
|
||||
class Meta:
|
||||
model = get_role_model()
|
||||
fields = ('uuid', 'name', 'slug', 'ou',)
|
||||
extra_kwargs = {'uuid': {'read_only': True}}
|
||||
|
||||
|
||||
class BaseOrganizationalUnitSerializer(serializers.ModelSerializer):
|
||||
class Meta:
|
||||
model = get_ou_model()
|
||||
fields = '__all__'
|
||||
|
||||
|
||||
class OrganizationalUnitConciseSerializer(BaseOrganizationalUnitSerializer):
|
||||
class Meta(BaseOrganizationalUnitSerializer.Meta):
|
||||
fields = ('uuid', 'slug', 'name',)
|
||||
|
||||
|
||||
class ServiceConciseSerializer(serializers.ModelSerializer):
|
||||
ou = serializers.SlugRelatedField(
|
||||
many=False,
|
||||
required=False,
|
||||
read_only=True,
|
||||
slug_field='slug')
|
||||
|
||||
class Meta:
|
||||
model = Service
|
||||
fields = ('ou', 'slug', 'name')
|
||||
|
||||
|
||||
class RoleCustomField(RoleSerializer):
|
||||
ou = OrganizationalUnitConciseSerializer(
|
||||
many=False,
|
||||
required=False,
|
||||
read_only=True)
|
||||
service = ServiceConciseSerializer(
|
||||
many=False,
|
||||
required=False,
|
||||
read_only=True)
|
||||
|
||||
class Meta(RoleSerializer.Meta):
|
||||
fields = ('description', 'name', 'ou', 'service', 'slug', 'uuid',)
|
||||
|
||||
|
||||
class BaseUserSerializer(serializers.ModelSerializer):
|
||||
ou = serializers.SlugRelatedField(
|
||||
queryset=get_ou_model().objects.all(),
|
||||
|
@ -335,6 +416,7 @@ class BaseUserSerializer(serializers.ModelSerializer):
|
|||
default=CreateOnlyDefault(utils.generate_password),
|
||||
required=False)
|
||||
force_password_reset = serializers.BooleanField(write_only=True, required=False, default=False)
|
||||
roles = RoleCustomField(many=True, read_only=True, source='roles_and_parents')
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(BaseUserSerializer, self).__init__(*args, **kwargs)
|
||||
|
@ -490,50 +572,6 @@ class BaseUserSerializer(serializers.ModelSerializer):
|
|||
exclude = ('date_joined', 'user_permissions', 'groups', 'last_login')
|
||||
|
||||
|
||||
class RoleSerializer(serializers.ModelSerializer):
|
||||
ou = serializers.SlugRelatedField(
|
||||
many=False,
|
||||
required=False,
|
||||
default=CreateOnlyDefault(get_default_ou),
|
||||
queryset=get_ou_model().objects.all(),
|
||||
slug_field='slug')
|
||||
|
||||
@property
|
||||
def user(self):
|
||||
return self.context['request'].user
|
||||
|
||||
def __init__(self, instance=None, **kwargs):
|
||||
super(RoleSerializer, self).__init__(instance, **kwargs)
|
||||
if self.instance:
|
||||
self.fields['ou'].read_only = True
|
||||
|
||||
def create(self, validated_data):
|
||||
ou = validated_data.get('ou')
|
||||
# Creating roles also means being allowed to within the OU:
|
||||
if not self.user.has_ou_perm('a2_rbac.add_role', ou):
|
||||
raise PermissionDenied(u'User %s can\'t create role in OU %s' % (self.user, ou))
|
||||
return super(RoleSerializer, self).create(validated_data)
|
||||
|
||||
def update(self, instance, validated_data):
|
||||
# Check role-updating permissions:
|
||||
if not self.user.has_perm('a2_rbac.change_role', obj=instance):
|
||||
raise PermissionDenied(u'User %s can\'t change role %s' % (self.user, instance))
|
||||
super(RoleSerializer, self).update(instance, validated_data)
|
||||
return instance
|
||||
|
||||
def partial_update(self, instance, validated_data):
|
||||
# Check role-updating permissions:
|
||||
if not self.user.has_perm('a2_rbac.change_role', obj=instance):
|
||||
raise PermissionDenied(u'User %s can\'t change role %s' % (self.user, instance))
|
||||
super(RoleSerializer, self).partial_update(instance, validated_data)
|
||||
return instance
|
||||
|
||||
class Meta:
|
||||
model = get_role_model()
|
||||
fields = ('uuid', 'name', 'slug', 'ou',)
|
||||
extra_kwargs = {'uuid': {'read_only': True}}
|
||||
|
||||
|
||||
class UsersFilter(FilterSet):
|
||||
class Meta:
|
||||
model = get_user_model()
|
||||
|
@ -732,12 +770,6 @@ class RoleMembershipsAPI(ExceptionHandlerMixin, APIView):
|
|||
role_memberships = RoleMembershipsAPI.as_view()
|
||||
|
||||
|
||||
class BaseOrganizationalUnitSerializer(serializers.ModelSerializer):
|
||||
class Meta:
|
||||
model = get_ou_model()
|
||||
fields = '__all__'
|
||||
|
||||
|
||||
class OrganizationalUnitAPI(ExceptionHandlerMixin, ModelViewSet):
|
||||
permission_classes = (DjangoPermission('a2_rbac.search_organizationalunit'),)
|
||||
serializer_class = BaseOrganizationalUnitSerializer
|
||||
|
|
|
@ -15,6 +15,7 @@ from pytest_django.migrations import DisableMigrations
|
|||
from authentic2.a2_rbac.utils import get_default_ou
|
||||
from authentic2_idp_oidc.models import OIDCClient
|
||||
from authentic2.authentication import OIDCUser
|
||||
from authentic2. models import Service
|
||||
|
||||
import utils
|
||||
|
||||
|
@ -41,6 +42,22 @@ def ou2(db):
|
|||
return OU.objects.create(name='OU2', slug='ou2')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def service1(db):
|
||||
return Service.objects.create(
|
||||
ou=get_default_ou(),
|
||||
name='Service 1',
|
||||
slug='service1')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def service2(db):
|
||||
return Service.objects.create(
|
||||
ou=get_default_ou(),
|
||||
name='Service 2',
|
||||
slug='service2')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ou_rando(db):
|
||||
OU = get_ou_model()
|
||||
|
|
|
@ -365,7 +365,7 @@ def test_api_users_create(settings, app, api_user):
|
|||
|
||||
resp = app.post_json('/api/users/', params=payload, status=status)
|
||||
if api_user.is_superuser or api_user.roles.exists():
|
||||
assert set(['ou', 'id', 'uuid', 'is_staff', 'is_superuser',
|
||||
assert set(['ou', 'id', 'uuid', 'is_staff', 'is_superuser', 'roles',
|
||||
'first_name', 'first_name_verified', 'last_name',
|
||||
'last_name_verified', 'date_joined', 'last_login',
|
||||
'username', 'password', 'email', 'is_active', 'title',
|
||||
|
@ -431,7 +431,7 @@ def test_api_users_create(settings, app, api_user):
|
|||
|
||||
resp = app.post_json('/api/users/', params=payload, status=status)
|
||||
if api_user.is_superuser or api_user.roles.exists():
|
||||
assert set(['ou', 'id', 'uuid', 'is_staff', 'is_superuser',
|
||||
assert set(['ou', 'id', 'uuid', 'is_staff', 'is_superuser', 'roles',
|
||||
'first_name', 'first_name_verified', 'last_name',
|
||||
'last_name_verified', 'date_joined', 'last_login',
|
||||
'username', 'password', 'email', 'is_active', 'title',
|
||||
|
@ -1107,3 +1107,64 @@ def test_validate_password_regex(app, settings):
|
|||
assert response.json['checks'][3]['result'] is True
|
||||
assert response.json['checks'][4]['label'] == 'must contain "ok"'
|
||||
assert response.json['checks'][4]['result'] is True
|
||||
|
||||
|
||||
def test_roles_in_users_api(app, admin, ou1, ou2, service1, service2):
|
||||
User = get_user_model()
|
||||
user1 = User(username='john.doe', email='john.doe@example.com')
|
||||
user1.set_password('password')
|
||||
user1.save()
|
||||
user2 = User(username='bob.smith', email='bob.smith@example.com')
|
||||
user2.set_password('password')
|
||||
user2.save()
|
||||
|
||||
Role = get_role_model()
|
||||
role1 = Role.objects.create(name='Role1')
|
||||
role1.ou = ou1
|
||||
role1.service = service1
|
||||
role1.members.add(user1)
|
||||
role2 = Role.objects.create(name='Role2')
|
||||
role2.ou = ou2
|
||||
role2.service = service2
|
||||
role2.members.add(user1)
|
||||
role2.members.add(user2)
|
||||
role2.add_parent(role1)
|
||||
role3 = Role.objects.create(name='Role3')
|
||||
role3.ou = get_default_ou()
|
||||
role3.members.add(user2)
|
||||
role3.add_parent(role2)
|
||||
|
||||
app.authorization = ('Basic', (admin.username, admin.username))
|
||||
response = app.get(u'/api/users/', status=200)
|
||||
assert len(response.json['results']) == 3
|
||||
for user in response.json['results']:
|
||||
assert user['roles']
|
||||
for role in user['roles']:
|
||||
assert set(['slug', 'name', 'uuid', 'service', 'description',
|
||||
'ou']) == set(role.keys())
|
||||
role_object = Role.objects.get(uuid=role['uuid'])
|
||||
if getattr(role_object, 'ou', None) is not None:
|
||||
for ou_attr in ('uuid', 'slug', 'name'):
|
||||
assert getattr(role_object.ou, ou_attr) == role['ou'][ou_attr]
|
||||
if getattr(role_object, 'service', None) is not None:
|
||||
for service_attr in ('slug','name'):
|
||||
assert getattr(role_object.service, service_attr) == role['service'][attr]
|
||||
# Service OUs identified by their slug:
|
||||
if getattr(role_object.service, 'ou', None) is not None:
|
||||
assert role_object.service.ou.slug == role['service']['ou']
|
||||
|
||||
url = u'/api/users/%s/' % admin.uuid
|
||||
response = app.get(url, status=200)
|
||||
assert len(response.json['roles']) == 4
|
||||
assert set(role['slug'] for role in response.json['roles']) == set([
|
||||
'_a2-manager',
|
||||
'_a2-manager-of-users',
|
||||
'_a2-manager-of-roles',
|
||||
'_a2-manager-of-organizational-units'])
|
||||
|
||||
for user in [user1, user2]:
|
||||
url = u'/api/users/%s/' % user.uuid
|
||||
response = app.get(url, status=200)
|
||||
roles = user.roles_and_parents().all()
|
||||
assert len(response.json['roles']) == roles.count()
|
||||
assert set(role['name'] for role in response.json['roles']) == set(roles.values_list('name', flat=True))
|
||||
|
|
Loading…
Reference in New Issue