api: add role summary view (#79620)

This commit is contained in:
Emmanuel Cazenave 2023-07-19 14:35:27 +02:00
parent 16af375476
commit 46522676c0
5 changed files with 293 additions and 2 deletions

View File

@ -57,6 +57,9 @@ urlpatterns = [
api_views.roles_parents_relationships,
name='a2-api-role-parents-relationships',
),
re_path(
'^roles/(?P<role_uuid>[0-9a-z]{32})/summary/$', api_views.role_summary, name='a2-api-role-summary'
),
path('check-password/', api_views.check_password, name='a2-api-check-password'),
path('check-api-client/', api_views.check_api_client, name='a2-api-check-api-client'),
path('validate-password/', api_views.validate_password, name='a2-api-validate-password'),

View File

@ -15,8 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import json
import logging
import smtplib
import urllib
from functools import partial
import requests
@ -56,6 +58,7 @@ from rest_framework.viewsets import ModelViewSet, ViewSet
from authentic2.apps.journal.journal import journal
from authentic2.apps.journal.models import reference_integer
from authentic2.compat.drf import action
from authentic2.utils.rest_framework import APIError, APIErrorBadRequest
from authentic2.utils.text import slugify_keep_underscore
from . import api_mixins, app_settings, decorators
@ -71,6 +74,11 @@ from .utils import misc as utils_misc
from .utils.api import DjangoRBACPermission, NaturalKeyRelatedField
from .utils.lookups import Unaccent
try:
from hobo.requests_wrapper import Requests
except ImportError: # fallback on python requests, no Publik signature
from requests.sessions import Session as Requests # # pylint: disable=ungrouped-imports
User = get_user_model()
@ -1860,3 +1868,103 @@ class StatisticsAPI(ViewSet):
router.register(r'statistics', StatisticsAPI, basename='a2-api-statistics')
class ServiceSerializer(serializers.Serializer):
service = serializers.SlugRelatedField(
queryset=Service.objects.all(),
slug_field='slug',
required=True,
allow_null=True,
)
def exception_handler(func):
def f(*args, **kwargs):
try:
return func(*args, **kwargs)
except APIError as exc:
return exc.to_response()
return f
class RoleSummaryAPI(PublikMixin, GenericAPIView):
permission_classes = [
DjangoRBACPermission(
perms_map={
'GET': [],
},
object_perms_map={
'GET': ['a2_rbac.view_role'],
},
)
]
queryset = Role.objects.all()
def _requests(self, url, service_data):
try:
resp = Requests().get(url=url, timeout=settings.REQUESTS_TIMEOUT)
resp.raise_for_status()
except requests.RequestException as e:
raise APIError(
'Service (%s) error' % service_data['title'], errors=str(e), err_class='service request error'
)
try:
json_data = resp.json()
except (json.JSONDecodeError, requests.exceptions.RequestException):
raise APIError(
'Service (%s) responded with no json' % service_data['title'], err_class='json decode error'
)
if json_data.get('err', 0) != 0:
raise APIError('Invalid service (%s) response' % service_data['title'], 'application error')
return json_data
def _summary(self, service_data, slugs):
url = urllib.parse.urljoin(service_data['url'], 'api/export-import/')
data = []
for type_object in self._requests(url, service_data).get('data', []):
hit = []
for object in self._requests(type_object['urls']['list'], service_data).get('data', []):
for dep in self._requests(object['urls']['dependencies'], service_data).get('data', []):
if dep['type'] == 'roles' and dep['id'] in slugs:
hit.append(object)
break
if hit:
type_object['hit'] = hit
data.append(type_object)
return data
@exception_handler
def get(self, request, role_uuid, **kwargs):
role = get_object_or_404(Role, uuid=role_uuid)
self.check_object_permissions(self.request, role)
role_qs = self.queryset.filter(pk=role.pk)
if 'parents' in self.request.GET:
role_qs = role_qs.parents(include_self=False, annotate=False, direct=None)
slugs = [r.slug for r in role_qs]
serializer = ServiceSerializer(data=request.query_params)
if not serializer.is_valid():
raise APIErrorBadRequest(_('Unkown service'))
slug = serializer.validated_data.get('service').slug
try:
if not settings.KNOWN_SERVICES:
raise AttributeError()
except AttributeError:
raise APIError('Missing KNOWN_SERVICES setting')
service_data = None
for services in settings.KNOWN_SERVICES.values():
for service_slug, d in services.items():
if service_slug == slug:
service_data = d
break
if service_data:
break
else:
raise APIError('Service (%s) adress not found' % slug)
return Response({'err': 0, 'data': self._summary(service_data, slugs)})
role_summary = RoleSummaryAPI.as_view()

View File

@ -0,0 +1,178 @@
import pytest
import responses
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse
from authentic2.a2_rbac.models import VIEW_OP, Permission, Role
from authentic2.a2_rbac.utils import get_default_ou, get_operation
from authentic2.models import Service
@pytest.fixture
def service(db):
return Service.objects.create(name='wahetever', slug='eservices', ou=get_default_ou())
def test_role_summary_access(app, simple_role, simple_user):
url = '%s?service=eservices' % reverse('a2-api-role-summary', kwargs={'role_uuid': simple_role.uuid})
app.get(url, status=401)
app.authorization = ('Basic', (simple_user.username, simple_user.username))
app.get(url, status=403)
view_role_perm = Permission.objects.create(
operation=get_operation(VIEW_OP),
target_ct=ContentType.objects.get_for_model(Role),
target_id=simple_role.pk,
)
simple_role.permissions.add(view_role_perm)
simple_user.roles.add(simple_role)
Service.objects.create(name='wahetever', slug='eservices', ou=get_default_ou())
app.get(url)
def test_role_summary_unkown_service(app, simple_role, superuser):
url = '%s?service=unkown' % reverse('a2-api-role-summary', kwargs={'role_uuid': simple_role.uuid})
app.authorization = ('Basic', (superuser.username, superuser.username))
resp = app.get(url, status=400)
assert resp.json == {'err': 1, 'err_class': 'Unkown service', 'err_desc': 'Unkown service'}
def test_role_summary_unkown_role(app, superuser):
url = '%s?service=unkown' % reverse('a2-api-role-summary', kwargs={'role_uuid': 'x' * 32})
app.authorization = ('Basic', (superuser.username, superuser.username))
resp = app.get(url, status=404)
assert resp.json == {'err': 1, 'err_class': 'not_found', 'err_desc': 'Not found.'}
def test_role_summary_empty(app, simple_role, superuser):
url = '%s?service=eservices' % reverse('a2-api-role-summary', kwargs={'role_uuid': simple_role.uuid})
app.authorization = ('Basic', (superuser.username, superuser.username))
Service.objects.create(name='forms', slug='eservices')
with responses.RequestsMock() as rsps:
rsps.get('http://example.org/api/export-import/', json={'data': []})
response = app.get(url)
assert response.json == {'data': [], 'err': 0}
def test_role_summary_match(app, simple_role, superuser):
url = '%s?service=eservices' % reverse('a2-api-role-summary', kwargs={'role_uuid': simple_role.uuid})
app.authorization = ('Basic', (superuser.username, superuser.username))
Service.objects.create(name='forms', slug='eservices')
with responses.RequestsMock() as rsps:
rsps.get(
'http://example.org/api/export-import/',
json={
'data': [
{
'id': 'forms',
'text': 'Formulaires',
'singular': 'Formulaire',
'urls': {'list': 'http://example.org/api/export-import/forms/'},
}
]
},
)
rsps.get(
'http://example.org/api/export-import/forms/',
json={
'data': [
{
'id': 'foo',
'text': 'Foo',
'type': 'forms',
'urls': {
'dependencies': 'http://example.org/api/export-import/forms/foo/dependencies/',
'redirect': 'http://example.org/api/export-import/forms/foo/redirect/',
},
}
]
},
)
rsps.get(
'http://example.org/api/export-import/forms/foo/dependencies/',
json={
'data': [
{
'id': 'simple-role',
'text': 'simple role',
'type': 'roles',
}
]
},
)
response = app.get(url)
assert response.json == {
'data': [
{
'id': 'forms',
'text': 'Formulaires',
'singular': 'Formulaire',
'urls': {'list': 'http://example.org/api/export-import/forms/'},
'hit': [
{
'id': 'foo',
'text': 'Foo',
'type': 'forms',
'urls': {
'dependencies': 'http://example.org/api/export-import/forms/foo/dependencies/',
'redirect': 'http://example.org/api/export-import/forms/foo/redirect/',
},
}
],
}
],
'err': 0,
}
def test_role_summary_not_role_matching(app, simple_role, superuser):
url = '%s?service=eservices' % reverse('a2-api-role-summary', kwargs={'role_uuid': simple_role.uuid})
app.authorization = ('Basic', (superuser.username, superuser.username))
Service.objects.create(name='forms', slug='eservices')
with responses.RequestsMock() as rsps:
rsps.get(
'http://example.org/api/export-import/',
json={
'data': [
{
'id': 'forms',
'text': 'Formulaires',
'singular': 'Formulaire',
'urls': {'list': 'http://example.org/api/export-import/forms/'},
}
]
},
)
rsps.get(
'http://example.org/api/export-import/forms/',
json={
'data': [
{
'id': 'foo',
'text': 'Foo',
'type': 'forms',
'urls': {
'dependencies': 'http://example.org/api/export-import/forms/foo/dependencies/',
'redirect': 'http://example.org/api/export-import/forms/foo/redirect/',
},
}
]
},
)
rsps.get(
'http://example.org/api/export-import/forms/foo/dependencies/',
json={
'data': [
{
'id': 'not-simple-role',
'text': 'not simple role',
'type': 'roles',
}
]
},
)
response = app.get(url)
assert response.json == {
'data': [],
'err': 0,
}

View File

@ -48,7 +48,7 @@ ALLOWED_HOSTS = ALLOWED_HOSTS + [ # pylint: disable=used-before-assignment
KNOWN_SERVICES = {
'wcs': {
'default': {
'eservices': {
'title': 'test',
'url': 'http://example.org',
'secret': 'chrono',
@ -57,7 +57,7 @@ KNOWN_SERVICES = {
}
},
'passerelle': {
'default': {
'passerelle': {
'title': 'test',
'url': 'https://foo.whatever.none',
'secret': 'passerelle',

View File

@ -74,6 +74,7 @@ deps =
ldaptools>=0.24
numpy
django-filter
responses
stable: djangorestframework>=3.14,<3.15
oldstable: djangorestframework>=3.12,<3.13
stable: jwcrypto>=1.1,<1.3
@ -120,6 +121,7 @@ deps =
uwsgidecorators
pyquery
numpy
responses
allowlist_externals =
./getlasso3.sh
./pylint.sh