api: filter users based on OIDC client authorized roles (#35191)

This commit is contained in:
Benjamin Dauvergne 2019-09-10 10:47:53 +02:00
parent a61d2bb55d
commit d9918e3467
2 changed files with 52 additions and 5 deletions

View File

@ -37,15 +37,19 @@ class AppConfig(django.apps.AppConfig):
serializer.fields['uuid'] = serializers.SerializerMethodField(
method_name='get_oidc_uuid')
@classmethod
def get_oidc_client(cls, view):
request = view.request
if not hasattr(request.user, 'oidc_client'):
return None
return request.user.oidc_client
def a2_hook_api_modify_view_before_get_object(self, view):
'''Decrypt sub used as pk argument in URL.'''
import uuid
from . import utils
request = view.request
if not hasattr(request.user, 'oidc_client'):
return
client = request.user.oidc_client
client = self.get_oidc_client(view)
if client.identifier_policy != client.POLICY_PAIRWISE_REVERSIBLE:
return
lookup_url_kwarg = view.lookup_url_kwarg or view.lookup_field
@ -106,3 +110,18 @@ class AppConfig(django.apps.AppConfig):
new_unknown_uuids.append(uuid_map[u])
new_unknown_uuids.extend(request.unknown_uuids)
data['unknown_uuids'] = new_unknown_uuids
def a2_hook_api_modify_queryset(self, view, qs):
from django.contrib.auth import get_user_model
client = self.get_oidc_client(view)
# fast path
if (not issubclass(qs.model, get_user_model())
or client is None
or not client.authorized_roles.exists()):
return qs
qs = qs.filter(roles__in=client.authorized_roles.children())
qs = qs.distinct()
return qs

View File

@ -36,7 +36,7 @@ from django.utils.six.moves.urllib import parse as urlparse
User = get_user_model()
from authentic2.models import Attribute
from authentic2.models import Attribute, AuthorizedRole
from authentic2_idp_oidc.models import OIDCClient, OIDCAuthorization, OIDCCode, OIDCAccessToken, OIDCClaim
from authentic2_idp_oidc.utils import make_sub
from authentic2.a2_rbac.utils import get_default_ou
@ -1133,3 +1133,31 @@ http://example5.com/toto*
assert not client.is_valid_redirect_uri('http://example5.com/tototata/')
assert not client.is_valid_redirect_uri('http://example5.com/tototata')
def test_filter_api_users(app, oidc_client, admin, simple_user, role_random):
oidc_client.has_api_access = True
oidc_client.save()
if (oidc_client.identifier_policy not in
(oidc_client.POLICY_UUID, oidc_client.POLICY_PAIRWISE_REVERSIBLE)):
return
app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret))
response = app.get('/api/users/')
count = len(response.json['results'])
assert count > 0
AuthorizedRole.objects.create(service=oidc_client, role=role_random)
response = app.get('/api/users/')
assert len(response.json['results']) == 0
role_random.members.add(simple_user)
response = app.get('/api/users/')
assert len(response.json['results']) == 1
AuthorizedRole.objects.all().delete()
response = app.get('/api/users/')
assert len(response.json['results']) == count