diff --git a/tests/idp_oidc/__init__.py b/tests/idp_oidc/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/idp_oidc/conftest.py b/tests/idp_oidc/conftest.py
new file mode 100644
index 000000000..2aa431c43
--- /dev/null
+++ b/tests/idp_oidc/conftest.py
@@ -0,0 +1,148 @@
+# authentic2 - versatile identity manager
+# Copyright (C) 2010-2021 Entr'ouvert
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import base64
+from importlib import import_module
+
+import pytest
+from django.urls import reverse
+
+from authentic2.a2_rbac.utils import get_default_ou
+from authentic2.models import Attribute
+from authentic2_idp_oidc import app_settings
+from authentic2_idp_oidc.models import OIDCClaim, OIDCClient
+from tests import utils
+
+JWKSET = {
+ "keys": [
+ {
+ "qi": "h_zifVD-ChelxZUVxhICNcgGkQz26b-EdIlLY9rN7SX_aD3sLI_JHEHV4Bz3kV5eW8O4qJ8SHhfUdHGK-gRH7FVOGoXnXACf47QoXowHzsPLL64wCuZENTl7hIRGLY-BInULkfTQfuiVSMoxPjsVNTMBzMiz0bNjMQyMyvW5xH4",
+ "kty": "RSA",
+ "d": "pUcL4-LDBy3rqJWip269h5Hd6nLvqjXltfkVe_mL-LwZPHmCrUaj_SX54SnCY3Wyf7kxhoMYUac62lQ71923uJPFFdiavAujbNrtZPq32i4C-1apWXW8OGJr8VoVDqalxj9SAq1G54wbbsaAPrZdyuqy-esNxDqDigfbM-cWgngBBYo5CSsfnmnd05N2cUS26L7QzWbNHwilnBTE9e_J7rK3xUCDKrobv6_LiI-AhMmBHJSrCxjexh0wzfBi_Ntj9BGCcPThDjG8SQvaV-aLNdLfIy2XO3i076RLBB6Hm_yHuAparrwp-pPE48eQdiYjrSAFalz4ojWQ3_ByLA6uAQ",
+ "q": "2FvfeWnIlWNUipan7DIBlJrmz5EinJNxrQ-BNwPHrAoIM8qvyC7jPy09YxZs5Y9CMMZSal6C4Nm2LHBFxHU9z1qd5XDzbk19G-y1lDqZizVXr876TpiAjuq03rcoMQm8dQru_pVjUdgxR64vKyJ9CaFMAqcpZeEMIqAvzhQG8uE",
+ "dp": "Kg4HPGpzenhK2ser6nfM1Yt-pkqBbWQotvqsxGptECXpbN7vweupvL5kJPeRrbsXKp9QE7DXTN1sG9puJxMSwtgiv4hr9Va9e9WOC6PMd2VY7tgw5uKMpPLMc5y82PusRhBoRh0SUUsjyQxK9PGtWYnGZXbAoaIYPdMyDlosfqU",
+ "dq": "QuUNEHYTjZTbo8n2-4FumarXKGBAalbwM8jyc7cYemnTpWfKt8M_gd4T99oMK2IC3h_DhZ3ZK3pE6DKCb76sMLtczH8C1RziTMsATWdc5_zDMtl07O4b-ZQ5_g51P8w515pc0JwRzFFi0z3Y2aZdMKgNX1id5SES5nXOshHhICE",
+ "n": "0lN6CiJGFD8BSPV_azLoEl6Nq-WlHkU743D5rqvzw1sOaxstMGxAhVk2YIhWwfvapV6XjO_yvc4778VBTELOdjRw6BGUdBJepdwkL__TPyjEVhqMQj9MKhEU4GUy9w0Lsilb5D01kfrOKpmdcYw4jhcDvb0H4-LZgh1Vk84vF4WaQCUg_AX4drVDQOjoU8kuWIM8gz9w6zEsbIw-gtMRpFwS8ncA0zDX5VfyC77iMxzFftDIP2gM5GvdevMzvP9IRkRRBhP9vV4JchBFPHSA9OPJcnySjJJNW6aAJn6P6JasN1z68khjufM09J8UzmLAZYOq7gUG95Ox1KsV-g337Q",
+ "e": "AQAB",
+ "p": "-Nyj_Sw3f2HUqSssCZv84y7b3blOtGGAhfYN_JtGfcTQv2bOtxrIUzeonCi-Z_1W4hO10tqxJcOB0ibtDqkDlLhnLaIYOBfriITRFK83EJG5sC-0KTmFzUXFTA2aMc1QgP-Fu6gUfQpPqLgWxhx8EFhkBlBZshKU5-C-385Sco0",
+ "kid": "46c686ea-7d4e-41cd-a462-2125fc1dee0e",
+ },
+ {
+ "kty": "EC",
+ "d": "wwULaR9UYWZW6U2oEbkz3sO1lhPSj6DyA6e7PiUfhog",
+ "use": "sig",
+ "crv": "P-256",
+ "x": "HZMHZkX-63heqA5pvWn-UR7bgcXZNEcQa5wfvG_BzTw",
+ "y": "SUCuwjjiyKvGq5Odr0sjDqjha_CBqks0JQFrR7Ei5OQ",
+ "alg": "ES256",
+ "kid": "ac85baf4-835b-49b2-8272-ffecce7654c9",
+ },
+ ]
+}
+
+
+@pytest.fixture
+def jwkset():
+ return JWKSET
+
+
+@pytest.fixture
+def oidc_settings(settings, jwkset):
+ settings.A2_IDP_OIDC_JWKSET = jwkset
+ settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT = '100/m'
+ return settings
+
+
+def make_client(app, superuser, params=None):
+ Attribute.objects.create(
+ name='cityscape_image',
+ label='cityscape',
+ kind='profile_image',
+ asked_on_registration=True,
+ required=False,
+ user_visible=True,
+ user_editable=True,
+ )
+
+ client = OIDCClient(
+ name='oidcclient',
+ slug='oidcclient',
+ ou=get_default_ou(),
+ unauthorized_url='https://example.com/southpark/',
+ redirect_uris='https://example.com/callbac%C3%A9',
+ )
+
+ for key, value in (params or {}).items():
+ setattr(client, key, value)
+ client.save()
+ for mapping in app_settings.DEFAULT_MAPPINGS:
+ OIDCClaim.objects.create(
+ client=client, name=mapping['name'], value=mapping['value'], scopes=mapping['scopes']
+ )
+ return client
+
+
+@pytest.fixture
+def client(app, superuser):
+ return make_client(app, superuser, {})
+
+
+@pytest.fixture
+def simple_oidc_client(db):
+ return OIDCClient.objects.create(
+ name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/'
+ )
+
+
+@pytest.fixture
+def oidc_client(request, superuser, app, simple_user, oidc_settings):
+ return make_client(app, superuser, getattr(request, 'param', None) or {})
+
+
+@pytest.fixture
+def normal_oidc_client(superuser, app, simple_user):
+ url = reverse('admin:authentic2_idp_oidc_oidcclient_add')
+ assert OIDCClient.objects.count() == 0
+ response = utils.login(app, superuser, path=url)
+ response.form.set('name', 'oidcclient')
+ response.form.set('slug', 'oidcclient')
+ response.form.set('ou', get_default_ou().pk)
+ response.form.set('unauthorized_url', 'https://example.com/southpark/')
+ response.form.set('redirect_uris', 'https://example.com/callbac%C3%A9')
+ response = response.form.submit(name='_save').follow()
+ assert OIDCClient.objects.count() == 1
+ client = OIDCClient.objects.get()
+ utils.logout(app)
+ return client
+
+
+@pytest.fixture
+def session(settings, db, simple_user):
+ engine = import_module(settings.SESSION_ENGINE)
+ session = engine.SessionStore()
+ session['_auth_user_id'] = str(simple_user.id)
+ session.create()
+ return session
+
+
+def client_authentication_headers(oidc_client):
+ client_creds = '%s:%s' % (oidc_client.client_id, oidc_client.client_secret)
+ token = base64.b64encode(client_creds.encode('ascii'))
+ return {'Authorization': 'Basic %s' % str(token.decode('ascii'))}
+
+
+def bearer_authentication_headers(access_token):
+ return {'Authorization': 'Bearer %s' % str(access_token)}
diff --git a/tests/idp_oidc/test_api.py b/tests/idp_oidc/test_api.py
new file mode 100644
index 000000000..ad36aeadf
--- /dev/null
+++ b/tests/idp_oidc/test_api.py
@@ -0,0 +1,41 @@
+# authentic2 - versatile identity manager
+# Copyright (C) 2010-2021 Entr'ouvert
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+from authentic2.custom_user.models import User
+from authentic2_idp_oidc.models import OIDCClient
+from authentic2_idp_oidc.utils import make_sub
+
+
+def test_api_synchronization(app, oidc_client):
+ oidc_client.has_api_access = True
+ oidc_client.save()
+ users = [User.objects.create(username='user-%s' % i) for i in range(10)]
+ for user in users[5:]:
+ user.delete()
+ deleted_subs = set(make_sub(oidc_client, user) for user in users[5:])
+
+ app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret))
+ status = 200
+ if oidc_client.identifier_policy not in (OIDCClient.POLICY_PAIRWISE_REVERSIBLE, OIDCClient.POLICY_UUID):
+ status = 401
+ response = app.post_json(
+ '/api/users/synchronization/',
+ params={'known_uuids': [make_sub(oidc_client, user) for user in users]},
+ status=status,
+ )
+ if status == 200:
+ assert response.json['result'] == 1
+ assert set(response.json['unknown_uuids']) == deleted_subs
diff --git a/tests/idp_oidc/test_migrations.py b/tests/idp_oidc/test_migrations.py
new file mode 100644
index 000000000..e8c25d984
--- /dev/null
+++ b/tests/idp_oidc/test_migrations.py
@@ -0,0 +1,125 @@
+# authentic2 - versatile identity manager
+# Copyright (C) 2010-2021 Entr'ouvert
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+
+def test_oidclient_claims_data_migration(migration):
+ app = 'authentic2_idp_oidc'
+ migrate_from = [(app, '0009_auto_20180313_1156')]
+ migrate_to = [(app, '0010_oidcclaim')]
+
+ old_apps = migration.before(migrate_from)
+ OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
+
+ client = OIDCClient(name='test', slug='test', redirect_uris='https://example.net/')
+ client.save()
+
+ new_apps = migration.apply(migrate_to)
+ OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
+ OIDCClaim = new_apps.get_model('authentic2_idp_oidc', 'OIDCClaim')
+
+ client = OIDCClient.objects.first()
+ assert OIDCClaim.objects.filter(client=client.id).count() == 5
+
+
+def test_oidclient_preferred_username_as_identifier_data_migration(migration):
+ app = 'authentic2_idp_oidc'
+ migrate_from = [(app, '0010_oidcclaim')]
+ migrate_to = [(app, '0011_auto_20180808_1546')]
+
+ old_apps = migration.before(migrate_from)
+ OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
+ OIDCClaim = old_apps.get_model('authentic2_idp_oidc', 'OIDCClaim')
+
+ client1 = OIDCClient.objects.create(name='test', slug='test', redirect_uris='https://example.net/')
+ client2 = OIDCClient.objects.create(name='test1', slug='test1', redirect_uris='https://example.net/')
+ client3 = OIDCClient.objects.create(name='test2', slug='test2', redirect_uris='https://example.net/')
+ client4 = OIDCClient.objects.create(name='test3', slug='test3', redirect_uris='https://example.net/')
+ for client in (client1, client2, client3, client4):
+ if client.name == 'test1':
+ continue
+ if client.name == 'test3':
+ OIDCClaim.objects.create(
+ client=client, name='preferred_username', value='django_user_full_name', scopes='profile'
+ )
+ else:
+ OIDCClaim.objects.create(
+ client=client, name='preferred_username', value='django_user_username', scopes='profile'
+ )
+ OIDCClaim.objects.create(
+ client=client, name='given_name', value='django_user_first_name', scopes='profile'
+ )
+ OIDCClaim.objects.create(
+ client=client, name='family_name', value='django_user_last_name', scopes='profile'
+ )
+ if client.name == 'test2':
+ continue
+ OIDCClaim.objects.create(client=client, name='email', value='django_user_email', scopes='email')
+ OIDCClaim.objects.create(
+ client=client, name='email_verified', value='django_user_email_verified', scopes='email'
+ )
+
+ new_apps = migration.apply(migrate_to)
+ OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
+
+ client = OIDCClient.objects.first()
+ for client in OIDCClient.objects.all():
+ claims = client.oidcclaim_set.all()
+ if client.name == 'test':
+ assert claims.count() == 5
+ assert sorted(claims.values_list('name', flat=True)) == [
+ 'email',
+ 'email_verified',
+ 'family_name',
+ 'given_name',
+ 'preferred_username',
+ ]
+ assert sorted(claims.values_list('value', flat=True)) == [
+ 'django_user_email',
+ 'django_user_email_verified',
+ 'django_user_first_name',
+ 'django_user_identifier',
+ 'django_user_last_name',
+ ]
+ elif client.name == 'test2':
+ assert claims.count() == 3
+ assert sorted(claims.values_list('name', flat=True)) == [
+ 'family_name',
+ 'given_name',
+ 'preferred_username',
+ ]
+ assert sorted(claims.values_list('value', flat=True)) == [
+ 'django_user_first_name',
+ 'django_user_last_name',
+ 'django_user_username',
+ ]
+ elif client.name == 'test3':
+ assert claims.count() == 5
+ assert sorted(claims.values_list('name', flat=True)) == [
+ 'email',
+ 'email_verified',
+ 'family_name',
+ 'given_name',
+ 'preferred_username',
+ ]
+ assert sorted(claims.values_list('value', flat=True)) == [
+ 'django_user_email',
+ 'django_user_email_verified',
+ 'django_user_first_name',
+ 'django_user_full_name',
+ 'django_user_last_name',
+ ]
+ else:
+ assert claims.count() == 0
diff --git a/tests/test_idp_oidc.py b/tests/idp_oidc/test_misc.py
similarity index 79%
rename from tests/test_idp_oidc.py
rename to tests/idp_oidc/test_misc.py
index 04745073f..b75a570ff 100644
--- a/tests/test_idp_oidc.py
+++ b/tests/idp_oidc/test_misc.py
@@ -1,5 +1,5 @@
# authentic2 - versatile identity manager
-# Copyright (C) 2010-2019 Entr'ouvert
+# Copyright (C) 2010-2021 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
@@ -19,7 +19,6 @@ import datetime
import functools
import json
import urllib.parse
-from importlib import import_module
import pytest
from django.contrib.auth import get_user_model
@@ -37,51 +36,17 @@ from authentic2.a2_rbac.utils import get_default_ou
from authentic2.models import Attribute, AuthorizedRole
from authentic2.utils import good_next_url, make_url
from authentic2_auth_oidc.utils import parse_timestamp
-from authentic2_idp_oidc import app_settings
from authentic2_idp_oidc.models import OIDCAccessToken, OIDCAuthorization, OIDCClaim, OIDCClient, OIDCCode
from authentic2_idp_oidc.utils import base64url, get_first_ec_sig_key, get_first_rsa_sig_key, make_sub
from django_rbac.utils import get_ou_model, get_role_model
-from . import utils
+from .. import utils
+from .conftest import bearer_authentication_headers, client_authentication_headers
User = get_user_model()
pytestmark = pytest.mark.django_db
-JWKSET = {
- "keys": [
- {
- "qi": "h_zifVD-ChelxZUVxhICNcgGkQz26b-EdIlLY9rN7SX_aD3sLI_JHEHV4Bz3kV5eW8O4qJ8SHhfUdHGK-gRH7FVOGoXnXACf47QoXowHzsPLL64wCuZENTl7hIRGLY-BInULkfTQfuiVSMoxPjsVNTMBzMiz0bNjMQyMyvW5xH4",
- "kty": "RSA",
- "d": "pUcL4-LDBy3rqJWip269h5Hd6nLvqjXltfkVe_mL-LwZPHmCrUaj_SX54SnCY3Wyf7kxhoMYUac62lQ71923uJPFFdiavAujbNrtZPq32i4C-1apWXW8OGJr8VoVDqalxj9SAq1G54wbbsaAPrZdyuqy-esNxDqDigfbM-cWgngBBYo5CSsfnmnd05N2cUS26L7QzWbNHwilnBTE9e_J7rK3xUCDKrobv6_LiI-AhMmBHJSrCxjexh0wzfBi_Ntj9BGCcPThDjG8SQvaV-aLNdLfIy2XO3i076RLBB6Hm_yHuAparrwp-pPE48eQdiYjrSAFalz4ojWQ3_ByLA6uAQ",
- "q": "2FvfeWnIlWNUipan7DIBlJrmz5EinJNxrQ-BNwPHrAoIM8qvyC7jPy09YxZs5Y9CMMZSal6C4Nm2LHBFxHU9z1qd5XDzbk19G-y1lDqZizVXr876TpiAjuq03rcoMQm8dQru_pVjUdgxR64vKyJ9CaFMAqcpZeEMIqAvzhQG8uE",
- "dp": "Kg4HPGpzenhK2ser6nfM1Yt-pkqBbWQotvqsxGptECXpbN7vweupvL5kJPeRrbsXKp9QE7DXTN1sG9puJxMSwtgiv4hr9Va9e9WOC6PMd2VY7tgw5uKMpPLMc5y82PusRhBoRh0SUUsjyQxK9PGtWYnGZXbAoaIYPdMyDlosfqU",
- "dq": "QuUNEHYTjZTbo8n2-4FumarXKGBAalbwM8jyc7cYemnTpWfKt8M_gd4T99oMK2IC3h_DhZ3ZK3pE6DKCb76sMLtczH8C1RziTMsATWdc5_zDMtl07O4b-ZQ5_g51P8w515pc0JwRzFFi0z3Y2aZdMKgNX1id5SES5nXOshHhICE",
- "n": "0lN6CiJGFD8BSPV_azLoEl6Nq-WlHkU743D5rqvzw1sOaxstMGxAhVk2YIhWwfvapV6XjO_yvc4778VBTELOdjRw6BGUdBJepdwkL__TPyjEVhqMQj9MKhEU4GUy9w0Lsilb5D01kfrOKpmdcYw4jhcDvb0H4-LZgh1Vk84vF4WaQCUg_AX4drVDQOjoU8kuWIM8gz9w6zEsbIw-gtMRpFwS8ncA0zDX5VfyC77iMxzFftDIP2gM5GvdevMzvP9IRkRRBhP9vV4JchBFPHSA9OPJcnySjJJNW6aAJn6P6JasN1z68khjufM09J8UzmLAZYOq7gUG95Ox1KsV-g337Q",
- "e": "AQAB",
- "p": "-Nyj_Sw3f2HUqSssCZv84y7b3blOtGGAhfYN_JtGfcTQv2bOtxrIUzeonCi-Z_1W4hO10tqxJcOB0ibtDqkDlLhnLaIYOBfriITRFK83EJG5sC-0KTmFzUXFTA2aMc1QgP-Fu6gUfQpPqLgWxhx8EFhkBlBZshKU5-C-385Sco0",
- "kid": "46c686ea-7d4e-41cd-a462-2125fc1dee0e",
- },
- {
- "kty": "EC",
- "d": "wwULaR9UYWZW6U2oEbkz3sO1lhPSj6DyA6e7PiUfhog",
- "use": "sig",
- "crv": "P-256",
- "x": "HZMHZkX-63heqA5pvWn-UR7bgcXZNEcQa5wfvG_BzTw",
- "y": "SUCuwjjiyKvGq5Odr0sjDqjha_CBqks0JQFrR7Ei5OQ",
- "alg": "ES256",
- "kid": "ac85baf4-835b-49b2-8272-ffecce7654c9",
- },
- ]
-}
-
-
-@pytest.fixture
-def oidc_settings(settings):
- settings.A2_IDP_OIDC_JWKSET = JWKSET
- settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT = '100/m'
- return settings
-
def test_get_jwkset(oidc_settings):
from authentic2_idp_oidc.utils import get_jwkset
@@ -159,72 +124,6 @@ def test_admin(other_attributes, app, superuser, oidc_settings):
assert OIDCClient.objects.count() == 1
-def make_client(app, superuser, params=None):
- Attribute.objects.create(
- name='cityscape_image',
- label='cityscape',
- kind='profile_image',
- asked_on_registration=True,
- required=False,
- user_visible=True,
- user_editable=True,
- )
-
- client = OIDCClient(
- name='oidcclient',
- slug='oidcclient',
- ou=get_default_ou(),
- unauthorized_url='https://example.com/southpark/',
- redirect_uris='https://example.com/callbac%C3%A9',
- )
-
- for key, value in (params or {}).items():
- setattr(client, key, value)
- client.save()
- for mapping in app_settings.DEFAULT_MAPPINGS:
- OIDCClaim.objects.create(
- client=client, name=mapping['name'], value=mapping['value'], scopes=mapping['scopes']
- )
- return client
-
-
-@pytest.fixture
-def client(app, superuser):
- return make_client(app, superuser, {})
-
-
-@pytest.fixture
-def oidc_client(request, superuser, app, simple_user, oidc_settings):
- return make_client(app, superuser, getattr(request, 'param', None) or {})
-
-
-@pytest.fixture
-def normal_oidc_client(superuser, app, simple_user):
- url = reverse('admin:authentic2_idp_oidc_oidcclient_add')
- assert OIDCClient.objects.count() == 0
- response = utils.login(app, superuser, path=url)
- response.form.set('name', 'oidcclient')
- response.form.set('slug', 'oidcclient')
- response.form.set('ou', get_default_ou().pk)
- response.form.set('unauthorized_url', 'https://example.com/southpark/')
- response.form.set('redirect_uris', 'https://example.com/callbac%C3%A9')
- response = response.form.submit(name='_save').follow()
- assert OIDCClient.objects.count() == 1
- client = OIDCClient.objects.get()
- utils.logout(app)
- return client
-
-
-def client_authentication_headers(oidc_client):
- client_creds = '%s:%s' % (oidc_client.client_id, oidc_client.client_secret)
- token = base64.b64encode(client_creds.encode('ascii'))
- return {'Authorization': 'Basic %s' % str(token.decode('ascii'))}
-
-
-def bearer_authentication_headers(access_token):
- return {'Authorization': 'Bearer %s' % str(access_token)}
-
-
@pytest.mark.parametrize('oidc_client', OIDC_CLIENT_PARAMS, indirect=True)
@pytest.mark.parametrize('do_not_ask_again', [(True,), (False,)])
@pytest.mark.parametrize('login_first', [(True,), (False,)])
@@ -934,58 +833,6 @@ def test_invalid_request(oidc_client, caplog, oidc_settings, simple_user, app):
assert response.json['error_description'] == 'Parameter "code" has expired or user is disconnected'
-def test_expired_manager(db, simple_user):
- expired = now() - datetime.timedelta(seconds=1)
- not_expired = now() + datetime.timedelta(days=1)
- client = OIDCClient.objects.create(
- name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/'
- )
- OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=expired)
- OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=not_expired)
- assert OIDCAuthorization.objects.count() == 2
- OIDCAuthorization.objects.cleanup()
- assert OIDCAuthorization.objects.count() == 1
-
- OIDCCode.objects.create(
- client=client,
- user=simple_user,
- scopes='openid',
- redirect_uri='https://example.com/',
- session_key='xxx',
- auth_time=now(),
- expired=expired,
- )
- OIDCCode.objects.create(
- client=client,
- user=simple_user,
- scopes='openid',
- redirect_uri='https://example.com/',
- session_key='xxx',
- auth_time=now(),
- expired=not_expired,
- )
- assert OIDCCode.objects.count() == 2
- OIDCCode.objects.cleanup()
- assert OIDCCode.objects.count() == 1
-
- OIDCAccessToken.objects.create(
- client=client, user=simple_user, scopes='openid', session_key='xxx', expired=expired
- )
- OIDCAccessToken.objects.create(
- client=client, user=simple_user, scopes='openid', session_key='xxx', expired=not_expired
- )
- assert OIDCAccessToken.objects.count() == 2
- OIDCAccessToken.objects.cleanup()
- assert OIDCAccessToken.objects.count() == 1
-
-
-@pytest.fixture
-def simple_oidc_client(db):
- return OIDCClient.objects.create(
- name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/'
- )
-
-
def test_client_secret_post_authentication(oidc_settings, app, simple_oidc_client, simple_user):
utils.login(app, simple_user)
redirect_uri = simple_oidc_client.redirect_uris.split()[0]
@@ -1149,137 +996,6 @@ def test_registration_service_slug(oidc_settings, app, simple_oidc_client, simpl
assert hooks.event[2]['kwargs']['service'] == 'client'
-def test_oidclient_claims_data_migration(migration):
- app = 'authentic2_idp_oidc'
- migrate_from = [(app, '0009_auto_20180313_1156')]
- migrate_to = [(app, '0010_oidcclaim')]
-
- old_apps = migration.before(migrate_from)
- OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
-
- client = OIDCClient(name='test', slug='test', redirect_uris='https://example.net/')
- client.save()
-
- new_apps = migration.apply(migrate_to)
- OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
-
- client = OIDCClient.objects.first()
- assert OIDCClaim.objects.filter(client=client.id).count() == 5
-
-
-def test_oidclient_preferred_username_as_identifier_data_migration(migration):
- app = 'authentic2_idp_oidc'
- migrate_from = [(app, '0010_oidcclaim')]
- migrate_to = [(app, '0011_auto_20180808_1546')]
-
- old_apps = migration.before(migrate_from)
- OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
- OIDCClaim = old_apps.get_model('authentic2_idp_oidc', 'OIDCClaim')
-
- client1 = OIDCClient.objects.create(name='test', slug='test', redirect_uris='https://example.net/')
- client2 = OIDCClient.objects.create(name='test1', slug='test1', redirect_uris='https://example.net/')
- client3 = OIDCClient.objects.create(name='test2', slug='test2', redirect_uris='https://example.net/')
- client4 = OIDCClient.objects.create(name='test3', slug='test3', redirect_uris='https://example.net/')
- for client in (client1, client2, client3, client4):
- if client.name == 'test1':
- continue
- if client.name == 'test3':
- OIDCClaim.objects.create(
- client=client, name='preferred_username', value='django_user_full_name', scopes='profile'
- )
- else:
- OIDCClaim.objects.create(
- client=client, name='preferred_username', value='django_user_username', scopes='profile'
- )
- OIDCClaim.objects.create(
- client=client, name='given_name', value='django_user_first_name', scopes='profile'
- )
- OIDCClaim.objects.create(
- client=client, name='family_name', value='django_user_last_name', scopes='profile'
- )
- if client.name == 'test2':
- continue
- OIDCClaim.objects.create(client=client, name='email', value='django_user_email', scopes='email')
- OIDCClaim.objects.create(
- client=client, name='email_verified', value='django_user_email_verified', scopes='email'
- )
-
- new_apps = migration.apply(migrate_to)
- OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
-
- client = OIDCClient.objects.first()
- for client in OIDCClient.objects.all():
- claims = client.oidcclaim_set.all()
- if client.name == 'test':
- assert claims.count() == 5
- assert sorted(claims.values_list('name', flat=True)) == [
- 'email',
- 'email_verified',
- 'family_name',
- 'given_name',
- 'preferred_username',
- ]
- assert sorted(claims.values_list('value', flat=True)) == [
- 'django_user_email',
- 'django_user_email_verified',
- 'django_user_first_name',
- 'django_user_identifier',
- 'django_user_last_name',
- ]
- elif client.name == 'test2':
- assert claims.count() == 3
- assert sorted(claims.values_list('name', flat=True)) == [
- 'family_name',
- 'given_name',
- 'preferred_username',
- ]
- assert sorted(claims.values_list('value', flat=True)) == [
- 'django_user_first_name',
- 'django_user_last_name',
- 'django_user_username',
- ]
- elif client.name == 'test3':
- assert claims.count() == 5
- assert sorted(claims.values_list('name', flat=True)) == [
- 'email',
- 'email_verified',
- 'family_name',
- 'given_name',
- 'preferred_username',
- ]
- assert sorted(claims.values_list('value', flat=True)) == [
- 'django_user_email',
- 'django_user_email_verified',
- 'django_user_first_name',
- 'django_user_full_name',
- 'django_user_last_name',
- ]
- else:
- assert claims.count() == 0
-
-
-def test_api_synchronization(app, oidc_client):
- oidc_client.has_api_access = True
- oidc_client.save()
- users = [User.objects.create(username='user-%s' % i) for i in range(10)]
- for user in users[5:]:
- user.delete()
- deleted_subs = set(make_sub(oidc_client, user) for user in users[5:])
-
- app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret))
- status = 200
- if oidc_client.identifier_policy not in (OIDCClient.POLICY_PAIRWISE_REVERSIBLE, OIDCClient.POLICY_UUID):
- status = 401
- response = app.post_json(
- '/api/users/synchronization/',
- params={'known_uuids': [make_sub(oidc_client, user) for user in users]},
- status=status,
- )
- if status == 200:
- assert response.json['result'] == 1
- assert set(response.json['unknown_uuids']) == deleted_subs
-
-
def test_claim_default_value(oidc_settings, normal_oidc_client, simple_user, app):
oidc_settings.A2_IDP_OIDC_SCOPES = ['openid', 'profile', 'email', 'phone']
Attribute.objects.create(
@@ -1922,129 +1638,3 @@ def test_oidc_good_next_url_hook(app, oidc_client):
rf = RequestFactory()
request = rf.get('/')
assert good_next_url(request, 'https://example.com/')
-
-
-@pytest.fixture
-def access_token(client, simple_user):
- return OIDCAccessToken.objects.create(
- client=client,
- user=simple_user,
- scopes='openid profile email',
- expired=now() + datetime.timedelta(seconds=3600),
- )
-
-
-def test_user_info(app, client, access_token, freezer):
- def get_user_info(**kwargs):
- return app.get(
- '/idp/oidc/user_info/', headers=bearer_authentication_headers(access_token.uuid), **kwargs
- )
-
- response = app.get('/idp/oidc/user_info/', status=401)
- assert (
- response['WWW-Authenticate']
- == 'Bearer error="invalid_request", error_description="Bearer authentication is mandatory"'
- )
-
- response = app.get('/idp/oidc/user_info/', headers={'Authorization': 'Bearer'}, status=401)
- assert (
- response['WWW-Authenticate']
- == 'Bearer error="invalid_request", error_description="Invalid Bearer authentication"'
- )
-
- response = get_user_info(status=200)
- assert dict(response.json, sub='') == {
- 'email': 'user@example.net',
- 'email_verified': False,
- 'family_name': 'Dôe',
- 'family_name_verified': True,
- 'given_name': 'Jôhn',
- 'given_name_verified': True,
- 'preferred_username': 'user',
- 'sub': '',
- }
-
- # token is expired
- access_token.expired = now() - datetime.timedelta(seconds=1)
- access_token.save()
- response = get_user_info(status=401)
- assert (
- response['WWW-Authenticate']
- == 'Bearer error="invalid_token", error_description="Token expired or user disconnected"'
- )
-
- # token is unknown
- access_token.delete()
- response = get_user_info(status=401)
- assert response['WWW-Authenticate'] == 'Bearer error="invalid_token", error_description="Token unknown"'
-
- utils.login(app, access_token.user)
- access_token.expired = now() + datetime.timedelta(seconds=1)
- access_token.session_key = app.session.session_key
- access_token.save()
-
- get_user_info(status=200)
-
- app.session.flush()
- response = get_user_info(status=401)
- assert (
- response['WWW-Authenticate']
- == 'Bearer error="invalid_token", error_description="Token expired or user disconnected"'
- )
-
-
-@pytest.fixture
-def session(settings, db, simple_user):
- engine = import_module(settings.SESSION_ENGINE)
- session = engine.SessionStore()
- session['_auth_user_id'] = str(simple_user.id)
- session.create()
- return session
-
-
-def test_access_token_is_valid_session(simple_oidc_client, simple_user, session):
- token = OIDCAccessToken.objects.create(
- client=simple_oidc_client, user=simple_user, scopes='openid', session_key=session.session_key
- )
-
- assert token.is_valid()
- session.flush()
- assert not token.is_valid()
-
-
-def test_access_token_is_valid_expired(simple_oidc_client, simple_user, freezer):
- start = now()
- expired = start + datetime.timedelta(seconds=30)
-
- token = OIDCAccessToken.objects.create(
- client=simple_oidc_client, user=simple_user, scopes='openid', expired=expired
- )
-
- assert token.is_valid()
- freezer.move_to(expired)
- assert token.is_valid()
- freezer.move_to(expired + datetime.timedelta(seconds=1))
- assert not token.is_valid()
-
-
-def test_access_token_is_valid_session_and_expired(simple_oidc_client, simple_user, session, freezer):
- start = now()
- expired = start + datetime.timedelta(seconds=30)
-
- token = OIDCAccessToken.objects.create(
- client=simple_oidc_client,
- user=simple_user,
- scopes='openid',
- session_key=session.session_key,
- expired=expired,
- )
-
- assert token.is_valid()
- freezer.move_to(expired)
- assert token.is_valid()
- freezer.move_to(expired + datetime.timedelta(seconds=1))
- assert not token.is_valid()
- freezer.move_to(start)
- assert token.is_valid()
- session.flush()
- assert not token.is_valid()
diff --git a/tests/idp_oidc/test_models.py b/tests/idp_oidc/test_models.py
new file mode 100644
index 000000000..c764d72e8
--- /dev/null
+++ b/tests/idp_oidc/test_models.py
@@ -0,0 +1,115 @@
+# authentic2 - versatile identity manager
+# Copyright (C) 2010-2021 Entr'ouvert
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import datetime
+
+from django.utils.timezone import now
+
+from authentic2.a2_rbac.utils import get_default_ou
+from authentic2_idp_oidc.models import OIDCAccessToken, OIDCAuthorization, OIDCClient, OIDCCode
+
+
+def test_expired_manager(db, simple_user):
+ expired = now() - datetime.timedelta(seconds=1)
+ not_expired = now() + datetime.timedelta(days=1)
+ client = OIDCClient.objects.create(
+ name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/'
+ )
+ OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=expired)
+ OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=not_expired)
+ assert OIDCAuthorization.objects.count() == 2
+ OIDCAuthorization.objects.cleanup()
+ assert OIDCAuthorization.objects.count() == 1
+
+ OIDCCode.objects.create(
+ client=client,
+ user=simple_user,
+ scopes='openid',
+ redirect_uri='https://example.com/',
+ session_key='xxx',
+ auth_time=now(),
+ expired=expired,
+ )
+ OIDCCode.objects.create(
+ client=client,
+ user=simple_user,
+ scopes='openid',
+ redirect_uri='https://example.com/',
+ session_key='xxx',
+ auth_time=now(),
+ expired=not_expired,
+ )
+ assert OIDCCode.objects.count() == 2
+ OIDCCode.objects.cleanup()
+ assert OIDCCode.objects.count() == 1
+
+ OIDCAccessToken.objects.create(
+ client=client, user=simple_user, scopes='openid', session_key='xxx', expired=expired
+ )
+ OIDCAccessToken.objects.create(
+ client=client, user=simple_user, scopes='openid', session_key='xxx', expired=not_expired
+ )
+ assert OIDCAccessToken.objects.count() == 2
+ OIDCAccessToken.objects.cleanup()
+ assert OIDCAccessToken.objects.count() == 1
+
+
+def test_access_token_is_valid_session(simple_oidc_client, simple_user, session):
+ token = OIDCAccessToken.objects.create(
+ client=simple_oidc_client, user=simple_user, scopes='openid', session_key=session.session_key
+ )
+
+ assert token.is_valid()
+ session.flush()
+ assert not token.is_valid()
+
+
+def test_access_token_is_valid_expired(simple_oidc_client, simple_user, freezer):
+ start = now()
+ expired = start + datetime.timedelta(seconds=30)
+
+ token = OIDCAccessToken.objects.create(
+ client=simple_oidc_client, user=simple_user, scopes='openid', expired=expired
+ )
+
+ assert token.is_valid()
+ freezer.move_to(expired)
+ assert token.is_valid()
+ freezer.move_to(expired + datetime.timedelta(seconds=1))
+ assert not token.is_valid()
+
+
+def test_access_token_is_valid_session_and_expired(simple_oidc_client, simple_user, session, freezer):
+ start = now()
+ expired = start + datetime.timedelta(seconds=30)
+
+ token = OIDCAccessToken.objects.create(
+ client=simple_oidc_client,
+ user=simple_user,
+ scopes='openid',
+ session_key=session.session_key,
+ expired=expired,
+ )
+
+ assert token.is_valid()
+ freezer.move_to(expired)
+ assert token.is_valid()
+ freezer.move_to(expired + datetime.timedelta(seconds=1))
+ assert not token.is_valid()
+ freezer.move_to(start)
+ assert token.is_valid()
+ session.flush()
+ assert not token.is_valid()
diff --git a/tests/idp_oidc/test_views.py b/tests/idp_oidc/test_views.py
new file mode 100644
index 000000000..86997a7f1
--- /dev/null
+++ b/tests/idp_oidc/test_views.py
@@ -0,0 +1,90 @@
+# authentic2 - versatile identity manager
+# Copyright (C) 2010-2021 Entr'ouvert
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import datetime
+
+from django.utils.timezone import now
+
+from authentic2_idp_oidc.models import OIDCAccessToken
+
+from .. import utils
+from .conftest import bearer_authentication_headers
+
+
+def test_user_info(app, client, freezer, simple_user):
+ access_token = OIDCAccessToken.objects.create(
+ client=client,
+ user=simple_user,
+ scopes='openid profile email',
+ expired=now() + datetime.timedelta(seconds=3600),
+ )
+
+ def get_user_info(**kwargs):
+ return app.get(
+ '/idp/oidc/user_info/', headers=bearer_authentication_headers(access_token.uuid), **kwargs
+ )
+
+ response = app.get('/idp/oidc/user_info/', status=401)
+ assert (
+ response['WWW-Authenticate']
+ == 'Bearer error="invalid_request", error_description="Bearer authentication is mandatory"'
+ )
+
+ response = app.get('/idp/oidc/user_info/', headers={'Authorization': 'Bearer'}, status=401)
+ assert (
+ response['WWW-Authenticate']
+ == 'Bearer error="invalid_request", error_description="Invalid Bearer authentication"'
+ )
+
+ response = get_user_info(status=200)
+ assert dict(response.json, sub='') == {
+ 'email': 'user@example.net',
+ 'email_verified': False,
+ 'family_name': 'Dôe',
+ 'family_name_verified': True,
+ 'given_name': 'Jôhn',
+ 'given_name_verified': True,
+ 'preferred_username': 'user',
+ 'sub': '',
+ }
+
+ # token is expired
+ access_token.expired = now() - datetime.timedelta(seconds=1)
+ access_token.save()
+ response = get_user_info(status=401)
+ assert (
+ response['WWW-Authenticate']
+ == 'Bearer error="invalid_token", error_description="Token expired or user disconnected"'
+ )
+
+ # token is unknown
+ access_token.delete()
+ response = get_user_info(status=401)
+ assert response['WWW-Authenticate'] == 'Bearer error="invalid_token", error_description="Token unknown"'
+
+ utils.login(app, access_token.user)
+ access_token.expired = now() + datetime.timedelta(seconds=1)
+ access_token.session_key = app.session.session_key
+ access_token.save()
+
+ get_user_info(status=200)
+
+ app.session.flush()
+ response = get_user_info(status=401)
+ assert (
+ response['WWW-Authenticate']
+ == 'Bearer error="invalid_token", error_description="Token expired or user disconnected"'
+ )