tests: move idp_oidc tests in a subdirectory (#54740)
This commit is contained in:
parent
1792fbe72d
commit
7db6fe5bf0
|
@ -0,0 +1,148 @@
|
|||
# authentic2 - versatile identity manager
|
||||
# Copyright (C) 2010-2021 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import base64
|
||||
from importlib import import_module
|
||||
|
||||
import pytest
|
||||
from django.urls import reverse
|
||||
|
||||
from authentic2.a2_rbac.utils import get_default_ou
|
||||
from authentic2.models import Attribute
|
||||
from authentic2_idp_oidc import app_settings
|
||||
from authentic2_idp_oidc.models import OIDCClaim, OIDCClient
|
||||
from tests import utils
|
||||
|
||||
JWKSET = {
|
||||
"keys": [
|
||||
{
|
||||
"qi": "h_zifVD-ChelxZUVxhICNcgGkQz26b-EdIlLY9rN7SX_aD3sLI_JHEHV4Bz3kV5eW8O4qJ8SHhfUdHGK-gRH7FVOGoXnXACf47QoXowHzsPLL64wCuZENTl7hIRGLY-BInULkfTQfuiVSMoxPjsVNTMBzMiz0bNjMQyMyvW5xH4",
|
||||
"kty": "RSA",
|
||||
"d": "pUcL4-LDBy3rqJWip269h5Hd6nLvqjXltfkVe_mL-LwZPHmCrUaj_SX54SnCY3Wyf7kxhoMYUac62lQ71923uJPFFdiavAujbNrtZPq32i4C-1apWXW8OGJr8VoVDqalxj9SAq1G54wbbsaAPrZdyuqy-esNxDqDigfbM-cWgngBBYo5CSsfnmnd05N2cUS26L7QzWbNHwilnBTE9e_J7rK3xUCDKrobv6_LiI-AhMmBHJSrCxjexh0wzfBi_Ntj9BGCcPThDjG8SQvaV-aLNdLfIy2XO3i076RLBB6Hm_yHuAparrwp-pPE48eQdiYjrSAFalz4ojWQ3_ByLA6uAQ",
|
||||
"q": "2FvfeWnIlWNUipan7DIBlJrmz5EinJNxrQ-BNwPHrAoIM8qvyC7jPy09YxZs5Y9CMMZSal6C4Nm2LHBFxHU9z1qd5XDzbk19G-y1lDqZizVXr876TpiAjuq03rcoMQm8dQru_pVjUdgxR64vKyJ9CaFMAqcpZeEMIqAvzhQG8uE",
|
||||
"dp": "Kg4HPGpzenhK2ser6nfM1Yt-pkqBbWQotvqsxGptECXpbN7vweupvL5kJPeRrbsXKp9QE7DXTN1sG9puJxMSwtgiv4hr9Va9e9WOC6PMd2VY7tgw5uKMpPLMc5y82PusRhBoRh0SUUsjyQxK9PGtWYnGZXbAoaIYPdMyDlosfqU",
|
||||
"dq": "QuUNEHYTjZTbo8n2-4FumarXKGBAalbwM8jyc7cYemnTpWfKt8M_gd4T99oMK2IC3h_DhZ3ZK3pE6DKCb76sMLtczH8C1RziTMsATWdc5_zDMtl07O4b-ZQ5_g51P8w515pc0JwRzFFi0z3Y2aZdMKgNX1id5SES5nXOshHhICE",
|
||||
"n": "0lN6CiJGFD8BSPV_azLoEl6Nq-WlHkU743D5rqvzw1sOaxstMGxAhVk2YIhWwfvapV6XjO_yvc4778VBTELOdjRw6BGUdBJepdwkL__TPyjEVhqMQj9MKhEU4GUy9w0Lsilb5D01kfrOKpmdcYw4jhcDvb0H4-LZgh1Vk84vF4WaQCUg_AX4drVDQOjoU8kuWIM8gz9w6zEsbIw-gtMRpFwS8ncA0zDX5VfyC77iMxzFftDIP2gM5GvdevMzvP9IRkRRBhP9vV4JchBFPHSA9OPJcnySjJJNW6aAJn6P6JasN1z68khjufM09J8UzmLAZYOq7gUG95Ox1KsV-g337Q",
|
||||
"e": "AQAB",
|
||||
"p": "-Nyj_Sw3f2HUqSssCZv84y7b3blOtGGAhfYN_JtGfcTQv2bOtxrIUzeonCi-Z_1W4hO10tqxJcOB0ibtDqkDlLhnLaIYOBfriITRFK83EJG5sC-0KTmFzUXFTA2aMc1QgP-Fu6gUfQpPqLgWxhx8EFhkBlBZshKU5-C-385Sco0",
|
||||
"kid": "46c686ea-7d4e-41cd-a462-2125fc1dee0e",
|
||||
},
|
||||
{
|
||||
"kty": "EC",
|
||||
"d": "wwULaR9UYWZW6U2oEbkz3sO1lhPSj6DyA6e7PiUfhog",
|
||||
"use": "sig",
|
||||
"crv": "P-256",
|
||||
"x": "HZMHZkX-63heqA5pvWn-UR7bgcXZNEcQa5wfvG_BzTw",
|
||||
"y": "SUCuwjjiyKvGq5Odr0sjDqjha_CBqks0JQFrR7Ei5OQ",
|
||||
"alg": "ES256",
|
||||
"kid": "ac85baf4-835b-49b2-8272-ffecce7654c9",
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def jwkset():
|
||||
return JWKSET
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def oidc_settings(settings, jwkset):
|
||||
settings.A2_IDP_OIDC_JWKSET = jwkset
|
||||
settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT = '100/m'
|
||||
return settings
|
||||
|
||||
|
||||
def make_client(app, superuser, params=None):
|
||||
Attribute.objects.create(
|
||||
name='cityscape_image',
|
||||
label='cityscape',
|
||||
kind='profile_image',
|
||||
asked_on_registration=True,
|
||||
required=False,
|
||||
user_visible=True,
|
||||
user_editable=True,
|
||||
)
|
||||
|
||||
client = OIDCClient(
|
||||
name='oidcclient',
|
||||
slug='oidcclient',
|
||||
ou=get_default_ou(),
|
||||
unauthorized_url='https://example.com/southpark/',
|
||||
redirect_uris='https://example.com/callbac%C3%A9',
|
||||
)
|
||||
|
||||
for key, value in (params or {}).items():
|
||||
setattr(client, key, value)
|
||||
client.save()
|
||||
for mapping in app_settings.DEFAULT_MAPPINGS:
|
||||
OIDCClaim.objects.create(
|
||||
client=client, name=mapping['name'], value=mapping['value'], scopes=mapping['scopes']
|
||||
)
|
||||
return client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(app, superuser):
|
||||
return make_client(app, superuser, {})
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def simple_oidc_client(db):
|
||||
return OIDCClient.objects.create(
|
||||
name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/'
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def oidc_client(request, superuser, app, simple_user, oidc_settings):
|
||||
return make_client(app, superuser, getattr(request, 'param', None) or {})
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def normal_oidc_client(superuser, app, simple_user):
|
||||
url = reverse('admin:authentic2_idp_oidc_oidcclient_add')
|
||||
assert OIDCClient.objects.count() == 0
|
||||
response = utils.login(app, superuser, path=url)
|
||||
response.form.set('name', 'oidcclient')
|
||||
response.form.set('slug', 'oidcclient')
|
||||
response.form.set('ou', get_default_ou().pk)
|
||||
response.form.set('unauthorized_url', 'https://example.com/southpark/')
|
||||
response.form.set('redirect_uris', 'https://example.com/callbac%C3%A9')
|
||||
response = response.form.submit(name='_save').follow()
|
||||
assert OIDCClient.objects.count() == 1
|
||||
client = OIDCClient.objects.get()
|
||||
utils.logout(app)
|
||||
return client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def session(settings, db, simple_user):
|
||||
engine = import_module(settings.SESSION_ENGINE)
|
||||
session = engine.SessionStore()
|
||||
session['_auth_user_id'] = str(simple_user.id)
|
||||
session.create()
|
||||
return session
|
||||
|
||||
|
||||
def client_authentication_headers(oidc_client):
|
||||
client_creds = '%s:%s' % (oidc_client.client_id, oidc_client.client_secret)
|
||||
token = base64.b64encode(client_creds.encode('ascii'))
|
||||
return {'Authorization': 'Basic %s' % str(token.decode('ascii'))}
|
||||
|
||||
|
||||
def bearer_authentication_headers(access_token):
|
||||
return {'Authorization': 'Bearer %s' % str(access_token)}
|
|
@ -0,0 +1,41 @@
|
|||
# authentic2 - versatile identity manager
|
||||
# Copyright (C) 2010-2021 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from authentic2.custom_user.models import User
|
||||
from authentic2_idp_oidc.models import OIDCClient
|
||||
from authentic2_idp_oidc.utils import make_sub
|
||||
|
||||
|
||||
def test_api_synchronization(app, oidc_client):
|
||||
oidc_client.has_api_access = True
|
||||
oidc_client.save()
|
||||
users = [User.objects.create(username='user-%s' % i) for i in range(10)]
|
||||
for user in users[5:]:
|
||||
user.delete()
|
||||
deleted_subs = set(make_sub(oidc_client, user) for user in users[5:])
|
||||
|
||||
app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret))
|
||||
status = 200
|
||||
if oidc_client.identifier_policy not in (OIDCClient.POLICY_PAIRWISE_REVERSIBLE, OIDCClient.POLICY_UUID):
|
||||
status = 401
|
||||
response = app.post_json(
|
||||
'/api/users/synchronization/',
|
||||
params={'known_uuids': [make_sub(oidc_client, user) for user in users]},
|
||||
status=status,
|
||||
)
|
||||
if status == 200:
|
||||
assert response.json['result'] == 1
|
||||
assert set(response.json['unknown_uuids']) == deleted_subs
|
|
@ -0,0 +1,125 @@
|
|||
# authentic2 - versatile identity manager
|
||||
# Copyright (C) 2010-2021 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
def test_oidclient_claims_data_migration(migration):
|
||||
app = 'authentic2_idp_oidc'
|
||||
migrate_from = [(app, '0009_auto_20180313_1156')]
|
||||
migrate_to = [(app, '0010_oidcclaim')]
|
||||
|
||||
old_apps = migration.before(migrate_from)
|
||||
OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
|
||||
|
||||
client = OIDCClient(name='test', slug='test', redirect_uris='https://example.net/')
|
||||
client.save()
|
||||
|
||||
new_apps = migration.apply(migrate_to)
|
||||
OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
|
||||
OIDCClaim = new_apps.get_model('authentic2_idp_oidc', 'OIDCClaim')
|
||||
|
||||
client = OIDCClient.objects.first()
|
||||
assert OIDCClaim.objects.filter(client=client.id).count() == 5
|
||||
|
||||
|
||||
def test_oidclient_preferred_username_as_identifier_data_migration(migration):
|
||||
app = 'authentic2_idp_oidc'
|
||||
migrate_from = [(app, '0010_oidcclaim')]
|
||||
migrate_to = [(app, '0011_auto_20180808_1546')]
|
||||
|
||||
old_apps = migration.before(migrate_from)
|
||||
OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
|
||||
OIDCClaim = old_apps.get_model('authentic2_idp_oidc', 'OIDCClaim')
|
||||
|
||||
client1 = OIDCClient.objects.create(name='test', slug='test', redirect_uris='https://example.net/')
|
||||
client2 = OIDCClient.objects.create(name='test1', slug='test1', redirect_uris='https://example.net/')
|
||||
client3 = OIDCClient.objects.create(name='test2', slug='test2', redirect_uris='https://example.net/')
|
||||
client4 = OIDCClient.objects.create(name='test3', slug='test3', redirect_uris='https://example.net/')
|
||||
for client in (client1, client2, client3, client4):
|
||||
if client.name == 'test1':
|
||||
continue
|
||||
if client.name == 'test3':
|
||||
OIDCClaim.objects.create(
|
||||
client=client, name='preferred_username', value='django_user_full_name', scopes='profile'
|
||||
)
|
||||
else:
|
||||
OIDCClaim.objects.create(
|
||||
client=client, name='preferred_username', value='django_user_username', scopes='profile'
|
||||
)
|
||||
OIDCClaim.objects.create(
|
||||
client=client, name='given_name', value='django_user_first_name', scopes='profile'
|
||||
)
|
||||
OIDCClaim.objects.create(
|
||||
client=client, name='family_name', value='django_user_last_name', scopes='profile'
|
||||
)
|
||||
if client.name == 'test2':
|
||||
continue
|
||||
OIDCClaim.objects.create(client=client, name='email', value='django_user_email', scopes='email')
|
||||
OIDCClaim.objects.create(
|
||||
client=client, name='email_verified', value='django_user_email_verified', scopes='email'
|
||||
)
|
||||
|
||||
new_apps = migration.apply(migrate_to)
|
||||
OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
|
||||
|
||||
client = OIDCClient.objects.first()
|
||||
for client in OIDCClient.objects.all():
|
||||
claims = client.oidcclaim_set.all()
|
||||
if client.name == 'test':
|
||||
assert claims.count() == 5
|
||||
assert sorted(claims.values_list('name', flat=True)) == [
|
||||
'email',
|
||||
'email_verified',
|
||||
'family_name',
|
||||
'given_name',
|
||||
'preferred_username',
|
||||
]
|
||||
assert sorted(claims.values_list('value', flat=True)) == [
|
||||
'django_user_email',
|
||||
'django_user_email_verified',
|
||||
'django_user_first_name',
|
||||
'django_user_identifier',
|
||||
'django_user_last_name',
|
||||
]
|
||||
elif client.name == 'test2':
|
||||
assert claims.count() == 3
|
||||
assert sorted(claims.values_list('name', flat=True)) == [
|
||||
'family_name',
|
||||
'given_name',
|
||||
'preferred_username',
|
||||
]
|
||||
assert sorted(claims.values_list('value', flat=True)) == [
|
||||
'django_user_first_name',
|
||||
'django_user_last_name',
|
||||
'django_user_username',
|
||||
]
|
||||
elif client.name == 'test3':
|
||||
assert claims.count() == 5
|
||||
assert sorted(claims.values_list('name', flat=True)) == [
|
||||
'email',
|
||||
'email_verified',
|
||||
'family_name',
|
||||
'given_name',
|
||||
'preferred_username',
|
||||
]
|
||||
assert sorted(claims.values_list('value', flat=True)) == [
|
||||
'django_user_email',
|
||||
'django_user_email_verified',
|
||||
'django_user_first_name',
|
||||
'django_user_full_name',
|
||||
'django_user_last_name',
|
||||
]
|
||||
else:
|
||||
assert claims.count() == 0
|
|
@ -1,5 +1,5 @@
|
|||
# authentic2 - versatile identity manager
|
||||
# Copyright (C) 2010-2019 Entr'ouvert
|
||||
# Copyright (C) 2010-2021 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
|
@ -19,7 +19,6 @@ import datetime
|
|||
import functools
|
||||
import json
|
||||
import urllib.parse
|
||||
from importlib import import_module
|
||||
|
||||
import pytest
|
||||
from django.contrib.auth import get_user_model
|
||||
|
@ -37,51 +36,17 @@ from authentic2.a2_rbac.utils import get_default_ou
|
|||
from authentic2.models import Attribute, AuthorizedRole
|
||||
from authentic2.utils import good_next_url, make_url
|
||||
from authentic2_auth_oidc.utils import parse_timestamp
|
||||
from authentic2_idp_oidc import app_settings
|
||||
from authentic2_idp_oidc.models import OIDCAccessToken, OIDCAuthorization, OIDCClaim, OIDCClient, OIDCCode
|
||||
from authentic2_idp_oidc.utils import base64url, get_first_ec_sig_key, get_first_rsa_sig_key, make_sub
|
||||
from django_rbac.utils import get_ou_model, get_role_model
|
||||
|
||||
from . import utils
|
||||
from .. import utils
|
||||
from .conftest import bearer_authentication_headers, client_authentication_headers
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
JWKSET = {
|
||||
"keys": [
|
||||
{
|
||||
"qi": "h_zifVD-ChelxZUVxhICNcgGkQz26b-EdIlLY9rN7SX_aD3sLI_JHEHV4Bz3kV5eW8O4qJ8SHhfUdHGK-gRH7FVOGoXnXACf47QoXowHzsPLL64wCuZENTl7hIRGLY-BInULkfTQfuiVSMoxPjsVNTMBzMiz0bNjMQyMyvW5xH4",
|
||||
"kty": "RSA",
|
||||
"d": "pUcL4-LDBy3rqJWip269h5Hd6nLvqjXltfkVe_mL-LwZPHmCrUaj_SX54SnCY3Wyf7kxhoMYUac62lQ71923uJPFFdiavAujbNrtZPq32i4C-1apWXW8OGJr8VoVDqalxj9SAq1G54wbbsaAPrZdyuqy-esNxDqDigfbM-cWgngBBYo5CSsfnmnd05N2cUS26L7QzWbNHwilnBTE9e_J7rK3xUCDKrobv6_LiI-AhMmBHJSrCxjexh0wzfBi_Ntj9BGCcPThDjG8SQvaV-aLNdLfIy2XO3i076RLBB6Hm_yHuAparrwp-pPE48eQdiYjrSAFalz4ojWQ3_ByLA6uAQ",
|
||||
"q": "2FvfeWnIlWNUipan7DIBlJrmz5EinJNxrQ-BNwPHrAoIM8qvyC7jPy09YxZs5Y9CMMZSal6C4Nm2LHBFxHU9z1qd5XDzbk19G-y1lDqZizVXr876TpiAjuq03rcoMQm8dQru_pVjUdgxR64vKyJ9CaFMAqcpZeEMIqAvzhQG8uE",
|
||||
"dp": "Kg4HPGpzenhK2ser6nfM1Yt-pkqBbWQotvqsxGptECXpbN7vweupvL5kJPeRrbsXKp9QE7DXTN1sG9puJxMSwtgiv4hr9Va9e9WOC6PMd2VY7tgw5uKMpPLMc5y82PusRhBoRh0SUUsjyQxK9PGtWYnGZXbAoaIYPdMyDlosfqU",
|
||||
"dq": "QuUNEHYTjZTbo8n2-4FumarXKGBAalbwM8jyc7cYemnTpWfKt8M_gd4T99oMK2IC3h_DhZ3ZK3pE6DKCb76sMLtczH8C1RziTMsATWdc5_zDMtl07O4b-ZQ5_g51P8w515pc0JwRzFFi0z3Y2aZdMKgNX1id5SES5nXOshHhICE",
|
||||
"n": "0lN6CiJGFD8BSPV_azLoEl6Nq-WlHkU743D5rqvzw1sOaxstMGxAhVk2YIhWwfvapV6XjO_yvc4778VBTELOdjRw6BGUdBJepdwkL__TPyjEVhqMQj9MKhEU4GUy9w0Lsilb5D01kfrOKpmdcYw4jhcDvb0H4-LZgh1Vk84vF4WaQCUg_AX4drVDQOjoU8kuWIM8gz9w6zEsbIw-gtMRpFwS8ncA0zDX5VfyC77iMxzFftDIP2gM5GvdevMzvP9IRkRRBhP9vV4JchBFPHSA9OPJcnySjJJNW6aAJn6P6JasN1z68khjufM09J8UzmLAZYOq7gUG95Ox1KsV-g337Q",
|
||||
"e": "AQAB",
|
||||
"p": "-Nyj_Sw3f2HUqSssCZv84y7b3blOtGGAhfYN_JtGfcTQv2bOtxrIUzeonCi-Z_1W4hO10tqxJcOB0ibtDqkDlLhnLaIYOBfriITRFK83EJG5sC-0KTmFzUXFTA2aMc1QgP-Fu6gUfQpPqLgWxhx8EFhkBlBZshKU5-C-385Sco0",
|
||||
"kid": "46c686ea-7d4e-41cd-a462-2125fc1dee0e",
|
||||
},
|
||||
{
|
||||
"kty": "EC",
|
||||
"d": "wwULaR9UYWZW6U2oEbkz3sO1lhPSj6DyA6e7PiUfhog",
|
||||
"use": "sig",
|
||||
"crv": "P-256",
|
||||
"x": "HZMHZkX-63heqA5pvWn-UR7bgcXZNEcQa5wfvG_BzTw",
|
||||
"y": "SUCuwjjiyKvGq5Odr0sjDqjha_CBqks0JQFrR7Ei5OQ",
|
||||
"alg": "ES256",
|
||||
"kid": "ac85baf4-835b-49b2-8272-ffecce7654c9",
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def oidc_settings(settings):
|
||||
settings.A2_IDP_OIDC_JWKSET = JWKSET
|
||||
settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT = '100/m'
|
||||
return settings
|
||||
|
||||
|
||||
def test_get_jwkset(oidc_settings):
|
||||
from authentic2_idp_oidc.utils import get_jwkset
|
||||
|
@ -159,72 +124,6 @@ def test_admin(other_attributes, app, superuser, oidc_settings):
|
|||
assert OIDCClient.objects.count() == 1
|
||||
|
||||
|
||||
def make_client(app, superuser, params=None):
|
||||
Attribute.objects.create(
|
||||
name='cityscape_image',
|
||||
label='cityscape',
|
||||
kind='profile_image',
|
||||
asked_on_registration=True,
|
||||
required=False,
|
||||
user_visible=True,
|
||||
user_editable=True,
|
||||
)
|
||||
|
||||
client = OIDCClient(
|
||||
name='oidcclient',
|
||||
slug='oidcclient',
|
||||
ou=get_default_ou(),
|
||||
unauthorized_url='https://example.com/southpark/',
|
||||
redirect_uris='https://example.com/callbac%C3%A9',
|
||||
)
|
||||
|
||||
for key, value in (params or {}).items():
|
||||
setattr(client, key, value)
|
||||
client.save()
|
||||
for mapping in app_settings.DEFAULT_MAPPINGS:
|
||||
OIDCClaim.objects.create(
|
||||
client=client, name=mapping['name'], value=mapping['value'], scopes=mapping['scopes']
|
||||
)
|
||||
return client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(app, superuser):
|
||||
return make_client(app, superuser, {})
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def oidc_client(request, superuser, app, simple_user, oidc_settings):
|
||||
return make_client(app, superuser, getattr(request, 'param', None) or {})
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def normal_oidc_client(superuser, app, simple_user):
|
||||
url = reverse('admin:authentic2_idp_oidc_oidcclient_add')
|
||||
assert OIDCClient.objects.count() == 0
|
||||
response = utils.login(app, superuser, path=url)
|
||||
response.form.set('name', 'oidcclient')
|
||||
response.form.set('slug', 'oidcclient')
|
||||
response.form.set('ou', get_default_ou().pk)
|
||||
response.form.set('unauthorized_url', 'https://example.com/southpark/')
|
||||
response.form.set('redirect_uris', 'https://example.com/callbac%C3%A9')
|
||||
response = response.form.submit(name='_save').follow()
|
||||
assert OIDCClient.objects.count() == 1
|
||||
client = OIDCClient.objects.get()
|
||||
utils.logout(app)
|
||||
return client
|
||||
|
||||
|
||||
def client_authentication_headers(oidc_client):
|
||||
client_creds = '%s:%s' % (oidc_client.client_id, oidc_client.client_secret)
|
||||
token = base64.b64encode(client_creds.encode('ascii'))
|
||||
return {'Authorization': 'Basic %s' % str(token.decode('ascii'))}
|
||||
|
||||
|
||||
def bearer_authentication_headers(access_token):
|
||||
return {'Authorization': 'Bearer %s' % str(access_token)}
|
||||
|
||||
|
||||
@pytest.mark.parametrize('oidc_client', OIDC_CLIENT_PARAMS, indirect=True)
|
||||
@pytest.mark.parametrize('do_not_ask_again', [(True,), (False,)])
|
||||
@pytest.mark.parametrize('login_first', [(True,), (False,)])
|
||||
|
@ -934,58 +833,6 @@ def test_invalid_request(oidc_client, caplog, oidc_settings, simple_user, app):
|
|||
assert response.json['error_description'] == 'Parameter "code" has expired or user is disconnected'
|
||||
|
||||
|
||||
def test_expired_manager(db, simple_user):
|
||||
expired = now() - datetime.timedelta(seconds=1)
|
||||
not_expired = now() + datetime.timedelta(days=1)
|
||||
client = OIDCClient.objects.create(
|
||||
name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/'
|
||||
)
|
||||
OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=expired)
|
||||
OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=not_expired)
|
||||
assert OIDCAuthorization.objects.count() == 2
|
||||
OIDCAuthorization.objects.cleanup()
|
||||
assert OIDCAuthorization.objects.count() == 1
|
||||
|
||||
OIDCCode.objects.create(
|
||||
client=client,
|
||||
user=simple_user,
|
||||
scopes='openid',
|
||||
redirect_uri='https://example.com/',
|
||||
session_key='xxx',
|
||||
auth_time=now(),
|
||||
expired=expired,
|
||||
)
|
||||
OIDCCode.objects.create(
|
||||
client=client,
|
||||
user=simple_user,
|
||||
scopes='openid',
|
||||
redirect_uri='https://example.com/',
|
||||
session_key='xxx',
|
||||
auth_time=now(),
|
||||
expired=not_expired,
|
||||
)
|
||||
assert OIDCCode.objects.count() == 2
|
||||
OIDCCode.objects.cleanup()
|
||||
assert OIDCCode.objects.count() == 1
|
||||
|
||||
OIDCAccessToken.objects.create(
|
||||
client=client, user=simple_user, scopes='openid', session_key='xxx', expired=expired
|
||||
)
|
||||
OIDCAccessToken.objects.create(
|
||||
client=client, user=simple_user, scopes='openid', session_key='xxx', expired=not_expired
|
||||
)
|
||||
assert OIDCAccessToken.objects.count() == 2
|
||||
OIDCAccessToken.objects.cleanup()
|
||||
assert OIDCAccessToken.objects.count() == 1
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def simple_oidc_client(db):
|
||||
return OIDCClient.objects.create(
|
||||
name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/'
|
||||
)
|
||||
|
||||
|
||||
def test_client_secret_post_authentication(oidc_settings, app, simple_oidc_client, simple_user):
|
||||
utils.login(app, simple_user)
|
||||
redirect_uri = simple_oidc_client.redirect_uris.split()[0]
|
||||
|
@ -1149,137 +996,6 @@ def test_registration_service_slug(oidc_settings, app, simple_oidc_client, simpl
|
|||
assert hooks.event[2]['kwargs']['service'] == 'client'
|
||||
|
||||
|
||||
def test_oidclient_claims_data_migration(migration):
|
||||
app = 'authentic2_idp_oidc'
|
||||
migrate_from = [(app, '0009_auto_20180313_1156')]
|
||||
migrate_to = [(app, '0010_oidcclaim')]
|
||||
|
||||
old_apps = migration.before(migrate_from)
|
||||
OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
|
||||
|
||||
client = OIDCClient(name='test', slug='test', redirect_uris='https://example.net/')
|
||||
client.save()
|
||||
|
||||
new_apps = migration.apply(migrate_to)
|
||||
OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
|
||||
|
||||
client = OIDCClient.objects.first()
|
||||
assert OIDCClaim.objects.filter(client=client.id).count() == 5
|
||||
|
||||
|
||||
def test_oidclient_preferred_username_as_identifier_data_migration(migration):
|
||||
app = 'authentic2_idp_oidc'
|
||||
migrate_from = [(app, '0010_oidcclaim')]
|
||||
migrate_to = [(app, '0011_auto_20180808_1546')]
|
||||
|
||||
old_apps = migration.before(migrate_from)
|
||||
OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
|
||||
OIDCClaim = old_apps.get_model('authentic2_idp_oidc', 'OIDCClaim')
|
||||
|
||||
client1 = OIDCClient.objects.create(name='test', slug='test', redirect_uris='https://example.net/')
|
||||
client2 = OIDCClient.objects.create(name='test1', slug='test1', redirect_uris='https://example.net/')
|
||||
client3 = OIDCClient.objects.create(name='test2', slug='test2', redirect_uris='https://example.net/')
|
||||
client4 = OIDCClient.objects.create(name='test3', slug='test3', redirect_uris='https://example.net/')
|
||||
for client in (client1, client2, client3, client4):
|
||||
if client.name == 'test1':
|
||||
continue
|
||||
if client.name == 'test3':
|
||||
OIDCClaim.objects.create(
|
||||
client=client, name='preferred_username', value='django_user_full_name', scopes='profile'
|
||||
)
|
||||
else:
|
||||
OIDCClaim.objects.create(
|
||||
client=client, name='preferred_username', value='django_user_username', scopes='profile'
|
||||
)
|
||||
OIDCClaim.objects.create(
|
||||
client=client, name='given_name', value='django_user_first_name', scopes='profile'
|
||||
)
|
||||
OIDCClaim.objects.create(
|
||||
client=client, name='family_name', value='django_user_last_name', scopes='profile'
|
||||
)
|
||||
if client.name == 'test2':
|
||||
continue
|
||||
OIDCClaim.objects.create(client=client, name='email', value='django_user_email', scopes='email')
|
||||
OIDCClaim.objects.create(
|
||||
client=client, name='email_verified', value='django_user_email_verified', scopes='email'
|
||||
)
|
||||
|
||||
new_apps = migration.apply(migrate_to)
|
||||
OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
|
||||
|
||||
client = OIDCClient.objects.first()
|
||||
for client in OIDCClient.objects.all():
|
||||
claims = client.oidcclaim_set.all()
|
||||
if client.name == 'test':
|
||||
assert claims.count() == 5
|
||||
assert sorted(claims.values_list('name', flat=True)) == [
|
||||
'email',
|
||||
'email_verified',
|
||||
'family_name',
|
||||
'given_name',
|
||||
'preferred_username',
|
||||
]
|
||||
assert sorted(claims.values_list('value', flat=True)) == [
|
||||
'django_user_email',
|
||||
'django_user_email_verified',
|
||||
'django_user_first_name',
|
||||
'django_user_identifier',
|
||||
'django_user_last_name',
|
||||
]
|
||||
elif client.name == 'test2':
|
||||
assert claims.count() == 3
|
||||
assert sorted(claims.values_list('name', flat=True)) == [
|
||||
'family_name',
|
||||
'given_name',
|
||||
'preferred_username',
|
||||
]
|
||||
assert sorted(claims.values_list('value', flat=True)) == [
|
||||
'django_user_first_name',
|
||||
'django_user_last_name',
|
||||
'django_user_username',
|
||||
]
|
||||
elif client.name == 'test3':
|
||||
assert claims.count() == 5
|
||||
assert sorted(claims.values_list('name', flat=True)) == [
|
||||
'email',
|
||||
'email_verified',
|
||||
'family_name',
|
||||
'given_name',
|
||||
'preferred_username',
|
||||
]
|
||||
assert sorted(claims.values_list('value', flat=True)) == [
|
||||
'django_user_email',
|
||||
'django_user_email_verified',
|
||||
'django_user_first_name',
|
||||
'django_user_full_name',
|
||||
'django_user_last_name',
|
||||
]
|
||||
else:
|
||||
assert claims.count() == 0
|
||||
|
||||
|
||||
def test_api_synchronization(app, oidc_client):
|
||||
oidc_client.has_api_access = True
|
||||
oidc_client.save()
|
||||
users = [User.objects.create(username='user-%s' % i) for i in range(10)]
|
||||
for user in users[5:]:
|
||||
user.delete()
|
||||
deleted_subs = set(make_sub(oidc_client, user) for user in users[5:])
|
||||
|
||||
app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret))
|
||||
status = 200
|
||||
if oidc_client.identifier_policy not in (OIDCClient.POLICY_PAIRWISE_REVERSIBLE, OIDCClient.POLICY_UUID):
|
||||
status = 401
|
||||
response = app.post_json(
|
||||
'/api/users/synchronization/',
|
||||
params={'known_uuids': [make_sub(oidc_client, user) for user in users]},
|
||||
status=status,
|
||||
)
|
||||
if status == 200:
|
||||
assert response.json['result'] == 1
|
||||
assert set(response.json['unknown_uuids']) == deleted_subs
|
||||
|
||||
|
||||
def test_claim_default_value(oidc_settings, normal_oidc_client, simple_user, app):
|
||||
oidc_settings.A2_IDP_OIDC_SCOPES = ['openid', 'profile', 'email', 'phone']
|
||||
Attribute.objects.create(
|
||||
|
@ -1922,129 +1638,3 @@ def test_oidc_good_next_url_hook(app, oidc_client):
|
|||
rf = RequestFactory()
|
||||
request = rf.get('/')
|
||||
assert good_next_url(request, 'https://example.com/')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def access_token(client, simple_user):
|
||||
return OIDCAccessToken.objects.create(
|
||||
client=client,
|
||||
user=simple_user,
|
||||
scopes='openid profile email',
|
||||
expired=now() + datetime.timedelta(seconds=3600),
|
||||
)
|
||||
|
||||
|
||||
def test_user_info(app, client, access_token, freezer):
|
||||
def get_user_info(**kwargs):
|
||||
return app.get(
|
||||
'/idp/oidc/user_info/', headers=bearer_authentication_headers(access_token.uuid), **kwargs
|
||||
)
|
||||
|
||||
response = app.get('/idp/oidc/user_info/', status=401)
|
||||
assert (
|
||||
response['WWW-Authenticate']
|
||||
== 'Bearer error="invalid_request", error_description="Bearer authentication is mandatory"'
|
||||
)
|
||||
|
||||
response = app.get('/idp/oidc/user_info/', headers={'Authorization': 'Bearer'}, status=401)
|
||||
assert (
|
||||
response['WWW-Authenticate']
|
||||
== 'Bearer error="invalid_request", error_description="Invalid Bearer authentication"'
|
||||
)
|
||||
|
||||
response = get_user_info(status=200)
|
||||
assert dict(response.json, sub='') == {
|
||||
'email': 'user@example.net',
|
||||
'email_verified': False,
|
||||
'family_name': 'Dôe',
|
||||
'family_name_verified': True,
|
||||
'given_name': 'Jôhn',
|
||||
'given_name_verified': True,
|
||||
'preferred_username': 'user',
|
||||
'sub': '',
|
||||
}
|
||||
|
||||
# token is expired
|
||||
access_token.expired = now() - datetime.timedelta(seconds=1)
|
||||
access_token.save()
|
||||
response = get_user_info(status=401)
|
||||
assert (
|
||||
response['WWW-Authenticate']
|
||||
== 'Bearer error="invalid_token", error_description="Token expired or user disconnected"'
|
||||
)
|
||||
|
||||
# token is unknown
|
||||
access_token.delete()
|
||||
response = get_user_info(status=401)
|
||||
assert response['WWW-Authenticate'] == 'Bearer error="invalid_token", error_description="Token unknown"'
|
||||
|
||||
utils.login(app, access_token.user)
|
||||
access_token.expired = now() + datetime.timedelta(seconds=1)
|
||||
access_token.session_key = app.session.session_key
|
||||
access_token.save()
|
||||
|
||||
get_user_info(status=200)
|
||||
|
||||
app.session.flush()
|
||||
response = get_user_info(status=401)
|
||||
assert (
|
||||
response['WWW-Authenticate']
|
||||
== 'Bearer error="invalid_token", error_description="Token expired or user disconnected"'
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def session(settings, db, simple_user):
|
||||
engine = import_module(settings.SESSION_ENGINE)
|
||||
session = engine.SessionStore()
|
||||
session['_auth_user_id'] = str(simple_user.id)
|
||||
session.create()
|
||||
return session
|
||||
|
||||
|
||||
def test_access_token_is_valid_session(simple_oidc_client, simple_user, session):
|
||||
token = OIDCAccessToken.objects.create(
|
||||
client=simple_oidc_client, user=simple_user, scopes='openid', session_key=session.session_key
|
||||
)
|
||||
|
||||
assert token.is_valid()
|
||||
session.flush()
|
||||
assert not token.is_valid()
|
||||
|
||||
|
||||
def test_access_token_is_valid_expired(simple_oidc_client, simple_user, freezer):
|
||||
start = now()
|
||||
expired = start + datetime.timedelta(seconds=30)
|
||||
|
||||
token = OIDCAccessToken.objects.create(
|
||||
client=simple_oidc_client, user=simple_user, scopes='openid', expired=expired
|
||||
)
|
||||
|
||||
assert token.is_valid()
|
||||
freezer.move_to(expired)
|
||||
assert token.is_valid()
|
||||
freezer.move_to(expired + datetime.timedelta(seconds=1))
|
||||
assert not token.is_valid()
|
||||
|
||||
|
||||
def test_access_token_is_valid_session_and_expired(simple_oidc_client, simple_user, session, freezer):
|
||||
start = now()
|
||||
expired = start + datetime.timedelta(seconds=30)
|
||||
|
||||
token = OIDCAccessToken.objects.create(
|
||||
client=simple_oidc_client,
|
||||
user=simple_user,
|
||||
scopes='openid',
|
||||
session_key=session.session_key,
|
||||
expired=expired,
|
||||
)
|
||||
|
||||
assert token.is_valid()
|
||||
freezer.move_to(expired)
|
||||
assert token.is_valid()
|
||||
freezer.move_to(expired + datetime.timedelta(seconds=1))
|
||||
assert not token.is_valid()
|
||||
freezer.move_to(start)
|
||||
assert token.is_valid()
|
||||
session.flush()
|
||||
assert not token.is_valid()
|
|
@ -0,0 +1,115 @@
|
|||
# authentic2 - versatile identity manager
|
||||
# Copyright (C) 2010-2021 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import datetime
|
||||
|
||||
from django.utils.timezone import now
|
||||
|
||||
from authentic2.a2_rbac.utils import get_default_ou
|
||||
from authentic2_idp_oidc.models import OIDCAccessToken, OIDCAuthorization, OIDCClient, OIDCCode
|
||||
|
||||
|
||||
def test_expired_manager(db, simple_user):
|
||||
expired = now() - datetime.timedelta(seconds=1)
|
||||
not_expired = now() + datetime.timedelta(days=1)
|
||||
client = OIDCClient.objects.create(
|
||||
name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/'
|
||||
)
|
||||
OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=expired)
|
||||
OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=not_expired)
|
||||
assert OIDCAuthorization.objects.count() == 2
|
||||
OIDCAuthorization.objects.cleanup()
|
||||
assert OIDCAuthorization.objects.count() == 1
|
||||
|
||||
OIDCCode.objects.create(
|
||||
client=client,
|
||||
user=simple_user,
|
||||
scopes='openid',
|
||||
redirect_uri='https://example.com/',
|
||||
session_key='xxx',
|
||||
auth_time=now(),
|
||||
expired=expired,
|
||||
)
|
||||
OIDCCode.objects.create(
|
||||
client=client,
|
||||
user=simple_user,
|
||||
scopes='openid',
|
||||
redirect_uri='https://example.com/',
|
||||
session_key='xxx',
|
||||
auth_time=now(),
|
||||
expired=not_expired,
|
||||
)
|
||||
assert OIDCCode.objects.count() == 2
|
||||
OIDCCode.objects.cleanup()
|
||||
assert OIDCCode.objects.count() == 1
|
||||
|
||||
OIDCAccessToken.objects.create(
|
||||
client=client, user=simple_user, scopes='openid', session_key='xxx', expired=expired
|
||||
)
|
||||
OIDCAccessToken.objects.create(
|
||||
client=client, user=simple_user, scopes='openid', session_key='xxx', expired=not_expired
|
||||
)
|
||||
assert OIDCAccessToken.objects.count() == 2
|
||||
OIDCAccessToken.objects.cleanup()
|
||||
assert OIDCAccessToken.objects.count() == 1
|
||||
|
||||
|
||||
def test_access_token_is_valid_session(simple_oidc_client, simple_user, session):
|
||||
token = OIDCAccessToken.objects.create(
|
||||
client=simple_oidc_client, user=simple_user, scopes='openid', session_key=session.session_key
|
||||
)
|
||||
|
||||
assert token.is_valid()
|
||||
session.flush()
|
||||
assert not token.is_valid()
|
||||
|
||||
|
||||
def test_access_token_is_valid_expired(simple_oidc_client, simple_user, freezer):
|
||||
start = now()
|
||||
expired = start + datetime.timedelta(seconds=30)
|
||||
|
||||
token = OIDCAccessToken.objects.create(
|
||||
client=simple_oidc_client, user=simple_user, scopes='openid', expired=expired
|
||||
)
|
||||
|
||||
assert token.is_valid()
|
||||
freezer.move_to(expired)
|
||||
assert token.is_valid()
|
||||
freezer.move_to(expired + datetime.timedelta(seconds=1))
|
||||
assert not token.is_valid()
|
||||
|
||||
|
||||
def test_access_token_is_valid_session_and_expired(simple_oidc_client, simple_user, session, freezer):
|
||||
start = now()
|
||||
expired = start + datetime.timedelta(seconds=30)
|
||||
|
||||
token = OIDCAccessToken.objects.create(
|
||||
client=simple_oidc_client,
|
||||
user=simple_user,
|
||||
scopes='openid',
|
||||
session_key=session.session_key,
|
||||
expired=expired,
|
||||
)
|
||||
|
||||
assert token.is_valid()
|
||||
freezer.move_to(expired)
|
||||
assert token.is_valid()
|
||||
freezer.move_to(expired + datetime.timedelta(seconds=1))
|
||||
assert not token.is_valid()
|
||||
freezer.move_to(start)
|
||||
assert token.is_valid()
|
||||
session.flush()
|
||||
assert not token.is_valid()
|
|
@ -0,0 +1,90 @@
|
|||
# authentic2 - versatile identity manager
|
||||
# Copyright (C) 2010-2021 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import datetime
|
||||
|
||||
from django.utils.timezone import now
|
||||
|
||||
from authentic2_idp_oidc.models import OIDCAccessToken
|
||||
|
||||
from .. import utils
|
||||
from .conftest import bearer_authentication_headers
|
||||
|
||||
|
||||
def test_user_info(app, client, freezer, simple_user):
|
||||
access_token = OIDCAccessToken.objects.create(
|
||||
client=client,
|
||||
user=simple_user,
|
||||
scopes='openid profile email',
|
||||
expired=now() + datetime.timedelta(seconds=3600),
|
||||
)
|
||||
|
||||
def get_user_info(**kwargs):
|
||||
return app.get(
|
||||
'/idp/oidc/user_info/', headers=bearer_authentication_headers(access_token.uuid), **kwargs
|
||||
)
|
||||
|
||||
response = app.get('/idp/oidc/user_info/', status=401)
|
||||
assert (
|
||||
response['WWW-Authenticate']
|
||||
== 'Bearer error="invalid_request", error_description="Bearer authentication is mandatory"'
|
||||
)
|
||||
|
||||
response = app.get('/idp/oidc/user_info/', headers={'Authorization': 'Bearer'}, status=401)
|
||||
assert (
|
||||
response['WWW-Authenticate']
|
||||
== 'Bearer error="invalid_request", error_description="Invalid Bearer authentication"'
|
||||
)
|
||||
|
||||
response = get_user_info(status=200)
|
||||
assert dict(response.json, sub='') == {
|
||||
'email': 'user@example.net',
|
||||
'email_verified': False,
|
||||
'family_name': 'Dôe',
|
||||
'family_name_verified': True,
|
||||
'given_name': 'Jôhn',
|
||||
'given_name_verified': True,
|
||||
'preferred_username': 'user',
|
||||
'sub': '',
|
||||
}
|
||||
|
||||
# token is expired
|
||||
access_token.expired = now() - datetime.timedelta(seconds=1)
|
||||
access_token.save()
|
||||
response = get_user_info(status=401)
|
||||
assert (
|
||||
response['WWW-Authenticate']
|
||||
== 'Bearer error="invalid_token", error_description="Token expired or user disconnected"'
|
||||
)
|
||||
|
||||
# token is unknown
|
||||
access_token.delete()
|
||||
response = get_user_info(status=401)
|
||||
assert response['WWW-Authenticate'] == 'Bearer error="invalid_token", error_description="Token unknown"'
|
||||
|
||||
utils.login(app, access_token.user)
|
||||
access_token.expired = now() + datetime.timedelta(seconds=1)
|
||||
access_token.session_key = app.session.session_key
|
||||
access_token.save()
|
||||
|
||||
get_user_info(status=200)
|
||||
|
||||
app.session.flush()
|
||||
response = get_user_info(status=401)
|
||||
assert (
|
||||
response['WWW-Authenticate']
|
||||
== 'Bearer error="invalid_token", error_description="Token expired or user disconnected"'
|
||||
)
|
Loading…
Reference in New Issue