API: routes utilisant le slug pour les OUs et les rôles (#52226) #224

Merged
bdauvergne merged 5 commits from wip/52226-API-utilisation-avec-slug-des-ro into main 2024-01-15 14:18:22 +01:00
13 changed files with 947 additions and 1122 deletions

View File

@ -14,7 +14,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.urls import path, re_path
from django.urls import include, path, re_path
from . import api_views
@ -31,31 +31,6 @@ urlpatterns = [
api_views.user_service_data,
name='a2-api-user-service-data',
),
re_path(
r'^roles/(?P<role_uuid>[\w+]*)/members/$',
api_views.roles_members,
name='a2-api-role-members-list',
),
re_path(
r'^roles/(?P<role_uuid>[\w+]*)/members/(?P<member_uuid>[^/]+)/$',
api_views.role_membership,
name='a2-api-role-member',
),
re_path(
r'^roles/(?P<role_uuid>[\w+]*)/relationships/members/$',
api_views.role_memberships,
name='a2-api-role-members',
),
re_path(
r'^roles/(?P<role_uuid>[0-9a-z]{32})/parents/$',
api_views.roles_parents,
name='a2-api-role-parents',
),
re_path(
r'^roles/(?P<role_uuid>[0-9a-z]{32})/relationships/parents/$',
api_views.roles_parents_relationships,
name='a2-api-role-parents-relationships',
),
path('check-password/', api_views.check_password, name='a2-api-check-password'),
path('check-api-client/', api_views.check_api_client, name='a2-api-check-api-client'),
path('validate-password/', api_views.validate_password, name='a2-api-validate-password'),
@ -64,4 +39,21 @@ urlpatterns = [
path('authn-healthcheck/', api_views.authn_healthcheck, name='a2-api-authn-healthcheck'),
]
# other roles APIs
roles_urls = [
path('members/', api_views.roles_members),
path('members/<user_uuid:member_uuid>/', api_views.role_membership),
path('relationships/members/', api_views.role_memberships),
path('parents/', api_views.roles_parents),
path('relationships/parents/', api_views.roles_parents_relationships),
]
urlpatterns += [
path('roles/<a2_uuid:role_uuid>/', include(roles_urls)),
path('roles/<slug:ou_slug>:<slug:service_slug>:<slug:role_slug>/', include(roles_urls)),
path('roles/<slug:ou_slug>:<slug:role_slug>/', include(roles_urls)),
path('roles/<slug:role_slug>/', include(roles_urls)),
]
# main router
urlpatterns += api_views.router.urls

View File

@ -883,6 +883,7 @@ class RolesAPI(api_mixins.GetOrCreateMixinView, ExceptionHandlerMixin, ModelView
filterset_class = RolesFilter
lookup_field = 'uuid'
queryset = Role.objects.all()
lookup_value_regex = r'[0-9a-f]{32}'
def filter_queryset(self, queryset):
queryset = super().filter_queryset(queryset)
@ -903,11 +904,52 @@ class RolesAPI(api_mixins.GetOrCreateMixinView, ExceptionHandlerMixin, ModelView
self.request.journal.record('manager.role.edit', role=serializer.instance, form=serializer, api=True)
class RolesMembersAPI(UsersAPI):
def initial(self, request, *args, **kwargs):
super().initial(request, *args, **kwargs)
self.role = get_object_or_404(Role, uuid=kwargs['role_uuid'])
class RolesBySlugAPI(RolesAPI):
lookup_field = 'slug'
lookup_value_regex = r'(?:[-a-zA-Z0-9_]+:)?(?:[-a-zA-Z0-9_]+:)?[-a-zA-Z0-9_]+'
def get_object(self):
queryset = self.filter_queryset(self.get_queryset())
lookup_value = self.kwargs['slug']
if lookup_value.count(':') > 1:
ou_slug, service_slug, role_slug = lookup_value.split(':', 2)
filter_kwargs = {'ou__slug': ou_slug, 'service_slug': service_slug, 'slug': role_slug}
elif lookup_value.count(':') == 1:
ou_slug, role_slug = lookup_value.split(':', 1)
filter_kwargs = {'ou__slug': ou_slug, 'slug': role_slug}
else:
filter_kwargs = {'slug': lookup_value}
try:
obj = get_object_or_404(queryset, **filter_kwargs)
except Role.MultipleObjectsReturned:
raise api_mixins.Conflict(_('Multiple roles found'), 'multiple-roles-found')
# May raise a permission denied
self.check_object_permissions(self.request, obj)
return obj
class RolesMixin:
def initial(self, request, *, role_uuid=None, ou_slug=None, service_slug=None, role_slug=None, **kwargs):
super().initial(request, **kwargs)
role_qs = request.user.filter_by_perm('a2_rbac.view_role', Role.objects.all())
if role_uuid:
filter_kwargs = {'uuid': role_uuid}
elif ou_slug and service_slug and role_slug:
filter_kwargs = {'ou__slug': ou_slug, 'service_slug': service_slug, 'slug': role_slug}
elif ou_slug and role_slug:
filter_kwargs = {'ou__slug': ou_slug, 'slug': role_slug}
else:
filter_kwargs = {'slug': role_slug}
try:
self.role = get_object_or_404(role_qs, **filter_kwargs)
except Role.MultipleObjectsReturned:
raise api_mixins.Conflict(_('Multiple roles found'), 'multiple-roles-found')
class RolesMembersAPI(RolesMixin, UsersAPI):
def get_queryset(self):
if self.request.GET.get('nested', 'false').lower() in ('true', '1'):
qs = self.role.all_members()
@ -1080,30 +1122,29 @@ class UserServiceDataAPI(ExceptionHandlerMixin, APIView):
user_service_data = UserServiceDataAPI.as_view()
class RoleMembershipAPI(ExceptionHandlerMixin, APIView):
class RoleMembershipAPI(ExceptionHandlerMixin, RolesMixin, APIView):
permission_classes = (permissions.IsAuthenticated,)
def initial(self, request, *args, **kwargs):
super().initial(request, *args, **kwargs)
User = get_user_model()
self.role = get_object_or_404(Role, uuid=kwargs['role_uuid'])
self.member = get_object_or_404(User, uuid=kwargs['member_uuid'])
def initial(self, request, *, member_uuid=None, **kwargs):
super().initial(request, **kwargs)
self.member_uuid = member_uuid
def get(self, request, *args, **kwargs):
if not request.user.has_perm('a2_rbac.view_role', obj=self.role):
raise PermissionDenied('User not allowed to view role')
if self.request.GET.get('nested', 'false').lower() in ('true', '1'):
qs = self.role.all_members()
user_qs = self.role.all_members()
else:
qs = self.role.members.all()
member = get_object_or_404(qs, uuid=kwargs['member_uuid'])
user_qs = self.role.members.all()
user_qs = request.user.filter_by_perm('custom_user.view_user', user_qs)
member = get_object_or_404(user_qs, uuid=self.member_uuid)
return Response(BaseUserSerializer(member).data)
def post(self, request, *args, **kwargs):
if not request.user.has_perm('a2_rbac.manage_members_role', obj=self.role):
raise PermissionDenied('User not allowed to manage role members')
self.role.members.add(self.member)
request.journal.record('manager.role.membership.grant', role=self.role, member=self.member, api=True)
user_qs = request.user.filter_by_perm('custom_user.view_user', User.objects.all())
new_member = get_object_or_404(user_qs, uuid=self.member_uuid)
self.role.members.add(new_member)
request.journal.record('manager.role.membership.grant', role=self.role, member=new_member, api=True)
return Response(
{'result': 1, 'detail': _('User successfully added to role')}, status=status.HTTP_201_CREATED
)
@ -1111,10 +1152,10 @@ class RoleMembershipAPI(ExceptionHandlerMixin, APIView):
def delete(self, request, *args, **kwargs):
if not request.user.has_perm('a2_rbac.manage_members_role', obj=self.role):
raise PermissionDenied('User not allowed to manage role members')
self.role.members.remove(self.member)
request.journal.record(
'manager.role.membership.removal', role=self.role, member=self.member, api=True
)
user_qs = request.user.filter_by_perm('custom_user.view_user', User.objects.all())
member = get_object_or_404(user_qs, uuid=self.member_uuid)
self.role.members.remove(member)
request.journal.record('manager.role.membership.removal', role=self.role, member=member, api=True)
return Response(
{'result': 1, 'detail': _('User successfully removed from role')}, status=status.HTTP_200_OK
)
@ -1127,10 +1168,13 @@ class RoleMembershipsAPI(ExceptionHandlerMixin, APIView):
permission_classes = (permissions.IsAuthenticated,)
http_method_names = ['post', 'put', 'patch', 'delete']
def initial(self, request, *args, **kwargs):
super().initial(request, *args, **kwargs)
def initial(self, request, *, role_uuid=None, role_slug=None, **kwargs):
super().initial(request, role_uuid=role_uuid, role_slug=role_slug, **kwargs)
User = get_user_model()
self.role = get_object_or_404(Role, uuid=kwargs['role_uuid'])
if role_uuid:
self.role = get_object_or_404(Role, uuid=role_uuid)
if role_slug:
self.role = get_object_or_404(Role, slug=role_slug)
self.members = set()
perm = 'a2_rbac.manage_members_role'
@ -1218,7 +1262,7 @@ class RoleParentSerializer(RoleSerializer):
fields = RoleSerializer.Meta.fields + ('direct',)
class RolesParentsAPI(PublikMixin, GenericAPIView):
class RolesParentsAPI(PublikMixin, RolesMixin, GenericAPIView):
permission_classes = [
DjangoRBACPermission(
perms_map={
@ -1232,13 +1276,9 @@ class RolesParentsAPI(PublikMixin, GenericAPIView):
serializer_class = RoleParentSerializer
queryset = Role.objects.all()
def get(self, request, *, role_uuid, **kwargs):
def get(self, request, *args, **kwargs):
direct = None if 'all' in self.request.GET else True
role = get_object_or_404(Role, uuid=role_uuid)
self.check_object_permissions(self.request, role)
qs = self.get_queryset()
qs = self.queryset.filter(pk=role.pk)
qs = qs.parents(include_self=False, annotate=not direct, direct=direct)
qs = self.role.parents(include_self=False, annotate=not direct, direct=direct)
qs = request.user.filter_by_perm('a2_rbac.search_role', qs)
qs = qs.order_by('id')
serializer = self.get_serializer(qs, many=True)
@ -1260,7 +1300,7 @@ class RoleParentingSerializer(serializers.ModelSerializer):
]
class RolesParentsRelationshipsAPI(PublikMixin, GenericAPIView):
class RolesParentsRelationshipsAPI(PublikMixin, RolesMixin, GenericAPIView):
permission_classes = [
DjangoRBACPermission(
perms_map={
@ -1278,42 +1318,33 @@ class RolesParentsRelationshipsAPI(PublikMixin, GenericAPIView):
serializer_class = RoleParentingSerializer
queryset = RoleParenting.alive.all()
def filter_queryset(self, queryset):
if 'all' in self.request.GET:
qs = queryset.filter(child__uuid=self.kwargs['role_uuid'])
else:
qs = queryset.filter(child__uuid=self.kwargs['role_uuid'], direct=True)
def get(self, request, *args, **kwargs):
return self.list(request)
def list(self, request):
qs = RoleParenting.alive.filter(child=self.role)
if 'all' not in self.request.GET:
qs = qs.filter(direct=True)
qs = qs.filter(parent__in=self.request.user.filter_by_perm('a2_rbac.view_role', Role.objects.all()))
qs = qs.order_by('id')
return qs
def get(self, request, *, role_uuid, **kwargs):
role = get_object_or_404(Role, uuid=role_uuid)
self.check_object_permissions(self.request, role)
return self.list()
def list(self):
queryset = self.filter_queryset(self.get_queryset())
serializer = self.get_serializer(queryset, many=True)
serializer = self.get_serializer(qs, many=True)
return Response({'err': 0, 'data': serializer.data})
def post(self, request, *, role_uuid, **kwargs):
def post(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
parent = serializer.validated_data['parent']
self.check_object_permissions(self.request, parent)
child = get_object_or_404(Role.objects.all(), uuid=role_uuid)
child.add_parent(parent)
return self.list()
self.role.add_parent(parent)
return self.list(request)
def delete(self, request, *, role_uuid, **kwargs):
def delete(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
parent = serializer.validated_data['parent']
self.check_object_permissions(self.request, parent)
role = get_object_or_404(Role, uuid=role_uuid)
role.remove_parent(parent)
return self.list()
self.role.remove_parent(parent)
return self.list(request)
roles_parents_relationships = RolesParentsRelationshipsAPI.as_view()
@ -1336,15 +1367,23 @@ class OrganizationalUnitAPI(api_mixins.GetOrCreateMixinView, ExceptionHandlerMix
permission_classes = (DjangoPermission('a2_rbac.search_organizationalunit'),)
serializer_class = BaseOrganizationalUnitSerializer
lookup_field = 'uuid'
lookup_value_regex = r'[0-9a-f]{32}'
def get_queryset(self):
return OrganizationalUnit.objects.all()
class OrganizationalUnitBySlugAPI(OrganizationalUnitAPI):
lookup_field = 'slug'
lookup_value_regex = r'[-a-zA-Z0-9_]+'
router = SimpleRouter()
router.register(r'users', UsersAPI, basename='a2-api-users')
router.register(r'ous', OrganizationalUnitAPI, basename='a2-api-ous')
router.register(r'roles', RolesAPI, basename='a2-api-roles')
router.register(r'ous', OrganizationalUnitAPI, basename='a2-api-ous-by-uuid')
router.register(r'ous', OrganizationalUnitBySlugAPI, basename='a2-api-ous-by-slug')
router.register(r'roles', RolesAPI)
router.register(r'roles', RolesBySlugAPI)
class CheckPasswordSerializer(serializers.Serializer):

View File

@ -28,3 +28,8 @@ class Authentic2Config(AppConfig):
def ready(self):
plugins.init()
debug.HIDDEN_SETTINGS = re.compile('API|TOKEN|KEY|SECRET|PASS|PROFANITIES_LIST|SIGNATURE|LDAP')
# register convertes
from authentic2.utils.converters import register_converters
register_converters()

View File

@ -629,6 +629,14 @@ class User(AbstractBaseUser):
self.email_verified = False
self.email_verified_date = None
def add_role(self, role, ou=None):
from authentic2.a2_rbac.models import Role
if isinstance(role, Role):
role.members.add(self)
elif isinstance(role, str):
Role.objects.get(name=role).members.add(self)
class DeletedUser(models.Model):
deleted = models.DateTimeField(verbose_name=_('Deletion date'), auto_now_add=True)

View File

@ -0,0 +1,19 @@
# Authentic2 © Entr'ouvert
from django.urls.converters import SlugConverter, register_converter
class ObjectUUID:
regex = r'[0-9a-f]{32}'
def to_python(self, value):
return value.replace('-', '')
def to_url(self, value):
return str(value)
def register_converters():
register_converter(ObjectUUID, 'a2_uuid')
# legacy, some instances have non UUID like string as User.uuid
register_converter(SlugConverter, 'user_uuid')

View File

@ -27,7 +27,6 @@ from django.core import mail
from django.test.utils import override_settings
from django.urls import reverse
from django.utils.encoding import force_str
from django.utils.text import slugify
from django.utils.timezone import now
from requests.models import Response
@ -211,19 +210,6 @@ def test_api_users_list_limit(app, superuser):
assert next_url == resp.json['next']
def test_api_member_users_list(app, user, simple_role):
app.authorization = ('Basic', (user.username, user.username))
url = reverse('a2-api-role-members-list', kwargs={'role_uuid': simple_role.uuid})
simple_role.members.add(user)
resp = app.get(url)
assert isinstance(resp.json, dict)
assert {'previous', 'next', 'results'} == set(resp.json.keys())
assert resp.json['previous'] is None
assert resp.json['next'] is None
if resp.json['results']:
assert resp.json['results'][0]['full_name']
def test_api_users_update_with_email_verified(settings, app, admin, simple_user):
simple_user.set_email_verified(True)
simple_user.save()
@ -387,39 +373,6 @@ def test_api_users_list_by_authorized_service(app, superuser):
assert len(resp.json['results']) == 4
def test_api_member_users_list_by_authorized_service(app, superuser, simple_role):
app.authorization = ('Basic', (superuser.username, superuser.username))
url = reverse('a2-api-role-members-list', kwargs={'role_uuid': simple_role.uuid})
user1 = User.objects.create(username='user1')
user2 = User.objects.create(username='user2')
user3 = User.objects.create(username='user3')
role1 = Role.objects.create(name='role1')
role2 = Role.objects.create(name='role2')
role1.add_child(role2)
user1.roles.set([role1])
user2.roles.set([role2])
service1 = Service.objects.create(ou=get_default_ou(), name='service1', slug='service1')
service1.add_authorized_role(role1)
Service.objects.create(ou=get_default_ou(), name='service2', slug='service2')
for user in superuser, user1, user2, user3:
simple_role.members.add(user)
resp = app.get(url)
assert len(resp.json['results']) == 4
resp = app.get(url + '?service-ou=default&service-slug=service1')
assert len(resp.json['results']) == 2
assert {user['username'] for user in resp.json['results']} == {'user1', 'user2'}
resp = app.get(url + '?service-ou=default&service-slug=service2')
assert len(resp.json['results']) == 4
def test_api_users_list_search_text(app, superuser):
app.authorization = ('Basic', (superuser.username, superuser.username))
User = get_user_model()
@ -435,23 +388,6 @@ def test_api_users_list_search_text(app, superuser):
assert len(results) == 0
def test_api_member_users_list_search_text(app, superuser, simple_role):
app.authorization = ('Basic', (superuser.username, superuser.username))
url = reverse('a2-api-role-members-list', kwargs={'role_uuid': simple_role.uuid})
User = get_user_model()
someuser = User.objects.create(username='someuser')
simple_role.members.add(someuser)
resp = app.get(url + '?q=some')
results = resp.json['results']
assert len(results) == 1
assert results[0]['username'] == 'someuser'
someuser.delete()
resp = app.get(url + '?q=some')
results = resp.json['results']
assert len(results) == 0
def test_api_users_create(settings, app, api_user):
from authentic2_idp_oidc.models import OIDCAuthorization, OIDCClient
@ -873,223 +809,6 @@ def test_users_email(app, ou1, admin, user_ou1, mailoutbox):
assert 'https://testserver/accounts/change-email/verify/' in mail.body
def test_api_delete_role(app, admin_ou1, role_ou1):
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
app.delete(f'/api/roles/{role_ou1.uuid}/')
assert not Role.objects.filter(slug='role_ou1').exists()
assert_event('manager.role.deletion', user=admin_ou1, api=True, role_name=role_ou1.name)
def test_api_delete_role_unauthorized(app, simple_user, role_ou1):
app.authorization = ('Basic', (simple_user.username, simple_user.username))
app.delete(f'/api/roles/{role_ou1.uuid}/', status=404)
assert len(Role.objects.filter(slug='role_ou1'))
def test_api_patch_role(app, admin_ou1, role_ou1):
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
role_data = {
'slug': 'updated-role',
}
app.patch_json(f'/api/roles/{role_ou1.uuid}/', params=role_data)
assert_event('manager.role.edit', user=admin_ou1, api=True, role_name=role_ou1.name)
# The role API won't change the organizational unit attribute:
role_ou1.refresh_from_db()
assert role_ou1.name == 'role_ou1'
assert role_ou1.slug == 'updated-role'
assert role_ou1.ou.slug == 'ou1'
def test_api_patch_role_unauthorized(app, simple_user, role_ou1):
app.authorization = ('Basic', (simple_user.username, simple_user.username))
role_data = {
'slug': 'updated-role',
}
app.patch_json(f'/api/roles/{role_ou1.uuid}/', params=role_data, status=404)
role_ou1.refresh_from_db()
assert role_ou1.slug == 'role_ou1'
assert not Role.objects.filter(slug='updated-role').exists()
def test_api_put_role(app, admin_ou1, role_ou1, ou1):
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
role_data = {'name': 'updated-role', 'slug': 'updated-role', 'ou': 'ou2'}
app.put_json(f'/api/roles/{role_ou1.uuid}/', params=role_data)
role_ou1.refresh_from_db()
assert role_ou1.name == 'updated-role'
assert role_ou1.slug == 'updated-role'
assert role_ou1.ou.slug == 'ou1'
assert_event('manager.role.edit', user=admin_ou1, api=True, role_name=role_ou1.name)
def test_api_put_role_unauthorized(app, simple_user, role_ou1, ou1):
app.authorization = ('Basic', (simple_user.username, simple_user.username))
role_data = {'name': 'updated-role', 'slug': 'updated-role', 'ou': 'ou2'}
app.put_json(f'/api/roles/{role_ou1.uuid}/', params=role_data, status=404)
role_ou1.refresh_from_db()
assert role_ou1.name == 'role_ou1'
assert role_ou1.slug == 'role_ou1'
assert role_ou1.ou.slug == 'ou1'
def test_api_post_role(app, admin_ou1, ou1):
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
role_data = {'slug': 'coffee-manager', 'name': 'Coffee Manager', 'ou': 'ou1'}
resp = app.post_json('/api/roles/', params=role_data)
assert_event('manager.role.creation', user=admin_ou1, api=True, role_name='Coffee Manager')
assert isinstance(resp.json, dict)
uuid = resp.json['uuid']
# Check attribute values against the server's response:
assert set(role_data.items()) < set(resp.json.items())
# Check attributes values against the DB:
role = Role.objects.get(uuid=uuid)
assert role.slug == role_data['slug']
assert role.name == role_data['name']
assert role.ou.slug == role_data['ou']
def test_api_post_role_no_ou(app, superuser):
app.authorization = ('Basic', (superuser.username, superuser.username))
role_data = {
'slug': 'tea-manager',
'name': 'Tea Manager',
}
resp = app.post_json('/api/roles/', params=role_data)
uuid = resp.json['uuid']
role = Role.objects.get(uuid=uuid)
assert role.ou == get_default_ou()
def test_api_post_role_no_slug(app, superuser):
app.authorization = ('Basic', (superuser.username, superuser.username))
role_data = {
'name': 'Some Role',
}
resp = app.post_json('/api/roles/', params=role_data)
uuid = resp.json['uuid']
role = Role.objects.get(uuid=uuid)
assert role.slug == slugify(role.name)
assert role.slug == slugify(role_data['name'])
# another call with same role name
role_data = {
'name': 'Some Role',
}
resp = app.post_json('/api/roles/', params=role_data, status=400)
assert resp.json['errors']['__all__'] == [
'The fields name, ou must make a unique set.',
'The fields slug, ou must make a unique set.',
]
# no slug no name
role_data = {
'id': 42,
}
resp = app.post_json('/api/roles/', params=role_data, status=400)
assert resp.json['errors']['name'] == ['This field is required.']
def test_api_post_ou_no_slug(app, superuser):
app.authorization = ('Basic', (superuser.username, superuser.username))
ou_data = {
'name': 'Some Organizational Unit',
}
resp = app.post_json('/api/ous/', params=ou_data)
uuid = resp.json['uuid']
ou = OU.objects.get(uuid=uuid)
assert ou.id != get_default_ou().id
assert ou.slug == slugify(ou.name)
assert ou.slug == slugify(ou_data['name'])
# another call with same ou name
ou_data = {
'name': 'Some Organizational Unit',
}
resp = app.post_json('/api/ous/', params=ou_data, status=400)
assert resp.json['errors']['__all__'] == [
'The fields name must make a unique set.',
'The fields slug must make a unique set.',
]
# no slug no name
ou_data = {
'id': 42,
}
resp = app.post_json('/api/ous/', params=ou_data, status=400)
assert resp.json['errors']['name'] == ['This field is required.']
def test_api_post_role_unauthorized(app, simple_user, ou1):
app.authorization = ('Basic', (simple_user.username, simple_user.username))
role_data = {'slug': 'mocca-manager', 'name': 'Mocca Manager', 'ou': 'ou1'}
app.post_json('/api/roles/', params=role_data, status=403)
assert not Role.objects.filter(slug='mocca-manager').exists()
def test_api_get_role_description(app, admin_rando_role, role_random):
app.authorization = ('Basic', (admin_rando_role.username, admin_rando_role.username))
resp = app.get(f'/api/roles/{role_random.uuid}/')
assert resp.json['slug'] == 'rando'
assert resp.json['ou'] == 'ou_rando'
def test_api_get_role_not_found(app, superuser):
app.authorization = ('Basic', (superuser.username, superuser.username))
app.get('/api/roles/thisisnotavalidroleslug/', status=404)
def test_api_get_role_list(app, admin_ou1, role_ou1, role_random):
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
resp = app.get('/api/roles/')
role_fields = ['slug', 'uuid', 'name', 'ou']
assert len(resp.json['results'])
for role_dict in resp.json['results']:
for field in role_fields:
assert field in role_dict
def test_api_get_role_member_list(app, admin_ou1, user_ou1, role_ou1, role_random):
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
url = reverse('a2-api-role-members-list', kwargs={'role_uuid': role_ou1.uuid})
Attribute.objects.create(kind='birthdate', name='birthdate', label='birthdate', required=True)
user_ou1.attributes.birthdate = datetime.date(2019, 2, 2)
user_ou1.save()
# john.doe is a direct member of role_ou1
role_ou1.members.add(user_ou1)
assert user_ou1 in role_ou1.members.all()
# admin.ou1 is an inherited member of role_ou1
role_random.members.add(admin_ou1)
role_random.add_parent(role_ou1)
assert admin_ou1 not in role_ou1.members.all()
assert admin_ou1 in role_ou1.all_members()
# default api call without nested users
resp = app.get(url)
assert len(resp.json['results']) > 0
for user_dict in resp.json['results']:
assert USER_ATTRIBUTES_SET | {'birthdate', 'birthdate_verified'} == set(user_dict)
assert [x['username'] for x in resp.json['results']] == ['john.doe']
# api call with nested users
resp = app.get(url, params={'nested': 'true'})
assert [x['username'] for x in resp.json['results']] == ['john.doe', 'admin.ou1']
# get users ordered by usernames
resp = app.get(url, params={'nested': 'true', 'ordering': 'username'})
assert [x['username'] for x in resp.json['results']] == ['admin.ou1', 'john.doe']
def test_no_opened_session_cookie_on_api(app, user, settings):
settings.A2_OPENED_SESSION_COOKIE_DOMAIN = 'testserver.local'
app.authorization = ('Basic', (user.username, user.username))

View File

@ -16,7 +16,6 @@
import pytest
from django.contrib.auth import get_user_model
from django.urls import reverse
from authentic2.a2_rbac.models import OrganizationalUnit as OU
from authentic2.a2_rbac.models import Role
@ -83,28 +82,6 @@ def test_filter_users_by_service(app, admin, simple_user, role_random, service):
assert len(resp.json['results']) == 1
def test_filter_member_users_by_service(app, admin, simple_user, simple_role, role_random, service):
app.authorization = ('Basic', (admin.username, admin.username))
url = reverse('a2-api-role-members-list', kwargs={'role_uuid': simple_role.uuid})
simple_role.members.add(admin)
simple_role.members.add(simple_user)
resp = app.get(url)
assert len(resp.json['results']) == 2
resp = app.get(url + '?service-slug=xxx')
assert len(resp.json['results']) == 0
resp = app.get(url + '?service-slug=service&service-ou=default')
assert len(resp.json['results']) == 2
role_random.members.add(simple_user)
AuthorizedRole.objects.get_or_create(service=service, role=role_random)
resp = app.get(url + '?service-slug=service&service-ou=default')
assert len(resp.json['results']) == 1
def test_filter_users_by_last_modification(app, admin, simple_user, freezer):
app.authorization = ('Basic', (admin.username, admin.username))
@ -122,19 +99,3 @@ def test_filter_users_by_last_modification(app, admin, simple_user, freezer):
assert len(resp.json['results']) == 0
resp = app.get('/api/users/', params={'modified__lt': '2019-10-27T02:58:07+00:00'})
assert len(resp.json['results']) == 2
def test_filter_member_users_by_last_modification(app, admin, simple_user, simple_role, freezer):
app.authorization = ('Basic', (admin.username, admin.username))
url = reverse('a2-api-role-members-list', kwargs={'role_uuid': simple_role.uuid})
simple_role.members.add(admin)
simple_role.members.add(simple_user)
freezer.move_to('2019-10-27T02:00:00Z')
admin.save()
simple_user.save()
# AmbiguousTimeError
app.get(url, params={'modified__gt': '2019-10-27T02:58:07'}, status=400)
app.get(url, params={'modified__lt': '2019-10-27T02:58:07'}, status=400)

View File

@ -18,7 +18,6 @@ import pytest
from django.contrib.auth import get_user_model
from authentic2.a2_rbac.models import OrganizationalUnit as OU
from authentic2.a2_rbac.models import Role
from ..utils import login
@ -27,94 +26,6 @@ pytestmark = pytest.mark.django_db
User = get_user_model()
def test_api_post_ou_get_or_create(app, superuser):
app.authorization = ('Basic', (superuser.username, superuser.username))
# first get-or-create? -> create
ou_data = {
'name': 'Some Organizational Unit',
}
resp = app.post_json('/api/ous/', params=ou_data)
uuid = resp.json['uuid']
ou = OU.objects.get(uuid=uuid)
# second get-or-create? -> get
ou_data = {
'name': 'Some Organizational Unit',
'slug': ou.slug,
}
resp = app.post_json('/api/ous/?get_or_create=slug', params=ou_data)
assert resp.json['uuid'] == ou.uuid
# update-or-create? -> update
ou_data = {
'name': 'Another name',
'slug': ou.slug,
}
resp = app.post_json('/api/ous/?update_or_create=slug', params=ou_data)
assert resp.json['uuid'] == ou.uuid
assert OU.objects.get(uuid=resp.json['uuid']).name == ou_data['name']
def test_api_post_role_get_or_create(app, superuser):
app.authorization = ('Basic', (superuser.username, superuser.username))
# first get-or-create? -> create
role_data = {
'name': 'Some Role',
}
resp = app.post_json('/api/roles/', params=role_data)
uuid = resp.json['uuid']
role = Role.objects.get(uuid=uuid)
# second get-or-create? -> get
role_data = {
'name': 'Some Role',
'slug': role.slug,
}
resp = app.post_json('/api/roles/?get_or_create=slug', params=role_data)
assert resp.json['uuid'] == role.uuid
# update-or-create? -> update
role_data = {
'name': 'Another name',
'slug': role.slug,
}
resp = app.post_json('/api/roles/?update_or_create=slug', params=role_data)
assert resp.json['uuid'] == role.uuid
assert Role.objects.get(uuid=resp.json['uuid']).name == role_data['name']
def test_api_post_role_get_or_create_with_ou_selection(app, superuser, ou1, ou2):
app.authorization = ('Basic', (superuser.username, superuser.username))
role_data = {
'name': 'Some Role',
}
resp = app.post_json('/api/roles/', params=role_data)
uuid = resp.json['uuid']
role = Role.objects.get(uuid=uuid)
role.ou = ou1
role.save()
role_data = {'name': 'Some Role', 'slug': role.slug, 'ou': ou1.slug}
resp = app.post_json('/api/roles/?get_or_create=slug&get_or_create=ou', params=role_data)
assert resp.json['uuid'] == role.uuid
role_data = {'name': 'Another name', 'slug': role.slug, 'ou': ou1.slug}
resp = app.post_json('/api/roles/?update_or_create=slug&update_or_create=ou', params=role_data)
assert resp.json['uuid'] == role.uuid
assert Role.objects.get(uuid=resp.json['uuid']).name == role_data['name']
role_data = {'name': 'Some new name', 'slug': role.slug, 'ou': ou2.slug}
resp = app.post_json('/api/roles/?get_or_create=slug&get_or_create=ou', params=role_data)
role.refresh_from_db()
assert role.ou == ou1
assert resp.json['uuid'] != role.uuid
role2 = Role.objects.get(uuid=resp.json['uuid'])
assert role2.ou == ou2
assert role2.name == 'Some new name'
assert role2.slug == role.slug
role_data = {'name': 'Another new name', 'slug': role2.slug, 'ou': ou2.slug}
resp = app.post_json('/api/roles/?update_or_create=slug&update_or_create=ou', params=role_data)
role2.refresh_from_db()
assert resp.json['uuid'] == role2.uuid
assert role2.name == 'Another new name'
def test_api_users_get_or_create(settings, app, admin):
app.authorization = ('Basic', (admin.username, admin.username))
# test missing first_name
@ -325,27 +236,3 @@ def test_api_users_get_or_create_multi_key(settings, app, admin):
assert User.objects.get(id=id).email == 'john.doe@example2.net'
assert User.objects.get(id=id).password != password
assert User.objects.get(id=id).check_password('secret')
def test_api_roles_get_or_create(settings, ou1, app, admin):
app.authorization = ('Basic', (admin.username, admin.username))
# test missing first_name
payload = {
'ou': 'ou1',
'name': 'Role 1',
'slug': 'role-1',
}
resp = app.post_json('/api/roles/?get_or_create=slug', params=payload, status=201)
uuid = resp.json['uuid']
assert Role.objects.get(uuid=uuid).name == 'Role 1'
assert Role.objects.get(uuid=uuid).slug == 'role-1'
assert Role.objects.get(uuid=uuid).ou == ou1
resp = app.post_json('/api/roles/?get_or_create=slug', params=payload, status=200)
assert uuid == resp.json['uuid']
payload['name'] = 'Role 2'
resp = app.post_json('/api/roles/?update_or_create=slug', params=payload, status=200)
assert uuid == resp.json['uuid']
assert Role.objects.get(uuid=uuid).name == 'Role 2'
assert Role.objects.get(uuid=uuid).slug == 'role-1'

137
tests/api/test_ou.py Normal file
View File

@ -0,0 +1,137 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2021 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
from django.utils.text import slugify
from authentic2.a2_rbac.models import OrganizationalUnit as OU
from authentic2.a2_rbac.utils import get_default_ou
OU_SERIALIZATION_FIELDS = [
'check_required_on_login_attributes',
'clean_unused_accounts_alert',
'clean_unused_accounts_deletion',
'colour',
'default',
'description',
'email_is_unique',
'home_url',
'id',
'logo',
'name',
'show_username',
'slug',
'user_add_password_policy',
'user_can_reset_password',
'username_is_unique',
'uuid',
'validate_emails',
'phone_is_unique',
]
def test_unauthorized(app):
app.get('/api/ous/', status=401)
class TestAuthenticated:
@pytest.fixture
def app(self, app, admin):
app.authorization = ('Basic', ('admin', 'admin'))
return app
def test_get_by_uuid(self, app):
resp = app.get(f'/api/ous/{get_default_ou().uuid}/')
assert set(resp.json) == set(OU_SERIALIZATION_FIELDS)
assert resp.json['uuid'] == get_default_ou().uuid
def test_get_by_slug(self, app):
resp = app.get('/api/ous/default/')
assert set(resp.json) == set(OU_SERIALIZATION_FIELDS)
assert resp.json['uuid'] == get_default_ou().uuid
def test_post_name_is_required(self, app):
# no slug no name
ou_data = {
'id': 42,
}
assert OU.objects.all().count() == 1
resp = app.post_json('/api/ous/', params=ou_data, status=400)
assert OU.objects.all().count() == 1
assert resp.json == {'errors': {'name': ['This field is required.']}, 'result': 0}
def test_post_name_and_slug_not_unique(self, app):
OU.objects.create(name='Some Organizational Unit')
# another call with same ou name
ou_data = {
'name': 'Some Organizational Unit',
}
assert OU.objects.all().count() == 2
resp = app.post_json('/api/ous/', params=ou_data, status=400)
assert OU.objects.all().count() == 2
assert resp.json == {
'errors': {
'__all__': [
'The fields name must make a unique set.',
'The fields slug must make a unique set.',
]
},
'result': 0,
}
def test_post_no_slug(self, app):
ou_data = {
'name': 'Some Organizational Unit',
}
assert OU.objects.all().count() == 1
resp = app.post_json('/api/ous/', params=ou_data)
assert OU.objects.all().count() == 2
uuid = resp.json['uuid']
ou = OU.objects.get(uuid=uuid)
assert ou.id != get_default_ou().id
assert ou.slug == slugify(ou.name)
assert ou.slug == slugify(ou_data['name'])
def test_get_or_create(self, app):
# first get-or-create? -> create
ou_data = {
'name': 'Some Organizational Unit',
}
slug = 'some-organizational-unit'
resp = app.post_json('/api/ous/', params=ou_data)
assert resp.json['slug'] == slug
uuid = resp.json['uuid']
ou_data = {
'name': 'Another name',
'slug': slug,
}
resp = app.post_json('/api/ous/?get_or_create=slug', params=ou_data)
assert resp.json['uuid'] == uuid
assert resp.json['name'] == 'Some Organizational Unit'
# update-or-create? -> update
ou_data = {
'name': 'Another name',
'slug': slug,
}
resp = app.post_json('/api/ous/?update_or_create=slug', params=ou_data)
assert resp.json['uuid'] == uuid
assert resp.json['name'] == 'Another name'
ou = OU.objects.get(uuid=uuid)
assert ou.name == 'Another name'
assert ou.slug == slug

View File

@ -1,351 +0,0 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import pytest
from django.contrib.auth import get_user_model
from django.urls import reverse
from authentic2.a2_rbac.models import Role
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.models import Attribute
from ..utils import USER_ATTRIBUTES_SET, assert_event
pytestmark = pytest.mark.django_db
User = get_user_model()
def test_api_role_get_member(app, api_user, role, member):
app.authorization = ('Basic', (api_user.username, api_user.username))
authorized = api_user.has_perm('a2_rbac.view_role', role)
if member.username == 'fake' or role.name == 'fake':
status = 404
elif authorized:
status = 404
else:
status = 403
resp = app.get(f'/api/roles/{role.uuid}/members/{member.uuid}/', status=status)
if member.username == 'fake' or role.name == 'fake':
assert resp.json == {'result': 0, 'errors': {'detail': 'Not found.'}}
elif status == 404:
assert resp.json == {'result': 0, 'errors': {'detail': 'Not found.'}}
member.roles.add(role)
resp = app.get(f'/api/roles/{role.uuid}/members/{member.uuid}/')
assert resp.json['uuid'] == member.uuid
assert USER_ATTRIBUTES_SET == set(resp.json)
else:
assert resp.json['result'] == 0
assert resp.json['errors'] == 'User not allowed to view role'
def test_api_role_get_member_nested(app, admin_ou1, user_ou1, role_ou1, role_random):
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
url = reverse(
'a2-api-role-member',
kwargs={
'role_uuid': role_ou1.uuid,
'member_uuid': admin_ou1.uuid,
},
)
Attribute.objects.create(kind='birthdate', name='birthdate', label='birthdate', required=True)
user_ou1.attributes.birthdate = datetime.date(2019, 2, 2)
user_ou1.save()
# admin.ou1 is an inherited member of role_ou1
role_random.members.add(admin_ou1)
role_random.add_parent(role_ou1)
assert admin_ou1 not in role_ou1.members.all()
assert admin_ou1 in role_ou1.all_members()
# default api call without nested users
resp = app.get(url, status=404)
assert resp.json == {'result': 0, 'errors': {'detail': 'Not found.'}}
# api call with nested users
resp = app.get(url, params={'nested': 'true'})
assert resp.json['username'] == 'admin.ou1'
assert USER_ATTRIBUTES_SET | {'birthdate', 'birthdate_verified'} == set(resp.json)
def test_api_role_add_member(app, api_user, role, member):
app.authorization = ('Basic', (api_user.username, api_user.username))
authorized = api_user.has_perm('a2_rbac.manage_members_role', role)
if member.username == 'fake' or role.name == 'fake':
status = 404
elif authorized:
status = 201
else:
status = 403
resp = app.post_json(f'/api/roles/{role.uuid}/members/{member.uuid}/', status=status)
if status == 404:
pass
elif authorized:
assert resp.json['result'] == 1
assert resp.json['detail'] == 'User successfully added to role'
assert_event(
'manager.role.membership.grant',
user=api_user if isinstance(api_user, User) else None,
api=True,
role_name=role.name,
member_name=member.get_full_name(),
)
else:
assert resp.json['result'] == 0
assert resp.json['errors'] == 'User not allowed to manage role members'
def test_api_role_remove_member(app, api_user, role, member):
app.authorization = ('Basic', (api_user.username, api_user.username))
authorized = api_user.is_superuser or api_user.has_perm('a2_rbac.admin_role', role)
if member.username == 'fake' or role.name == 'fake':
status = 404
elif authorized:
status = 200
else:
status = 403
resp = app.delete_json(f'/api/roles/{role.uuid}/members/{member.uuid}/', status=status)
if status == 404:
pass
elif authorized:
assert resp.json['result'] == 1
assert resp.json['detail'] == 'User successfully removed from role'
resp = app.get(f'/api/roles/{role.uuid}/members/{member.uuid}/', status=404)
assert resp.json == {'result': 0, 'errors': {'detail': 'Not found.'}}
assert_event(
'manager.role.membership.removal',
user=api_user if isinstance(api_user, User) else None,
api=True,
role_name=role.name,
member_name=member.get_full_name(),
)
else:
assert resp.json['result'] == 0
assert resp.json['errors'] == 'User not allowed to manage role members'
def test_api_role_add_members(app, api_user, role, member, member_rando2):
app.authorization = ('Basic', (api_user.username, api_user.username))
authorized = api_user.has_perm('a2_rbac.manage_members_role', role)
if role.name == 'fake':
status = 404
elif not authorized:
status = 403
elif member.username == 'fake':
status = 400
else:
status = 201
payload = {'data': []}
for m in [member, member_rando2, member_rando2]: # test no duplicate
payload['data'].append({'uuid': m.uuid})
resp = app.post_json(f'/api/roles/{role.uuid}/relationships/members/', params=payload, status=status)
if status in (400, 404):
pass
elif authorized:
assert resp.json['result'] == 1
assert resp.json['detail'] == 'Users successfully added to role'
for m in [member, member_rando2]:
assert m in role.members.all()
assert_event(
'manager.role.membership.grant',
user=api_user if isinstance(api_user, User) else None,
api=True,
role_name=role.name,
member_name=m.get_full_name(),
)
else:
assert resp.json['result'] == 0
assert resp.json['errors'] == 'User not allowed to manage role members'
def test_api_role_remove_members(app, api_user, role, member, member_rando2):
app.authorization = ('Basic', (api_user.username, api_user.username))
authorized = api_user.has_perm('a2_rbac.manage_members_role', role)
if role.name == 'fake':
status = 404
elif not authorized:
status = 403
elif member.username == 'fake':
status = 400
else:
status = 200
payload = {'data': []}
for m in [member, member_rando2, member_rando2]: # test no duplicate
payload['data'].append({'uuid': m.uuid})
resp = app.delete_json(f'/api/roles/{role.uuid}/relationships/members/', params=payload, status=status)
if status in (400, 404):
pass
elif authorized:
assert resp.json['result'] == 1
assert resp.json['detail'] == 'Users successfully removed from role'
for m in [member, member_rando2]:
assert m not in role.members.all()
assert_event(
'manager.role.membership.removal',
user=api_user if isinstance(api_user, User) else None,
api=True,
role_name=role.name,
member_name=m.get_full_name(),
)
else:
assert resp.json['result'] == 0
assert resp.json['errors'] == 'User not allowed to manage role members'
def test_api_role_set_members(app, api_user, role, member, member_rando2, ou_rando):
user = User.objects.create(
username='test3', first_name='test3', last_name='test3', email='test3@test.org', ou=ou_rando
)
app.authorization = ('Basic', (api_user.username, api_user.username))
authorized = api_user.has_perm('a2_rbac.manage_members_role', role)
if role.name == 'fake':
status = 404
elif not authorized:
status = 403
elif member.username == 'fake':
status = 400
else:
status = 200
payload = {'data': []}
role.members.add(user)
for m in [member, member_rando2, member_rando2]: # test no duplicate
payload['data'].append({'uuid': m.uuid})
resp = app.put_json(f'/api/roles/{role.uuid}/relationships/members/', params=payload, status=status)
if status in (400, 404):
pass
elif authorized:
assert resp.json['result'] == 1
assert resp.json['detail'] == 'Users successfully assigned to role'
assert len(role.members.all()) == 2
for m in [member, member_rando2]:
assert m in role.members.all()
assert_event(
'manager.role.membership.grant',
user=api_user if isinstance(api_user, User) else None,
api=True,
role_name=role.name,
member_name=m.get_full_name(),
)
assert_event(
'manager.role.membership.removal',
user=api_user if isinstance(api_user, User) else None,
api=True,
role_name=role.name,
member_name=user.get_full_name(),
)
else:
assert resp.json['result'] == 0
assert resp.json['errors'] == 'User not allowed to manage role members'
def test_api_role_set_empty_members(app, api_user):
app.authorization = ('Basic', (api_user.username, api_user.username))
ou = get_default_ou()
user = User.objects.create(
ou=ou, username='john.doe', first_name='Jôhn', last_name='Doe', email='john.doe@example.net'
)
user.save()
role = Role.objects.create(name='Role1', ou=ou)
role.members.add(user)
status = 200
if not api_user.has_perm('a2_rbac.manage_members_role', role):
status = 403
app.put_json(f'/api/roles/{role.uuid}/relationships/members/', params={'data': []}, status=status)
if api_user.has_perm('a2_rbac.manage_members_role', role):
assert len(role.members.all()) == 0
else:
assert len(role.members.all()) == 1
def test_api_role_get_members(app, api_user, role):
app.authorization = ('Basic', (api_user.username, api_user.username))
authorized = api_user.has_perm('a2_rbac.manage_members_role', role)
status = 405 if authorized else 403
app.get(f'/api/roles/{role.uuid}/relationships/members/', status=status)
def test_api_role_members_payload_missing(app, api_user, role):
app.authorization = ('Basic', (api_user.username, api_user.username))
authorized = api_user.has_perm('a2_rbac.manage_members_role', role)
status = 400 if authorized else 403
app.post_json(f'/api/roles/{role.uuid}/relationships/members/', status=status)
app.delete_json(f'/api/roles/{role.uuid}/relationships/members/', status=status)
app.put_json(f'/api/roles/{role.uuid}/relationships/members/', status=status)
def test_api_role_members_wrong_payload_types(app, superuser, role_random, member_rando2):
app.authorization = ('Basic', (superuser.username, superuser.username))
payload = [{'data': [{'uuid': member_rando2.uuid}]}]
resp = app.post_json(f'/api/roles/{role_random.uuid}/relationships/members/', params=payload, status=400)
assert resp.json['result'] == 0
assert resp.json['errors'] == ['Payload must be a dictionary']
payload = {'data': [[member_rando2.uuid]]}
resp = app.post_json(f'/api/roles/{role_random.uuid}/relationships/members/', params=payload, status=400)
assert resp.json['result'] == 0
assert resp.json['errors'] == ["List elements of the 'data' dict entry must be dictionaries"]
payload = {'data': [member_rando2.uuid]}
resp = app.post_json(f'/api/roles/{role_random.uuid}/relationships/members/', params=payload, status=400)
assert resp.json['result'] == 0
assert resp.json['errors'] == ["List elements of the 'data' dict entry must be dictionaries"]

View File

@ -14,24 +14,84 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
import types
import pytest
from authentic2.a2_rbac.models import Role
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.api_views import OrganizationalUnit as OU
from authentic2.api_views import RoleParentingSerializer, RoleParentSerializer
from tests.utils import scoped_db_fixture
from authentic2.custom_user.models import User
from tests.utils import USER_ATTRIBUTES_SET, assert_event, create_user, scoped_db_fixture
OU_JSON = {'slug': 'default', 'uuid': '1' * 32, 'name': 'Default organizational unit'}
@scoped_db_fixture(scope='module', autouse=True)
def roles():
class Namespace:
parent = Role.objects.create(name='parent', uuid='a' * 32)
child = Role.objects.create(name='child', uuid='1' * 32)
parent.add_child(child)
grandchild = Role.objects.create(name='grandchild', uuid='2' * 32)
child.add_child(grandchild)
def setup():
OU.objects.filter(default=True).update(uuid='1' * 32)
ou = get_default_ou()
parent = Role.objects.create(name='Parent', uuid='a' * 32, ou=ou)
role = Role.objects.create(name='Role', uuid='b' * 32, ou=ou)
child = Role.objects.create(name='Child', uuid='c' * 32, ou=ou)
parent.add_child(role)
role.add_child(child)
return Namespace
user = create_user(username='user', password='user')
user_role = Role.objects.create(name='User role', uuid='d' * 32, ou=ou)
user_role.members.add(user)
admin = create_user(username='admin', password='admin')
admin.add_role('Manager')
return types.SimpleNamespace(
parent=parent, role=role, child=child, user_role=user_role, user=user, admin=admin
)
@pytest.fixture
def parent(request, setup):
return copy.deepcopy(setup.parent)
@pytest.fixture
def role(request, setup):
name = getattr(request, 'param', 'role')
return copy.deepcopy(getattr(setup, name))
@pytest.fixture
def child(request, setup):
return copy.deepcopy(setup.child)
@pytest.fixture(params=['by-uuid', 'by-slug', 'by-full-slug'])
def role_ref(request, role):
if request.param == 'by-uuid':
return role.uuid
if request.param == 'by-slug':
return role.slug
if request.param == 'by-full-slug':
return '%s:%s' % (role.ou.slug, role.slug)
@pytest.fixture
def user(setup):
return copy.deepcopy(setup.user)
@pytest.fixture
def user_role(setup):
return copy.deepcopy(setup.user_role)
@pytest.fixture
def admin(setup):
return copy.deepcopy(setup.admin)
ROLE_SERIALIZATION_FIELDS = {'uuid', 'name', 'slug', 'ou'}
class TestSerializer:
@ -47,259 +107,608 @@ class TestSerializer:
'ou': 'ou',
}
def test_role_parenting(self, db, roles):
assert RoleParentingSerializer(roles.parent.child_relation.first()).data == {
'parent': {'service': None, 'slug': 'parent', 'ou': None, 'name': 'parent', 'uuid': 'a' * 32},
def test_role_parenting(self, db, role):
assert RoleParentingSerializer(role.child_relation.first()).data == {
'parent': {'service': None, 'slug': 'role', 'ou': OU_JSON, 'name': 'Role', 'uuid': 'b' * 32},
'direct': True,
}
class TestViews:
class TestParents:
def test_not_authenticated(self, db, app, admin, roles):
app.get('/api/roles/%s/parents/' % roles.grandchild.uuid, status=401)
class AdminTestMixin:
@pytest.fixture
def app(self, db, app, admin):
app.authorization = ('Basic', ('admin', 'admin'))
return app
class TestAuthenticated:
@pytest.fixture
def app(self, app, admin):
app.authorization = ('Basic', (admin.username, admin.username))
return app
def test_default(self, app, roles):
resp = app.get('/api/roles/%s/parents/' % roles.grandchild.uuid)
assert resp.json == {
'err': 0,
'data': [
{
'uuid': '1' * 32,
'name': 'child',
'slug': 'child',
'ou': None,
}
],
}
class UserTestMixin:
@pytest.fixture
def app(self, db, app, admin):
app.authorization = ('Basic', ('user', 'user'))
return app
def test_all(self, app, roles):
resp = app.get('/api/roles/%s/parents/?all' % roles.grandchild.uuid)
assert resp.json == {
'err': 0,
'data': [
{
'uuid': 'a' * 32,
'name': 'parent',
'slug': 'parent',
'ou': None,
'direct': False,
},
{
'uuid': '1' * 32,
'name': 'child',
'slug': 'child',
'ou': None,
'direct': True,
},
],
}
def test_permission(self, app, simple_user, roles):
role = Role.objects.create(name='admin')
role.members.add(simple_user)
app.authorization = ('Basic', (simple_user.username, simple_user.username))
app.get('/api/roles/%s/parents/' % roles.grandchild.uuid, status=403)
role.add_permission(roles.grandchild, 'view')
resp = app.get('/api/roles/%s/parents/?all' % roles.grandchild.uuid, status=200)
assert not resp.json['data']
role.add_permission(roles.child, 'view')
resp = app.get('/api/roles/%s/parents/?all' % roles.grandchild.uuid, status=200)
assert len(resp.json['data']) == 1
role.add_permission(roles.parent, 'view')
resp = app.get('/api/roles/%s/parents/?all' % roles.grandchild.uuid, status=200)
assert len(resp.json['data']) == 2
class TestRolesAPI:
def test_unauthenticated(self, app, role_ref):
app.get('/api/roles/', status=401)
app.post('/api/roles/', params={}, status=401)
app.get(f'/api/roles/{role_ref}/', status=401)
app.get(f'/api/roles/{role_ref}/', status=401)
app.delete(f'/api/roles/{role_ref}/', status=401)
app.patch_json(f'/api/roles/{role_ref}/', params={}, status=401)
class TestParentsRelationships:
def test_not_authenticated(self, db, app, admin, roles):
app.get('/api/roles/%s/relationships/parents/' % roles.parent.uuid, status=401)
class TestAdmin(AdminTestMixin):
def test_list(self, app):
resp = app.get('/api/roles/')
assert len(resp.json['results'])
assert all(set(result) == ROLE_SERIALIZATION_FIELDS for result in resp.json['results'])
class TestAuthenticated:
@pytest.fixture
def app(self, app, admin):
app.authorization = ('Basic', (admin.username, admin.username))
return app
resp = app.get('/api/roles/?ou__slug=default')
assert len(resp.json['results']) == 4
def test_default(self, app, roles):
resp = app.get('/api/roles/%s/relationships/parents/' % roles.grandchild.uuid)
assert resp.json == {
'err': 0,
'data': [
{
'parent': {
'uuid': '1' * 32,
'name': 'child',
'slug': 'child',
'ou': None,
'service': None,
},
'direct': True,
}
],
}
def test_all(self, app, roles):
resp = app.get('/api/roles/%s/relationships/parents/?all' % roles.grandchild.uuid)
assert resp.json == {
'err': 0,
'data': [
{
'parent': {
'uuid': '1' * 32,
'name': 'child',
'slug': 'child',
'ou': None,
'service': None,
},
'direct': True,
},
{
'parent': {
'uuid': 'a' * 32,
'name': 'parent',
'slug': 'parent',
'ou': None,
'service': None,
},
'direct': False,
},
],
}
@pytest.mark.parametrize(
'role_description',
[{'uuid': 'a' * 32}, {'slug': 'parent'}, {'name': 'parent'}],
ids=['by_uuid', 'by_slug', 'by_name'],
def test_post(self, app, admin):
resp = app.post_json(
'/api/roles/',
params={
'name': 'Coffee Manager',
'slug': 'role1',
'ou': 'default',
},
)
def test_create(self, role_description, app, roles):
assert set(roles.parent.children(include_self=False, direct=True)) == {roles.child}
resp = app.post_json(
'/api/roles/%s/relationships/parents/' % roles.grandchild.uuid,
params={'parent': role_description},
)
assert resp.json == {
'err': 0,
'data': [
{
'parent': {
'uuid': '1' * 32,
'name': 'child',
'slug': 'child',
'ou': None,
'service': None,
},
'direct': True,
assert set(resp.json) == ROLE_SERIALIZATION_FIELDS
assert resp.json['name'] == 'Coffee Manager'
assert resp.json['slug'] == 'role1'
assert resp.json['ou'] == 'default'
assert_event('manager.role.creation', user=admin, api=True, role_name='Coffee Manager')
assert Role.objects.get(uuid=resp.json['uuid'])
def test_post_auto_slug(self, app):
resp = app.post_json(
'/api/roles/',
params={
'name': 'Coffee Manager',
'ou': 'default',
},
)
assert resp.json['slug'] == 'coffee-manager'
def test_post_auto_ou(self, app):
resp = app.post_json(
'/api/roles/',
params={
'name': 'Coffee Manager',
},
)
assert Role.objects.get(uuid=resp.json['uuid']).ou == get_default_ou()
def test_api_post_get_or_create_slug(self, app, role):
resp = app.post_json('/api/roles/?get_or_create=slug', params={'name': 'foo', 'slug': 'role'})
# check name was not modified
assert resp.json['name'] == 'Role'
role.refresh_from_db()
assert role.name == 'Role'
def test_api_post_get_or_create_name(self, app, role):
resp = app.post_json('/api/roles/?get_or_create=name', params={'name': 'Role', 'slug': 'foo'})
# check slug was not modified
assert resp.json['slug'] == 'role'
role.refresh_from_db()
assert role.slug == 'role'
def test_api_post_update_or_create_slug(self, app, role):
resp = app.post_json('/api/roles/?update_or_create=slug', params={'name': 'foo', 'slug': 'role'})
# check name was modified
assert resp.json['name'] == 'foo'
role.refresh_from_db()
assert role.name == 'foo'
def test_api_post_update_or_create_name(self, app, role):
resp = app.post_json('/api/roles/?update_or_create=name', params={'name': 'Role', 'slug': 'foo'})
# check slug was modified
assert resp.json['slug'] == 'foo'
role.refresh_from_db()
assert role.slug == 'foo'
def test_api_post_get_or_create_multiple_ou(self, app):
other = Role.objects.create(name='Role', ou=OU.objects.create(name='ou'))
app.post_json(
'/api/roles/?update_or_create=slug', params={'name': 'foo', 'slug': 'role'}, status=409
)
assert (
app.post_json(
'/api/roles/?get_or_create=slug&get_or_create=ou',
params={'name': 'foo', 'slug': 'role', 'ou': 'ou'},
).json['uuid']
== other.uuid
)
def test_api_post_update_or_create_multiple_ou(self, app, role):
other = Role.objects.create(name='Role', ou=OU.objects.create(name='ou'))
app.post_json(
'/api/roles/?update_or_create=slug',
params={'name': 'foo', 'slug': 'role', 'ou': 'ou'},
status=409,
)
resp = app.post_json(
'/api/roles/?update_or_create=slug&update_or_create=ou',
params={'name': 'foo', 'slug': 'role', 'ou': 'ou'},
)
assert resp.json['uuid'] == other.uuid
assert resp.json['name'] == 'foo'
other.refresh_from_db()
assert other.name == 'foo'
role.refresh_from_db()
assert role.name == 'Role'
def test_get(self, app, role_ref, role):
resp = app.get(f'/api/roles/{role_ref}/')
assert set(resp.json) == set(ROLE_SERIALIZATION_FIELDS)
assert resp.json['uuid'] == role.uuid
def test_get_by_slug_multiple(self, app, role):
other = Role.objects.create(name='Role', ou=OU.objects.create(name='ou'))
app.get('/api/roles/role/', status=409)
assert app.get('/api/roles/default:role/').json['uuid'] == role.uuid
assert app.get('/api/roles/ou:role/').json['uuid'] == other.uuid
def test_delete(self, app, role_ref, role, admin):
app.delete(f'/api/roles/{role_ref}/')
assert not Role.objects.filter(pk=role.pk).exists()
assert_event('manager.role.deletion', user=admin, api=True, role_name='Role')
def test_patch(self, app, role_ref):
resp1 = app.get(f'/api/roles/{role_ref}/')
resp2 = app.patch_json(f'/api/roles/{role_ref}/', params={'name': 'update-role'})
resp3 = app.get(f'/api/roles/{role_ref}/')
assert resp2.json == resp3.json
assert {key for key in resp1.json if resp1.json[key] != resp2.json[key]} == {'name'}
def test_put(self, app, role_ref):
resp1 = app.get(f'/api/roles/{role_ref}/')
resp2 = app.put_json(f'/api/roles/{role_ref}/', params={'name': 'update-role'})
resp3 = app.get('/api/roles/update-role/')
assert resp2.json == resp3.json
# on PUT slug is reset and recomputed
assert {key for key in resp1.json if resp1.json[key] != resp2.json[key]} == {'name', 'slug'}
class TestUser(UserTestMixin):
def test_no_permission(self, app, role_ref):
app.delete(f'/api/roles/{role_ref}/', status=404)
app.patch_json(f'/api/roles/{role_ref}/', status=404)
def test_list(self, app, role_ref, role, user_role):
assert app.get('/api/roles/').json['results'] == []
user_role.add_permission(role, 'view')
assert app.get('/api/roles/').json['results'] != []
def test_get(self, app, role_ref, role, user_role):
app.get(f'/api/roles/{role_ref}/', status=404)
user_role.add_permission(role, 'view')
app.get(f'/api/roles/{role_ref}/')
def test_post(self, app, user_role):
app.post('/api/roles/', params={'name': 'foo'}, status=403)
user_role.add_permission(Role, 'add')
app.post('/api/roles/', params={'name': 'foo'})
def test_delete(self, app, role_ref, role, user_role):
app.delete(f'/api/roles/{role_ref}/', status=404)
user_role.add_permission(role, 'view')
app.delete(f'/api/roles/{role_ref}/', status=403)
user_role.add_permission(role, 'delete')
app.delete(f'/api/roles/{role_ref}/')
class TestRolesMembersAPI:
@scoped_db_fixture(scope='class', autouse=True)
def setup(self, setup):
# add user to the role in the middle of the role chain
# parent -> role -> child
# | user |
setup.role.members.add(setup.user)
return setup
def test_list(self, app, role_ref):
app.get(f'/api/roles/{role_ref}/members/', status=401)
class TestAdmin(AdminTestMixin):
def test_list(self, app, role_ref, user):
resp = app.get(f'/api/roles/{role_ref}/members/')
assert resp.json['results']
first_result = resp.json['results'][0]
assert first_result['uuid'] == user.uuid
assert first_result['username'] == 'user'
assert set(first_result) == USER_ATTRIBUTES_SET
@pytest.mark.parametrize('role', ['parent'], indirect=True)
def test_get_nested(self, app, role_ref, role):
assert not app.get(f'/api/roles/{role_ref}/members/').json['results']
assert app.get(f'/api/roles/{role_ref}/members/?nested=true').json['results']
assert not app.get(f'/api/roles/{role_ref}/members/?nested=false').json['results']
class TestUser(UserTestMixin):
def test_list(self, app, role_ref, role, user_role, user):
app.get(f'/api/roles/{role_ref}/members/', status=404)
user_role.add_permission(role, 'view')
assert not app.get(f'/api/roles/{role_ref}/members/').json['results']
user_role.add_permission(user, 'view')
assert app.get(f'/api/roles/{role_ref}/members/').json['results']
class TestRoleMembershipAPI:
def test_get(self, app, role_ref, user):
app.get(f'/api/roles/{role_ref}/members/{user.uuid}/', status=401)
class TestAdmin(AdminTestMixin):
def test_get(self, app, role_ref, role, user):
app.get(f'/api/roles/{role_ref}/members/{user.uuid}/', status=404)
role.members.add(user)
resp = app.get(f'/api/roles/{role_ref}/members/{user.uuid}/')
assert resp.json['uuid'] == user.uuid
assert set(resp.json) == USER_ATTRIBUTES_SET
def test_post(self, app, role_ref, role, admin, user):
assert user not in role.members.all()
app.post(f'/api/roles/{role_ref}/members/{user.uuid}/', status=201)
assert user in role.members.all()
assert_event(
'manager.role.membership.grant',
user=admin,
api=True,
role_name='Role',
member_name=user.get_full_name(),
)
def test_delete(self, app, role_ref, role, admin, user):
role.members.add(user)
app.delete(f'/api/roles/{role_ref}/members/{user.uuid}/')
assert user not in role.members.all()
assert_event(
'manager.role.membership.removal',
user=admin,
api=True,
role_name='Role',
member_name=user.get_full_name(),
)
class TestSimpleUser(UserTestMixin):
def test_get(self, app, role_ref, role, user_role, user):
role.members.add(user)
app.get(f'/api/roles/{role_ref}/members/{user.uuid}/', status=404)
user_role.add_permission(role, 'view')
app.get(f'/api/roles/{role_ref}/members/{user.uuid}/', status=404)
user_role.add_permission(User, 'view')
app.get(f'/api/roles/{role_ref}/members/{user.uuid}/', status=200)
def test_post(self, app, role_ref, role, user_role, user):
assert user not in role.members.all()
app.post(f'/api/roles/{role_ref}/members/{user.uuid}/', status=404)
assert user not in role.members.all()
user_role.members.add(user)
user_role.add_permission(Role, 'view')
user_role.add_permission(User, 'view')
app.post(f'/api/roles/{role_ref}/members/{user.uuid}/', status=403)
assert user not in role.members.all()
user_role.add_permission(role, 'manage_members')
app.post(f'/api/roles/{role_ref}/members/{user.uuid}/', status=201)
assert user in role.members.all()
def test_delete(self, app, role_ref, role, user_role, user):
role.members.add(user)
app.delete(f'/api/roles/{role_ref}/members/{user.uuid}/', status=404)
assert user in role.members.all()
user_role.add_permission(Role, 'view')
user_role.add_permission(User, 'view')
app.delete(f'/api/roles/{role_ref}/members/{user.uuid}/', status=403)
assert user in role.members.all()
user_role.add_permission(role, 'manage_members')
app.delete(f'/api/roles/{role_ref}/members/{user.uuid}/', status=200)
assert user not in role.members.all()
class TestRoleMembershipsAPI:
def test_unauthenticated(self, app, role_ref):
app.get(f'/api/roles/{role_ref}/relationships/members/', status=401)
class TestAdmin(AdminTestMixin):
def test_post(self, app, role_ref, role, admin, user):
role.members.add(admin)
assert {admin} == set(role.members.all())
app.post_json(
f'/api/roles/{role_ref}/relationships/members/',
params={
'data': [{'uuid': user.uuid}],
},
)
assert {admin, user} == set(role.members.all())
assert_event(
'manager.role.membership.grant',
user=admin,
api=True,
role_name='Role',
member_name='user',
)
def test_patch(self, app, role_ref, role, admin, user):
role.members.add(admin)
assert {admin} == set(role.members.all())
app.patch_json(
f'/api/roles/{role_ref}/relationships/members/',
params={
'data': [{'uuid': user.uuid}],
},
)
assert {user} == set(role.members.all())
assert_event(
'manager.role.membership.grant',
user=admin,
api=True,
role_name='Role',
member_name='user',
)
assert_event(
'manager.role.membership.removal',
user=admin,
api=True,
role_name='Role',
member_name='admin',
)
def test_delete(self, app, role_ref, role, admin, user):
role.members.add(user)
assert {user} == set(role.members.all())
app.delete_json(
f'/api/roles/{role_ref}/relationships/members/',
params={
'data': [{'uuid': user.uuid}],
},
)
assert set() == set(role.members.all())
assert_event(
'manager.role.membership.removal',
user=admin,
api=True,
role_name='Role',
member_name='user',
)
# Test input validation
def test_missing_payload(self, app):
app.post_json('/api/roles/role/relationships/members/', status=400)
app.patch_json('/api/roles/role/relationships/members/', status=400)
app.delete_json('/api/roles/role/relationships/members/', status=400)
@pytest.mark.parametrize(
'payload',
[[], {'data': [[]]}, {'data': ['a' * 32]}],
ids=['list', 'data is list of list', 'data is list of uuid'],
)
def test_bad_payload(self, app, payload):
app.post_json('/api/roles/role/relationships/members/', params=payload, status=400)
app.patch_json('/api/roles/role/relationships/members/', params=payload, status=400)
app.delete_json('/api/roles/role/relationships/members/', params=payload, status=400)
class TestUser(UserTestMixin):
def test_post(self, app, role_ref, role, admin, user_role, user):
role.members.add(admin)
payload = {'data': [{'uuid': user.uuid}]}
assert {admin} == set(role.members.all())
app.post_json(f'/api/roles/{role_ref}/relationships/members/', params=payload, status=403)
user_role.add_permission(role, 'manage_members')
app.post_json(f'/api/roles/{role_ref}/relationships/members/', params=payload, status=201)
assert {admin, user} == set(role.members.all())
assert_event(
'manager.role.membership.grant',
user=user,
api=True,
role_name='Role',
member_name='user',
)
def test_patch(self, app, role_ref, role, admin, user_role, user):
role.members.add(admin)
payload = {'data': [{'uuid': user.uuid}]}
assert {admin} == set(role.members.all())
app.patch_json(f'/api/roles/{role_ref}/relationships/members/', params=payload, status=403)
user_role.add_permission(role, 'manage_members')
app.patch_json(f'/api/roles/{role_ref}/relationships/members/', params=payload, status=200)
assert {user} == set(role.members.all())
assert_event(
'manager.role.membership.grant',
user=user,
api=True,
role_name='Role',
member_name='user',
)
assert_event(
'manager.role.membership.removal',
user=user,
api=True,
role_name='Role',
member_name='admin',
)
def test_delete(self, app, role_ref, role, admin, user_role, user):
role.members.add(user)
payload = {'data': [{'uuid': user.uuid}]}
assert {user} == set(role.members.all())
app.delete_json(f'/api/roles/{role_ref}/relationships/members/', params=payload, status=403)
user_role.add_permission(role, 'manage_members')
app.delete_json(f'/api/roles/{role_ref}/relationships/members/', params=payload, status=200)
assert set() == set(role.members.all())
assert_event(
'manager.role.membership.removal',
user=user,
api=True,
role_name='Role',
member_name='user',
)
class TestRolesParentsRelationshipsAPI:
def test_unauthenticated(self, app, role_ref):
app.get(f'/api/roles/{role_ref}/relationships/parents/', status=401)
class TestAdmin(AdminTestMixin):
def test_list(self, app, role_ref):
resp = app.get(f'/api/roles/{role_ref}/relationships/parents/')
doc = resp.json
assert doc == {
'err': 0,
'data': [
{
'direct': True,
'parent': {
'uuid': 'a' * 32,
'name': 'Parent',
'slug': 'parent',
'ou': OU_JSON,
'service': None,
},
{
'parent': {
'uuid': 'a' * 32,
'name': 'parent',
'slug': 'parent',
'ou': None,
'service': None,
},
'direct': True,
},
],
}
],
}
@pytest.mark.parametrize('role', ['parent'], indirect=True)
def test_list_parent(self, app, role_ref):
resp = app.get(f'/api/roles/{role_ref}/relationships/parents/')
assert len(resp.json['data']) == 0
@pytest.mark.parametrize('role', ['child'], indirect=True)
def test_list_child(self, app, role_ref):
resp = app.get(f'/api/roles/{role_ref}/relationships/parents/')
assert len(resp.json['data']) == 1
@pytest.mark.parametrize('role', ['child'], indirect=True)
def test_list_child_all(self, app, role_ref):
resp = app.get(f'/api/roles/{role_ref}/relationships/parents/?all')
assert len(resp.json['data']) == 2
def test_post(self, app, role_ref, role, parent):
role.remove_parent(parent)
payload = {
'parent': {
'slug': 'parent',
}
assert set(roles.parent.children(include_self=False, direct=True)) == {
roles.child,
roles.grandchild,
}
assert parent not in role.parents(direct=True)
assert app.post_json(f'/api/roles/{role_ref}/relationships/parents/', params=payload).json['data']
assert parent in role.parents(direct=True)
def test_delete(self, app, role_ref, role, parent):
payload = {
'parent': {
'slug': 'parent',
}
}
assert parent in role.parents(direct=True)
assert not app.delete_json(f'/api/roles/{role_ref}/relationships/parents/', params=payload).json[
'data'
]
assert parent not in role.parents(direct=True)
def test_delete(self, app, roles):
assert set(roles.parent.children(include_self=False, direct=True)) == {roles.child}
resp = app.delete_json(
'/api/roles/%s/relationships/parents/' % roles.grandchild.uuid,
params={'parent': {'uuid': roles.child.uuid}},
)
assert resp.json == {'err': 0, 'data': []}
assert not set(roles.grandchild.children(include_self=False, direct=True))
class TestUser(UserTestMixin):
def test_list(self, app, role_ref, role, user_role):
app.get(f'/api/roles/{role_ref}/relationships/parents/', status=404)
user_role.add_permission(role, 'view')
assert not app.get(f'/api/roles/{role_ref}/relationships/parents/').json['data']
user_role.add_permission(Role, 'view')
assert app.get(f'/api/roles/{role_ref}/relationships/parents/').json['data']
def test_delete_unflatten(self, app, roles):
assert set(roles.parent.children(include_self=False, direct=True)) == {roles.child}
resp = app.delete_json(
'/api/roles/%s/relationships/parents/' % roles.grandchild.uuid,
params={'parent/uuid': roles.child.uuid},
)
assert resp.json == {'err': 0, 'data': []}
assert not set(roles.grandchild.children(include_self=False, direct=True))
def test_post(self, app, role_ref, role, parent, user_role):
role.remove_parent(parent)
class TestPermission:
@pytest.fixture
def user(self, simple_user):
return simple_user
@pytest.fixture
def admin_role(self, user):
role = Role.objects.create(name='admin')
role.members.add(user)
return role
@pytest.fixture
def app(self, app, user):
app.authorization = ('Basic', (user.username, user.username))
return app
def test_list(self, app, admin_role, roles):
resp = app.get('/api/roles/%s/relationships/parents/' % roles.grandchild.uuid, status=403)
assert resp.json == {
'err': 1,
'err_class': 'permission_denied',
'err_desc': 'You do not have permission to perform this action.',
payload = {
'parent': {
'slug': 'parent',
}
}
assert parent not in role.parents(direct=True)
app.post_json(f'/api/roles/{role_ref}/relationships/parents/', params=payload, status=404)
assert parent not in role.parents(direct=True)
user_role.add_permission(role, 'view')
app.post_json(f'/api/roles/{role_ref}/relationships/parents/', params=payload, status=403)
assert parent not in role.parents(direct=True)
user_role.add_permission(parent, 'manage_members')
assert app.post_json(f'/api/roles/{role_ref}/relationships/parents/', params=payload).json['data']
assert parent in role.parents(direct=True)
admin_role.add_permission(roles.grandchild, 'view')
resp = app.get('/api/roles/%s/relationships/parents/?all' % roles.grandchild.uuid, status=200)
assert not resp.json['data']
admin_role.add_permission(roles.child, 'view')
resp = app.get('/api/roles/%s/relationships/parents/?all' % roles.grandchild.uuid, status=200)
assert len(resp.json['data']) == 1
admin_role.add_permission(roles.parent, 'view')
resp = app.get('/api/roles/%s/relationships/parents/?all' % roles.grandchild.uuid, status=200)
assert len(resp.json['data']) == 2
def test_delete(self, app, role_ref, role, parent, user_role):
payload = {
'parent': {
'slug': 'parent',
}
}
assert parent in role.parents(direct=True)
app.delete_json(f'/api/roles/{role_ref}/relationships/parents/', params=payload, status=404)
assert parent in role.parents(direct=True)
user_role.add_permission(role, 'view')
app.delete_json(f'/api/roles/{role_ref}/relationships/parents/', params=payload, status=403)
assert parent in role.parents(direct=True)
user_role.add_permission(parent, 'manage_members')
assert not app.delete_json(f'/api/roles/{role_ref}/relationships/parents/', params=payload).json[
'data'
]
assert parent not in role.parents(direct=True)
def test_create(self, app, admin_role, roles):
app.post_json(
'/api/roles/%s/relationships/parents/' % roles.grandchild.uuid,
params={'parent': {'uuid': roles.parent.uuid}},
status=403,
)
admin_role.add_permission(roles.grandchild, 'manage_members')
app.post_json(
'/api/roles/%s/relationships/parents/' % roles.grandchild.uuid,
params={'parent': {'uuid': roles.parent.uuid}},
status=403,
)
admin_role.add_permission(roles.parent, 'manage_members')
app.post_json(
'/api/roles/%s/relationships/parents/' % roles.grandchild.uuid,
params={'parent': {'uuid': roles.parent.uuid}},
status=200,
)
def test_delete(self, app, admin_role, roles):
app.delete_json(
'/api/roles/%s/relationships/parents/' % roles.grandchild.uuid,
params={'parent': {'uuid': roles.child.uuid}},
status=403,
)
admin_role.add_permission(roles.grandchild, 'manage_members')
app.delete_json(
'/api/roles/%s/relationships/parents/' % roles.grandchild.uuid,
params={'parent': {'uuid': roles.child.uuid}},
status=403,
)
admin_role.add_permission(roles.child, 'manage_members')
app.delete_json(
'/api/roles/%s/relationships/parents/' % roles.grandchild.uuid,
params={'parent': {'uuid': roles.child.uuid}},
status=200,
)
assert not set(roles.grandchild.parents(include_self=False, direct=True))
class TestRolesParentsAPI:
def test_unauthenticated(self, app, role_ref):
app.get(f'/api/roles/{role_ref}/parents/', status=401)
class TestAdmin(AdminTestMixin):
@pytest.mark.parametrize('role', ['child'], indirect=True)
def test_list(self, app, role_ref):
resp = app.get(f'/api/roles/{role_ref}/parents/')
doc = resp.json
assert doc == {
'err': 0,
'data': [{'name': 'Role', 'ou': 'default', 'slug': 'role', 'uuid': 'b' * 32}],
}
resp = app.get(f'/api/roles/{role_ref}/parents/?all')
doc = resp.json
assert doc == {
'err': 0,
'data': [
{'name': 'Parent', 'ou': 'default', 'slug': 'parent', 'uuid': 'a' * 32, 'direct': False},
{'name': 'Role', 'ou': 'default', 'slug': 'role', 'uuid': 'b' * 32, 'direct': True},
],
}
class TestUser(UserTestMixin):
def test_list(self, app, role_ref, role, parent, child, user_role):
app.get('/api/roles/child/parents/', status=404)
user_role.add_permission(child, 'view')
assert not app.get('/api/roles/child/parents/').json['data']
user_role.add_permission(role, 'search')
assert len(app.get('/api/roles/child/parents/').json['data']) == 1
assert len(app.get('/api/roles/child/parents/?all').json['data']) == 1

View File

@ -22,7 +22,6 @@ from unittest import mock
import django_webtest
import pytest
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django.core.management import call_command
from django.db import connection, transaction
@ -41,6 +40,7 @@ from authentic2_auth_oidc.utils import get_provider_by_issuer
from authentic2_idp_oidc.models import OIDCClient
from . import utils
from .utils import create_user
@pytest.fixture
@ -94,17 +94,6 @@ def ou_rando(db):
return OrganizationalUnit.objects.create(name='ou_rando', slug='ou_rando')
def create_user(**kwargs):
User = get_user_model()
password = kwargs.pop('password', None) or kwargs['username']
user, dummy = User.objects.get_or_create(**kwargs)
if password:
user.clear_password = password
user.set_password(password)
user.save()
return user
@pytest.fixture
def simple_user(db, ou1):
return create_user(

View File

@ -83,6 +83,17 @@ USER_ATTRIBUTES_SET = {
}
def create_user(**kwargs):
User = get_user_model()
password = kwargs.pop('password', None) or kwargs['username']
user, dummy = User.objects.get_or_create(**kwargs)
if password:
user.clear_password = password
user.set_password(password)
user.save()
return user
def login(
app,
user,