idp_oidc: restrict client's returned qs to authorized users (#65942)
gitea/authentic/pipeline/head This commit looks good Details

This commit is contained in:
Paul Marillonnet 2022-06-02 17:28:32 +02:00
parent bfa653ba78
commit 2bf67721b7
4 changed files with 137 additions and 11 deletions

View File

@ -143,17 +143,37 @@ class AppConfig(django.apps.AppConfig):
def a2_hook_api_modify_queryset(self, view, qs):
from django.contrib.auth import get_user_model
from django.db.models import Q
from django.utils.timezone import now
from .models import OIDCAuthorization, OIDCClient
client = self.get_oidc_client(view)
User = get_user_model()
# fast path
if (
not issubclass(qs.model, get_user_model())
or client is None
or not client.authorized_roles.exists()
):
if not issubclass(qs.model, User) or client is None:
return qs
qs = qs.filter(roles__in=client.authorized_roles.children())
qs = qs.distinct()
# reduce qs to users having previously granted an authorization to the client
if (
client
and client.authorization_mode != OIDCClient.AUTHORIZATION_MODE_NONE
and view.request.method == 'GET'
and issubclass(qs.model, User)
):
authorizations = OIDCAuthorization.objects.none()
if client.authorization_mode == OIDCClient.AUTHORIZATION_MODE_BY_SERVICE:
authorizations = client.authorizations
elif client.ou and hasattr(client.ou, 'oidc_authorizations'):
authorizations = client.ou.oidc_authorizations
qs = qs.filter(
id__in=authorizations.filter(Q(expired__isnull=True) | Q(expired__gt=now())).values_list(
'user__id', flat=True
)
)
if client.authorized_roles.exists():
qs = qs.filter(roles__in=client.authorized_roles.children())
qs = qs.distinct()
return qs

View File

@ -30,6 +30,7 @@ from django.core import mail
from django.urls import reverse
from django.utils.encoding import force_str
from django.utils.text import slugify
from django.utils.timezone import now
from requests.models import Response
from authentic2.a2_rbac.models import SEARCH_OP
@ -471,6 +472,8 @@ def test_api_member_users_list_search_text(app, superuser, simple_role):
def test_api_users_create(settings, app, api_user):
from authentic2_idp_oidc.models import OIDCAuthorization, OIDCClient
at = Attribute.objects.create(kind='title', name='title', label='title')
app.authorization = ('Basic', (api_user.username, api_user.username))
# test missing first_name
@ -538,6 +541,22 @@ def test_api_users_create(settings, app, api_user):
assert AttributeValue.objects.with_owner(new_user).filter(verified=True).count() == 0
assert AttributeValue.objects.with_owner(new_user).filter(attribute=at).exists()
assert AttributeValue.objects.with_owner(new_user).get(attribute=at).content == payload['title']
if (
hasattr(api_user, 'oidc_client')
and api_user.oidc_client.authorization_mode != OIDCClient.AUTHORIZATION_MODE_NONE
):
if api_user.oidc_client.authorization_mode == OIDCClient.AUTHORIZATION_MODE_BY_SERVICE:
client_id = api_user.oidc_client.id
client_ct = ContentType.objects.get_for_model(OIDCClient)
else:
client_id = api_user.oidc_client.ou.id
client_ct = ContentType.objects.get_for_model(OU)
OIDCAuthorization.objects.create(
user=new_user,
client_id=client_id,
client_ct=client_ct,
expired=now() + datetime.timedelta(hours=1),
)
resp2 = app.get('/api/users/%s/' % resp.json['uuid'])
assert resp.json == resp2.json
payload.update({'uuid': '1234567890', 'email': 'foo@example.com', 'username': 'foobar'})
@ -546,6 +565,13 @@ def test_api_users_create(settings, app, api_user):
assert 'title' in resp.json
at.disabled = True
at.save()
if (
hasattr(api_user, 'oidc_client')
and api_user.oidc_client.authorization_mode != OIDCClient.AUTHORIZATION_MODE_NONE
):
authz = OIDCAuthorization.objects.get(user=new_user)
authz.user = User.objects.get(uuid='1234567890')
authz.save()
resp = app.get('/api/users/1234567890/')
assert 'title' not in resp.json
@ -1173,13 +1199,14 @@ def test_user_synchronization(app, simple_user):
def test_user_synchronization_modification_profile(app):
from authentic2_idp_oidc.models import OIDCClient
from authentic2_idp_oidc.models import OIDCAuthorization, OIDCClient
from authentic2_idp_oidc.utils import make_sub
uuids = []
users = []
precreate_dt = datetime.datetime.now()
profile_type = ProfileType.objects.create(name='Referee', slug='referee')
client_ct = ContentType.objects.get_for_model(OIDCClient)
oidc_client = OIDCClient.objects.create(
name='Synchronized client',
slug='synchronized-client',
@ -1208,6 +1235,14 @@ def test_user_synchronization_modification_profile(app):
profile,
)
)
if i % 4:
OIDCAuthorization.objects.create(
client_ct=client_ct,
client_id=oidc_client.id,
user=user,
scopes='openid email profile',
expired=datetime.datetime.now() + datetime.timedelta(hours=5),
)
url = reverse('a2-api-users-synchronization')
@ -1232,7 +1267,7 @@ def test_user_synchronization_modification_profile(app):
response = app.get(
'/api/users/?modified__gt=%s' % precreate_dt.strftime('%Y-%m-%dT%H:%M:%S'), headers=headers
)
assert len(response.json['results']) == 100
assert len(response.json['results']) == 75
def test_user_synchronization_full(app, admin):
@ -1270,6 +1305,8 @@ def test_user_synchronization_full(app, admin):
def test_api_drf_authentication_class(app, admin, user_ou1, oidc_client):
from authentic2_idp_oidc.models import OIDCAuthorization, OIDCClient
url = '/api/users/%s/' % user_ou1.uuid
# test invalid client
app.authorization = ('Basic', ('foo', 'bar'))
@ -1283,8 +1320,16 @@ def test_api_drf_authentication_class(app, admin, user_ou1, oidc_client):
resp = app.get(url, status=401)
assert resp.json['result'] == 0
assert resp.json['errors'] == "User inactive or deleted."
# test oidc client
# test oidc client unauthorized for user_ou1
app.authorization = ('Basic', (oidc_client.username, oidc_client.username))
app.get(url, status=404)
OIDCAuthorization.objects.create(
client_id=oidc_client.id,
client_ct=ContentType.objects.get_for_model(OIDCClient),
user=user_ou1,
expired=now() + datetime.timedelta(hours=1),
)
# test oidc client
app.get(url, status=200)
# test oidc client without has API access
oidc_client.oidc_client.has_api_access = False

View File

@ -291,6 +291,10 @@ def oidc_client(db, ou1):
def username(self):
return self.oidc_client.client_id
@property
def id(self):
return self.oidc_client.id
@property
def is_superuser(self):
return False

View File

@ -14,8 +14,16 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import random
from datetime import timedelta
from django.contrib.contenttypes.models import ContentType
from django.utils.timezone import now
from authentic2.a2_rbac.models import OrganizationalUnit as OU
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.custom_user.models import User
from authentic2_idp_oidc.models import OIDCClient
from authentic2_idp_oidc.models import OIDCAuthorization, OIDCClient
from authentic2_idp_oidc.utils import make_sub
@ -39,3 +47,52 @@ def test_api_synchronization(app, oidc_client):
if status == 200:
assert response.json['result'] == 1
assert set(response.json['unknown_uuids']) == deleted_subs
def test_api_users_list_queryset_reduction(app, oidc_client):
oidc_client.has_api_access = True
oidc_client.identifier_policy = OIDCClient.POLICY_PAIRWISE_REVERSIBLE
oidc_client.save()
pre_modification = now().strftime('%Y-%m-%dT%H:%M:%S')
users = [User.objects.create(username=f'user-{i}', last_name=f'Name-{i}') for i in range(20)]
expired = now() + timedelta(hours=1)
for user in random.sample(users, k=5):
OIDCAuthorization.objects.create(
client_id=oidc_client.id,
client_ct=ContentType.objects.get_for_model(OIDCClient),
user=user,
expired=expired,
)
app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret))
url = f'/api/users/?modified__gt={pre_modification}&claim_resolution'
response = app.get(url, status=200)
assert len(response.json['results']) == 5
oidc_client.authorization_mode = OIDCClient.AUTHORIZATION_MODE_NONE
oidc_client.save()
response = app.get(url, status=200)
assert len(response.json['results']) == User.objects.count()
oidc_client.authorization_mode = OIDCClient.AUTHORIZATION_MODE_BY_OU
oidc_client.ou = get_default_ou()
oidc_client.save()
response = app.get(url, status=200)
assert not response.json['results']
for user in random.sample(users, k=8):
OIDCAuthorization.objects.create(
client_id=get_default_ou().id,
client_ct=ContentType.objects.get_for_model(OU),
user=user,
expired=expired,
)
response = app.get(url, status=200)
assert len(response.json['results']) == 8