authentic/tests/test_idp_oidc.py

2049 lines
75 KiB
Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import datetime
import functools
import json
import urllib.parse
from importlib import import_module
import pytest
from django.contrib.auth import get_user_model
from django.core.exceptions import ValidationError
from django.core.files import File
from django.http import QueryDict
from django.test.utils import override_settings
from django.urls import reverse
from django.utils.encoding import force_text
from django.utils.timezone import now
from jwcrypto.jwk import JWK, JWKSet
from jwcrypto.jwt import JWT
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.models import Attribute, AuthorizedRole
from authentic2.utils import good_next_url, make_url
from authentic2_auth_oidc.utils import parse_timestamp
from authentic2_idp_oidc import app_settings
from authentic2_idp_oidc.models import OIDCAccessToken, OIDCAuthorization, OIDCClaim, OIDCClient, OIDCCode
from authentic2_idp_oidc.utils import base64url, get_first_ec_sig_key, get_first_rsa_sig_key, make_sub
from django_rbac.utils import get_ou_model, get_role_model
from . import utils
User = get_user_model()
pytestmark = pytest.mark.django_db
JWKSET = {
"keys": [
{
"qi": "h_zifVD-ChelxZUVxhICNcgGkQz26b-EdIlLY9rN7SX_aD3sLI_JHEHV4Bz3kV5eW8O4qJ8SHhfUdHGK-gRH7FVOGoXnXACf47QoXowHzsPLL64wCuZENTl7hIRGLY-BInULkfTQfuiVSMoxPjsVNTMBzMiz0bNjMQyMyvW5xH4",
"kty": "RSA",
"d": "pUcL4-LDBy3rqJWip269h5Hd6nLvqjXltfkVe_mL-LwZPHmCrUaj_SX54SnCY3Wyf7kxhoMYUac62lQ71923uJPFFdiavAujbNrtZPq32i4C-1apWXW8OGJr8VoVDqalxj9SAq1G54wbbsaAPrZdyuqy-esNxDqDigfbM-cWgngBBYo5CSsfnmnd05N2cUS26L7QzWbNHwilnBTE9e_J7rK3xUCDKrobv6_LiI-AhMmBHJSrCxjexh0wzfBi_Ntj9BGCcPThDjG8SQvaV-aLNdLfIy2XO3i076RLBB6Hm_yHuAparrwp-pPE48eQdiYjrSAFalz4ojWQ3_ByLA6uAQ",
"q": "2FvfeWnIlWNUipan7DIBlJrmz5EinJNxrQ-BNwPHrAoIM8qvyC7jPy09YxZs5Y9CMMZSal6C4Nm2LHBFxHU9z1qd5XDzbk19G-y1lDqZizVXr876TpiAjuq03rcoMQm8dQru_pVjUdgxR64vKyJ9CaFMAqcpZeEMIqAvzhQG8uE",
"dp": "Kg4HPGpzenhK2ser6nfM1Yt-pkqBbWQotvqsxGptECXpbN7vweupvL5kJPeRrbsXKp9QE7DXTN1sG9puJxMSwtgiv4hr9Va9e9WOC6PMd2VY7tgw5uKMpPLMc5y82PusRhBoRh0SUUsjyQxK9PGtWYnGZXbAoaIYPdMyDlosfqU",
"dq": "QuUNEHYTjZTbo8n2-4FumarXKGBAalbwM8jyc7cYemnTpWfKt8M_gd4T99oMK2IC3h_DhZ3ZK3pE6DKCb76sMLtczH8C1RziTMsATWdc5_zDMtl07O4b-ZQ5_g51P8w515pc0JwRzFFi0z3Y2aZdMKgNX1id5SES5nXOshHhICE",
"n": "0lN6CiJGFD8BSPV_azLoEl6Nq-WlHkU743D5rqvzw1sOaxstMGxAhVk2YIhWwfvapV6XjO_yvc4778VBTELOdjRw6BGUdBJepdwkL__TPyjEVhqMQj9MKhEU4GUy9w0Lsilb5D01kfrOKpmdcYw4jhcDvb0H4-LZgh1Vk84vF4WaQCUg_AX4drVDQOjoU8kuWIM8gz9w6zEsbIw-gtMRpFwS8ncA0zDX5VfyC77iMxzFftDIP2gM5GvdevMzvP9IRkRRBhP9vV4JchBFPHSA9OPJcnySjJJNW6aAJn6P6JasN1z68khjufM09J8UzmLAZYOq7gUG95Ox1KsV-g337Q",
"e": "AQAB",
"p": "-Nyj_Sw3f2HUqSssCZv84y7b3blOtGGAhfYN_JtGfcTQv2bOtxrIUzeonCi-Z_1W4hO10tqxJcOB0ibtDqkDlLhnLaIYOBfriITRFK83EJG5sC-0KTmFzUXFTA2aMc1QgP-Fu6gUfQpPqLgWxhx8EFhkBlBZshKU5-C-385Sco0",
},
{
"kty": "EC",
"d": "wwULaR9UYWZW6U2oEbkz3sO1lhPSj6DyA6e7PiUfhog",
"use": "sig",
"crv": "P-256",
"x": "HZMHZkX-63heqA5pvWn-UR7bgcXZNEcQa5wfvG_BzTw",
"y": "SUCuwjjiyKvGq5Odr0sjDqjha_CBqks0JQFrR7Ei5OQ",
"alg": "ES256",
},
]
}
@pytest.fixture
def oidc_settings(settings):
settings.A2_IDP_OIDC_JWKSET = JWKSET
settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT = '100/m'
return settings
def test_get_jwkset(oidc_settings):
from authentic2_idp_oidc.utils import get_jwkset
get_jwkset()
OIDC_CLIENT_PARAMS = [
{
'authorization_flow': OIDCClient.FLOW_IMPLICIT,
},
{
'post_logout_redirect_uris': 'https://example.com/',
},
{
'identifier_policy': OIDCClient.POLICY_UUID,
'post_logout_redirect_uris': 'https://example.com/',
},
{
'identifier_policy': OIDCClient.POLICY_EMAIL,
},
{
'idtoken_algo': OIDCClient.ALGO_HMAC,
},
{
'idtoken_algo': OIDCClient.ALGO_EC,
},
{
'authorization_mode': OIDCClient.AUTHORIZATION_MODE_NONE,
},
{
'idtoken_duration': datetime.timedelta(hours=1),
},
{
'authorization_flow': OIDCClient.FLOW_IMPLICIT,
'idtoken_duration': datetime.timedelta(hours=1),
'post_logout_redirect_uris': 'https://example.com/',
},
{
'frontchannel_logout_uri': 'https://example.com/southpark/logout/',
},
{
'frontchannel_logout_uri': 'https://example.com/southpark/logout/',
'frontchannel_timeout': 3000,
},
{
'identifier_policy': OIDCClient.POLICY_PAIRWISE_REVERSIBLE,
},
]
@pytest.mark.parametrize('other_attributes', OIDC_CLIENT_PARAMS)
def test_admin(other_attributes, app, superuser, oidc_settings):
Attribute.objects.create(
name='cityscape_image',
label='cityscape',
kind='profile_image',
asked_on_registration=True,
required=False,
user_visible=True,
user_editable=True,
)
url = reverse('admin:authentic2_idp_oidc_oidcclient_add')
assert OIDCClient.objects.count() == 0
response = utils.login(app, superuser, path=url)
response.form.set('name', 'oidcclient')
response.form.set('slug', 'oidcclient')
response.form.set('ou', get_default_ou().pk)
response.form.set('unauthorized_url', 'https://example.com/southpark/')
response.form.set('redirect_uris', 'https://example.com/callbac%C3%A9')
for key, value in other_attributes.items():
response.form.set(key, value)
response = response.form.submit().follow()
assert OIDCClient.objects.count() == 1
def make_client(app, superuser, params=None):
Attribute.objects.create(
name='cityscape_image',
label='cityscape',
kind='profile_image',
asked_on_registration=True,
required=False,
user_visible=True,
user_editable=True,
)
client = OIDCClient(
name='oidcclient',
slug='oidcclient',
ou=get_default_ou(),
unauthorized_url='https://example.com/southpark/',
redirect_uris='https://example.com/callbac%C3%A9',
)
for key, value in (params or {}).items():
setattr(client, key, value)
client.save()
for mapping in app_settings.DEFAULT_MAPPINGS:
OIDCClaim.objects.create(
client=client, name=mapping['name'], value=mapping['value'], scopes=mapping['scopes']
)
return client
@pytest.fixture
def client(app, superuser):
return make_client(app, superuser, {})
@pytest.fixture
def oidc_client(request, superuser, app, simple_user, oidc_settings):
return make_client(app, superuser, getattr(request, 'param', None) or {})
@pytest.fixture
def normal_oidc_client(superuser, app, simple_user):
url = reverse('admin:authentic2_idp_oidc_oidcclient_add')
assert OIDCClient.objects.count() == 0
response = utils.login(app, superuser, path=url)
response.form.set('name', 'oidcclient')
response.form.set('slug', 'oidcclient')
response.form.set('ou', get_default_ou().pk)
response.form.set('unauthorized_url', 'https://example.com/southpark/')
response.form.set('redirect_uris', 'https://example.com/callbac%C3%A9')
response = response.form.submit(name='_save').follow()
assert OIDCClient.objects.count() == 1
client = OIDCClient.objects.get()
utils.logout(app)
return client
def client_authentication_headers(oidc_client):
client_creds = '%s:%s' % (oidc_client.client_id, oidc_client.client_secret)
token = base64.b64encode(client_creds.encode('ascii'))
return {'Authorization': 'Basic %s' % str(token.decode('ascii'))}
def bearer_authentication_headers(access_token):
return {'Authorization': 'Bearer %s' % str(access_token)}
@pytest.mark.parametrize('oidc_client', OIDC_CLIENT_PARAMS, indirect=True)
@pytest.mark.parametrize('do_not_ask_again', [(True,), (False,)])
@pytest.mark.parametrize('login_first', [(True,), (False,)])
def test_authorization_code_sso(
login_first, do_not_ask_again, oidc_client, oidc_settings, simple_user, app, caplog
):
redirect_uri = oidc_client.redirect_uris.split()[0]
params = {
'client_id': oidc_client.client_id,
'scope': 'openid profile email',
'redirect_uri': redirect_uri,
'state': 'xxx',
'nonce': 'yyy',
'login_hint': 'backoffice john@example.com',
}
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
params['response_type'] = 'code'
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
params['response_type'] = 'token id_token'
authorize_url = make_url('oidc-authorize', params=params)
if login_first:
utils.login(app, simple_user)
response = app.get(authorize_url)
if not login_first:
assert set(app.session['login-hint']) == set(['backoffice', 'john@example.com'])
response = response.follow()
assert response.request.path == reverse('auth_login')
response.form.set('username', simple_user.username)
response.form.set('password', simple_user.username)
response = response.form.submit(name='login-password-submit')
response = response.follow()
assert response.request.path == reverse('oidc-authorize')
if oidc_client.authorization_mode != OIDCClient.AUTHORIZATION_MODE_NONE:
response = response.maybe_follow()
assert 'a2-oidc-authorization-form' in response.text
assert OIDCAuthorization.objects.count() == 0
assert OIDCCode.objects.count() == 0
assert OIDCAccessToken.objects.count() == 0
response.form['do_not_ask_again'] = do_not_ask_again
response = response.form.submit('accept')
if do_not_ask_again:
assert OIDCAuthorization.objects.count() == 1
authz = OIDCAuthorization.objects.get()
assert authz.client == oidc_client
assert authz.user == simple_user
assert authz.scope_set() == set('openid profile email'.split())
assert authz.expired >= now()
else:
assert OIDCAuthorization.objects.count() == 0
utils.assert_event(
'user.service.sso.authorization',
session=app.session,
user=simple_user,
service=oidc_client,
scopes=['email', 'openid', 'profile'],
)
utils.assert_event(
'user.service.sso',
session=app.session,
user=simple_user,
service=oidc_client,
how='password-on-https',
)
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
assert OIDCCode.objects.count() == 1
code = OIDCCode.objects.get()
assert code.client == oidc_client
assert code.user == simple_user
assert code.scope_set() == set('openid profile email'.split())
assert code.state == 'xxx'
assert code.nonce == 'yyy'
assert code.redirect_uri == redirect_uri
assert code.session_key == app.session.session_key
assert code.auth_time <= now()
assert code.expired >= now()
assert response['Location'].startswith(redirect_uri)
location = urllib.parse.urlparse(response['Location'])
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
query = urllib.parse.parse_qs(location.query)
assert set(query.keys()) == set(['code', 'state'])
assert query['code'] == [code.uuid]
code = query['code'][0]
assert query['state'] == ['xxx']
token_url = make_url('oidc-token')
response = app.post(
token_url,
params={
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': oidc_client.redirect_uris.split()[0],
},
headers=client_authentication_headers(oidc_client),
)
assert 'error' not in response.json
assert 'access_token' in response.json
assert 'expires_in' in response.json
assert 'id_token' in response.json
assert response.json['token_type'] == 'Bearer'
access_token = response.json['access_token']
id_token = response.json['id_token']
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
assert location.fragment
query = urllib.parse.parse_qs(location.fragment)
assert OIDCAccessToken.objects.count() == 1
access_token = OIDCAccessToken.objects.get()
assert set(query.keys()) == set(['access_token', 'token_type', 'expires_in', 'id_token', 'state'])
assert query['access_token'] == [access_token.uuid]
assert query['token_type'] == ['Bearer']
assert query['state'] == ['xxx']
access_token = query['access_token'][0]
id_token = query['id_token'][0]
if oidc_client.idtoken_algo in (oidc_client.ALGO_RSA, oidc_client.ALGO_EC):
key = JWKSet.from_json(app.get(reverse('oidc-certs')).content)
algs = ['RS256', 'ES256']
elif oidc_client.idtoken_algo == oidc_client.ALGO_HMAC:
k = base64.b64encode(oidc_client.client_secret.encode('utf-8'))
key = JWK(kty='oct', k=force_text(k))
algs = ['HS256']
else:
raise NotImplementedError
jwt = JWT(jwt=id_token, key=key, algs=algs)
claims = json.loads(jwt.claims)
assert set(claims) >= set(['iss', 'sub', 'aud', 'exp', 'iat', 'nonce', 'auth_time', 'acr'])
assert claims['nonce'] == 'yyy'
assert response.request.url.startswith(claims['iss'])
assert claims['aud'] == oidc_client.client_id
assert parse_timestamp(claims['iat']) <= now()
assert parse_timestamp(claims['auth_time']) <= now()
exp_delta = (parse_timestamp(claims['exp']) - now()).total_seconds()
assert exp_delta > 0
if oidc_client.idtoken_duration:
assert abs(exp_delta - oidc_client.idtoken_duration.total_seconds()) < 2
else:
assert abs(exp_delta - 30) < 2
if login_first:
assert claims['acr'] == '0'
else:
assert claims['acr'] == '1'
assert claims['sub'] == make_sub(oidc_client, simple_user)
assert claims['preferred_username'] == simple_user.username
assert claims['given_name'] == simple_user.first_name
assert claims['family_name'] == simple_user.last_name
assert claims['email'] == simple_user.email
assert claims['email_verified'] is False
user_info_url = make_url('oidc-user-info')
response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
assert response.json['sub'] == make_sub(oidc_client, simple_user)
assert response.json['preferred_username'] == simple_user.username
assert response.json['given_name'] == simple_user.first_name
assert response.json['family_name'] == simple_user.last_name
assert response.json['email'] == simple_user.email
assert response.json['email_verified'] is False
# when adding extra attributes
OIDCClaim.objects.create(client=oidc_client, name='ou', value='django_user_ou_name', scopes='profile')
OIDCClaim.objects.create(client=oidc_client, name='roles', value='a2_role_names', scopes='profile, role')
OIDCClaim.objects.create(
client=oidc_client, name='cityscape_image', value='django_user_cityscape_image', scopes='profile'
)
simple_user.roles.add(
get_role_model().objects.create(name='Whatever', slug='whatever', ou=get_default_ou())
)
response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
assert response.json['ou'] == simple_user.ou.name
assert response.json['roles'][0] == 'Whatever'
assert response.json.get('cityscape_image') is None
with open('tests/200x200.jpg', 'rb') as fd:
simple_user.attributes.cityscape_image = File(fd)
response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
assert response.json['cityscape_image'].startswith('http://testserver/media/profile-image/')
# check against a user without username
simple_user.username = None
simple_user.save()
response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
assert response.json['preferred_username'] == ''
# Now logout
if oidc_client.post_logout_redirect_uris:
params = {
'post_logout_redirect_uri': oidc_client.post_logout_redirect_uris,
'state': 'xyz',
}
logout_url = make_url('oidc-logout', params=params)
response = app.get(logout_url)
assert response.status_code == 302
assert response.location == 'https://example.com/?state=xyz'
assert '_auth_user_id' not in app.session
else:
response = app.get(make_url('account_management'))
response = response.click('Logout')
if oidc_client.frontchannel_logout_uri:
iframes = response.pyquery('iframe[src="https://example.com/southpark/logout/"]')
assert iframes
if oidc_client.frontchannel_timeout:
assert iframes.attr('onload').endswith(', %d)' % oidc_client.frontchannel_timeout)
else:
assert iframes.attr('onload').endswith(', 300)')
def check_authorize_error(
response, error, error_description, fragment, caplog, check_next=True, redirect_uri=None, message=True
):
# check next_url qs
if message:
location = urllib.parse.urlparse(response.location)
assert location.path == '/continue/'
if check_next:
location_qs = QueryDict(location.query or '')
assert 'next' in location_qs
assert location_qs['next'].startswith(redirect_uri)
next_url = urllib.parse.urlparse(location_qs['next'])
next_url_qs = QueryDict(next_url.fragment if fragment else next_url.query)
assert next_url_qs['error'] == error
assert next_url_qs['error_description'] == error_description
# check continue page
continue_response = response.follow()
assert error_description in continue_response.pyquery('.error').text()
elif check_next:
assert response.location.startswith(redirect_uri)
location = urllib.parse.urlparse(response.location)
location_qs = QueryDict(location.fragment if fragment else location.query)
assert location_qs['error'] == error
assert location_qs['error_description'] == error_description
# check logs
last_record = caplog.records[-1]
if message:
assert last_record.levelname == 'WARNING'
else:
assert last_record.levelname == 'INFO'
assert 'error "%s" in authorize endpoint' % error in last_record.message
assert error_description in last_record.message
if message:
return continue_response
def assert_authorization_response(response, fragment=False, **kwargs):
location = urllib.parse.urlparse(response.location)
location_qs = QueryDict(location.fragment if fragment else location.query)
assert set(location_qs) == set(kwargs)
for key, value in kwargs.items():
if value is None:
assert key in location_qs
elif isinstance(value, list):
assert set(location_qs.getlist(key)) == set(value)
else:
assert value in location_qs[key]
@pytest.mark.parametrize('oidc_client', OIDC_CLIENT_PARAMS, indirect=True)
def test_invalid_request(oidc_client, caplog, oidc_settings, simple_user, app):
redirect_uri = oidc_client.redirect_uris.split()[0]
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
fragment = False
response_type = 'code'
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
fragment = True
response_type = 'id_token token'
else:
raise NotImplementedError
assert_authorize_error = functools.partial(
check_authorize_error, caplog=caplog, fragment=fragment, redirect_uri=redirect_uri
)
# missing client_id
response = app.get(make_url('oidc-authorize', params={}))
assert_authorize_error(response, 'invalid_request', 'Missing parameter "client_id"', check_next=False)
# missing redirect_uri
response = app.get(
make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
},
)
)
assert_authorize_error(response, 'invalid_request', 'Missing parameter "redirect_uri"', check_next=False)
# invalid client_id
authorize_url = app.get(
make_url(
'oidc-authorize',
params={
'client_id': 'xxx',
'redirect_uri': redirect_uri,
},
)
)
assert_authorize_error(response, 'invalid_request', 'Unknown client identifier: "xxx"', check_next=False)
# invalid redirect_uri
response = app.get(
make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': 'xxx',
'response_type': 'code',
'scope': 'openid',
},
),
status=302,
)
continue_response = assert_authorize_error(
response, 'invalid_request', 'Redirect URI "xxx" is unknown.', check_next=False
)
assert 'Known' not in continue_response.pyquery('.error').text()
# invalid redirect_uri with DEBUG=True, list of redirect_uris is shown
with override_settings(DEBUG=True):
response = app.get(
make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': 'xxx',
'response_type': 'code',
'scope': 'openid',
},
),
status=302,
)
continue_response = assert_authorize_error(
response, 'invalid_request', 'Redirect URI "xxx" is unknown.', check_next=False
)
assert (
'Known redirect URIs are: https://example.com/callbac%C3%A9'
in continue_response.pyquery('.error').text()
)
# missing response_type
response = app.get(
make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
},
)
)
assert_authorize_error(response, 'invalid_request', 'Missing parameter "response_type"')
# unsupported response_type
response = app.get(
make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': 'xxx',
},
)
)
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
assert_authorize_error(response, 'unsupported_response_type', 'Response type must be "code"')
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
assert_authorize_error(
response, 'unsupported_response_type', 'Response type must be "id_token token" or "id_token"'
)
# missing scope
response = app.get(
make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
},
)
)
assert_authorize_error(response, 'invalid_request', 'Missing parameter "scope"')
# invalid max_age : not an integer
response = app.get(
make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid',
'max_age': 'xxx',
},
)
)
assert_authorize_error(response, 'invalid_request', 'Parameter "max_age" must be a positive integer')
# invalid max_age : not positive
response = app.get(
make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid',
'max_age': '-1',
},
)
)
assert_authorize_error(response, 'invalid_request', 'Parameter "max_age" must be a positive integer')
# openid scope is missing
authorize_url = make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'profile',
},
)
response = app.get(authorize_url)
assert_authorize_error(response, 'invalid_scope', 'Scope must contain "openid", received "profile"')
# use of an unknown scope
authorize_url = make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile zob',
},
)
response = app.get(authorize_url)
assert_authorize_error(
response,
'invalid_scope',
'Scope may contain "email, openid, profile" scope(s), received "email, openid, profile, zob"',
)
# restriction on scopes
with override_settings(A2_IDP_OIDC_SCOPES=['openid']):
response = app.get(
make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email',
},
)
)
assert_authorize_error(
response, 'invalid_scope', 'Scope may contain "openid" scope(s), received "email, openid"'
)
# cancel
response = app.get(
make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'cancel': '1',
},
)
)
assert_authorize_error(response, 'access_denied', 'Authentication cancelled by user', message=False)
# prompt=none
response = app.get(
make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'prompt': 'none',
},
)
)
assert_authorize_error(
response,
'login_required',
error_description='Login is required but prompt parameter is "none"',
message=False,
)
utils.login(app, simple_user)
# prompt=none max_age=0
response = app.get(
make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'max_age': '0',
'prompt': 'none',
},
)
)
assert_authorize_error(
response,
'login_required',
error_description='Login is required because of max_age, but prompt parameter is "none"',
message=False,
)
# max_age=0
response = app.get(
make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'max_age': '0',
},
)
)
assert response.location.startswith(reverse('auth_login') + '?')
# prompt=login
authorize_url = make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'prompt': 'login',
},
)
response = app.get(authorize_url)
assert urllib.parse.urlparse(response['Location']).path == reverse('auth_login')
if oidc_client.authorization_mode != oidc_client.AUTHORIZATION_MODE_NONE:
# prompt is none, but consent is required
response = app.get(
make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'prompt': 'none',
},
)
)
assert_authorize_error(
response, 'consent_required', 'Consent is required but prompt parameter is "none"', message=False
)
# user do not consent
response = app.get(
make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
},
)
)
response = response.form.submit('refuse')
assert_authorize_error(response, 'access_denied', 'User did not consent', message=False)
# authorization exists
authorize = OIDCAuthorization.objects.create(
client=oidc_client,
user=simple_user,
scopes='openid profile email',
expired=now() + datetime.timedelta(days=2),
)
response = app.get(
make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
},
)
)
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
assert_authorization_response(response, code=None)
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
assert_authorization_response(
response, access_token=None, id_token=None, expires_in=None, token_type=None, fragment=True
)
# client ask for explicit authorization
authorize_url = make_url(
'oidc-authorize',
params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'prompt': 'consent',
},
)
response = app.get(authorize_url)
assert 'a2-oidc-authorization-form' in response.text
# check all authorization have been deleted, it's our policy
assert OIDCAuthorization.objects.count() == 0
if oidc_client.authorization_mode == oidc_client.AUTHORIZATION_MODE_NONE:
# authorization mode is none, but explicit consent is asked, we validate it
response = response.form.submit('accept')
# authorization has expired
OIDCCode.objects.all().delete()
authorize.expired = now() - datetime.timedelta(days=2)
authorize.save()
response = app.get(authorize_url)
assert 'a2-oidc-authorization-form' in response.text
authorize.expired = now() + datetime.timedelta(days=2)
authorize.scopes = 'openid profile'
authorize.save()
assert OIDCAuthorization.objects.count() == 1
response.form['do_not_ask_again'] = True
response = response.form.submit('accept')
assert OIDCAuthorization.objects.count() == 1
# old authorizations have been deleted
assert OIDCAuthorization.objects.get().pk != authorize.pk
# check expired codes
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
assert OIDCCode.objects.count() == 1
code = OIDCCode.objects.get()
assert code.is_valid()
# make code expire
code.expired = now() - datetime.timedelta(seconds=120)
assert not code.is_valid()
code.save()
location = urllib.parse.urlparse(response['Location'])
query = urllib.parse.parse_qs(location.query)
assert set(query.keys()) == set(['code'])
assert query['code'] == [code.uuid]
code = query['code'][0]
token_url = make_url('oidc-token')
params = {
'grant_type': 'authorization_code',
'redirect_uri': oidc_client.redirect_uris.split()[0],
}
response = app.post(
token_url, params=params, headers=client_authentication_headers(oidc_client), status=400
)
assert response.json['error'] == 'invalid_request'
assert response.json['error_description'] == 'Missing parameter "code"'
params['code'] = code
response = app.post(
token_url, params=params, headers=client_authentication_headers(oidc_client), status=400
)
assert 'error' in response.json
assert response.json['error'] == 'invalid_request'
assert response.json['error_description'] == 'Parameter "code" has expired or user is disconnected'
# invalid logout
logout_url = make_url(
'oidc-logout',
params={
'post_logout_redirect_uri': 'https://whatever.com/',
},
)
response = app.get(logout_url)
assert '_auth_user_id' in app.session
assert 'Location' in response.headers
# check code expiration after logout
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
code = OIDCCode.objects.get()
code.expired = now() + datetime.timedelta(seconds=120)
code.save()
assert code.is_valid()
utils.logout(app)
code = OIDCCode.objects.get()
assert not code.is_valid()
response = app.post(
token_url,
params={
'grant_type': 'authorization_code',
'code': code.uuid,
'redirect_uri': oidc_client.redirect_uris.split()[0],
},
headers=client_authentication_headers(oidc_client),
status=400,
)
assert 'error' in response.json
assert response.json['error'] == 'invalid_request'
assert response.json['error_description'] == 'Parameter "code" has expired or user is disconnected'
def test_expired_manager(db, simple_user):
expired = now() - datetime.timedelta(seconds=1)
not_expired = now() + datetime.timedelta(days=1)
client = OIDCClient.objects.create(
name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/'
)
OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=expired)
OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=not_expired)
assert OIDCAuthorization.objects.count() == 2
OIDCAuthorization.objects.cleanup()
assert OIDCAuthorization.objects.count() == 1
OIDCCode.objects.create(
client=client,
user=simple_user,
scopes='openid',
redirect_uri='https://example.com/',
session_key='xxx',
auth_time=now(),
expired=expired,
)
OIDCCode.objects.create(
client=client,
user=simple_user,
scopes='openid',
redirect_uri='https://example.com/',
session_key='xxx',
auth_time=now(),
expired=not_expired,
)
assert OIDCCode.objects.count() == 2
OIDCCode.objects.cleanup()
assert OIDCCode.objects.count() == 1
OIDCAccessToken.objects.create(
client=client, user=simple_user, scopes='openid', session_key='xxx', expired=expired
)
OIDCAccessToken.objects.create(
client=client, user=simple_user, scopes='openid', session_key='xxx', expired=not_expired
)
assert OIDCAccessToken.objects.count() == 2
OIDCAccessToken.objects.cleanup()
assert OIDCAccessToken.objects.count() == 1
@pytest.fixture
def simple_oidc_client(db):
return OIDCClient.objects.create(
name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/'
)
def test_client_secret_post_authentication(oidc_settings, app, simple_oidc_client, simple_user):
utils.login(app, simple_user)
redirect_uri = simple_oidc_client.redirect_uris.split()[0]
params = {
'client_id': simple_oidc_client.client_id,
'scope': 'openid profile email',
'redirect_uri': redirect_uri,
'state': 'xxx',
'nonce': 'yyy',
'response_type': 'code',
}
authorize_url = make_url('oidc-authorize', params=params)
response = app.get(authorize_url)
response = response.form.submit('accept')
location = urllib.parse.urlparse(response['Location'])
query = urllib.parse.parse_qs(location.query)
code = query['code'][0]
token_url = make_url('oidc-token')
response = app.post(
token_url,
params={
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': redirect_uri,
'client_id': simple_oidc_client.client_id,
'client_secret': simple_oidc_client.client_secret,
},
)
assert 'error' not in response.json
assert 'access_token' in response.json
assert 'expires_in' in response.json
assert 'id_token' in response.json
assert response.json['token_type'] == 'Bearer'
@pytest.mark.parametrize('login_first', [(True,), (False,)])
def test_role_control_access(login_first, oidc_settings, oidc_client, simple_user, app):
# authorized_role
role_authorized = get_role_model().objects.create(name='Goth Kids', slug='goth-kids', ou=get_default_ou())
oidc_client.add_authorized_role(role_authorized)
redirect_uri = oidc_client.redirect_uris.split()[0]
params = {
'client_id': oidc_client.client_id,
'scope': 'openid profile email',
'redirect_uri': redirect_uri,
'state': 'xxx',
'nonce': 'yyy',
}
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
params['response_type'] = 'code'
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
params['response_type'] = 'token id_token'
authorize_url = make_url('oidc-authorize', params=params)
if login_first:
utils.login(app, simple_user)
# user not authorized
response = app.get(authorize_url)
assert 'https://example.com/southpark/' in response.text
# user authorized
simple_user.roles.add(role_authorized)
simple_user.save()
response = app.get(authorize_url)
if not login_first:
response = response.follow()
response.form.set('username', simple_user.username)
response.form.set('password', simple_user.username)
response = response.form.submit(name='login-password-submit')
response = response.follow()
if oidc_client.authorization_mode != oidc_client.AUTHORIZATION_MODE_NONE:
response.form['do_not_ask_again'] = True
response = response.form.submit('accept')
assert OIDCAuthorization.objects.get()
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
code = OIDCCode.objects.get()
location = urllib.parse.urlparse(response['Location'])
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
query = urllib.parse.parse_qs(location.query)
code = query['code'][0]
token_url = make_url('oidc-token')
response = app.post(
token_url,
params={
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': oidc_client.redirect_uris.split()[0],
},
headers=client_authentication_headers(oidc_client),
)
id_token = response.json['id_token']
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
query = urllib.parse.parse_qs(location.fragment)
id_token = query['id_token'][0]
if oidc_client.idtoken_algo in (oidc_client.ALGO_RSA, oidc_client.ALGO_EC):
key = JWKSet.from_json(app.get(reverse('oidc-certs')).content)
algs = ['RS256', 'ES256']
elif oidc_client.idtoken_algo == oidc_client.ALGO_HMAC:
k = base64.b64encode(oidc_client.client_secret.encode('utf-8'))
key = JWK(kty='oct', k=force_text(k))
algs = ['HS256']
else:
raise NotImplementedError
jwt = JWT(jwt=id_token, key=key, algs=algs)
claims = json.loads(jwt.claims)
if login_first:
assert claims['acr'] == '0'
else:
assert claims['acr'] == '1'
def test_registration_service_slug(oidc_settings, app, simple_oidc_client, simple_user, hooks, mailoutbox):
redirect_uri = simple_oidc_client.redirect_uris.split()[0]
params = {
'client_id': simple_oidc_client.client_id,
'scope': 'openid profile email',
'redirect_uri': redirect_uri,
'state': 'xxx',
'nonce': 'yyy',
'response_type': 'code',
}
authorize_url = make_url('oidc-authorize', params=params)
response = app.get(authorize_url)
location = urllib.parse.urlparse(response['Location'])
query = urllib.parse.parse_qs(location.query)
assert query['service'] == ['default client']
response = response.follow().click('Register')
location = urllib.parse.urlparse(response.request.url)
query = urllib.parse.parse_qs(location.query)
assert query['service'] == ['default client']
response.form.set('email', 'john.doe@example.com')
response = response.form.submit()
assert len(mailoutbox) == 1
link = utils.get_link_from_mail(mailoutbox[0])
response = app.get(link)
response.form.set('first_name', 'John')
response.form.set('last_name', 'Doe')
response.form.set('password1', 'T0==toto')
response.form.set('password2', 'T0==toto')
response = response.form.submit()
assert hooks.event[0]['kwargs']['name'] == 'sso-request'
assert hooks.event[0]['kwargs']['service'].slug == 'client'
assert hooks.event[1]['kwargs']['name'] == 'registration'
assert hooks.event[1]['kwargs']['service'] == 'client'
assert hooks.event[2]['kwargs']['name'] == 'login'
assert hooks.event[2]['kwargs']['how'] == 'email'
assert hooks.event[2]['kwargs']['service'] == 'client'
def test_oidclient_claims_data_migration(migration):
app = 'authentic2_idp_oidc'
migrate_from = [(app, '0009_auto_20180313_1156')]
migrate_to = [(app, '0010_oidcclaim')]
old_apps = migration.before(migrate_from)
OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
client = OIDCClient(name='test', slug='test', redirect_uris='https://example.net/')
client.save()
new_apps = migration.apply(migrate_to)
OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
client = OIDCClient.objects.first()
assert OIDCClaim.objects.filter(client=client.id).count() == 5
def test_oidclient_preferred_username_as_identifier_data_migration(migration):
app = 'authentic2_idp_oidc'
migrate_from = [(app, '0010_oidcclaim')]
migrate_to = [(app, '0011_auto_20180808_1546')]
old_apps = migration.before(migrate_from)
OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
OIDCClaim = old_apps.get_model('authentic2_idp_oidc', 'OIDCClaim')
client1 = OIDCClient.objects.create(name='test', slug='test', redirect_uris='https://example.net/')
client2 = OIDCClient.objects.create(name='test1', slug='test1', redirect_uris='https://example.net/')
client3 = OIDCClient.objects.create(name='test2', slug='test2', redirect_uris='https://example.net/')
client4 = OIDCClient.objects.create(name='test3', slug='test3', redirect_uris='https://example.net/')
for client in (client1, client2, client3, client4):
if client.name == 'test1':
continue
if client.name == 'test3':
OIDCClaim.objects.create(
client=client, name='preferred_username', value='django_user_full_name', scopes='profile'
)
else:
OIDCClaim.objects.create(
client=client, name='preferred_username', value='django_user_username', scopes='profile'
)
OIDCClaim.objects.create(
client=client, name='given_name', value='django_user_first_name', scopes='profile'
)
OIDCClaim.objects.create(
client=client, name='family_name', value='django_user_last_name', scopes='profile'
)
if client.name == 'test2':
continue
OIDCClaim.objects.create(client=client, name='email', value='django_user_email', scopes='email')
OIDCClaim.objects.create(
client=client, name='email_verified', value='django_user_email_verified', scopes='email'
)
new_apps = migration.apply(migrate_to)
OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
client = OIDCClient.objects.first()
for client in OIDCClient.objects.all():
claims = client.oidcclaim_set.all()
if client.name == 'test':
assert claims.count() == 5
assert sorted(claims.values_list('name', flat=True)) == [
'email',
'email_verified',
'family_name',
'given_name',
'preferred_username',
]
assert sorted(claims.values_list('value', flat=True)) == [
'django_user_email',
'django_user_email_verified',
'django_user_first_name',
'django_user_identifier',
'django_user_last_name',
]
elif client.name == 'test2':
assert claims.count() == 3
assert sorted(claims.values_list('name', flat=True)) == [
'family_name',
'given_name',
'preferred_username',
]
assert sorted(claims.values_list('value', flat=True)) == [
'django_user_first_name',
'django_user_last_name',
'django_user_username',
]
elif client.name == 'test3':
assert claims.count() == 5
assert sorted(claims.values_list('name', flat=True)) == [
'email',
'email_verified',
'family_name',
'given_name',
'preferred_username',
]
assert sorted(claims.values_list('value', flat=True)) == [
'django_user_email',
'django_user_email_verified',
'django_user_first_name',
'django_user_full_name',
'django_user_last_name',
]
else:
assert claims.count() == 0
def test_api_synchronization(app, oidc_client):
oidc_client.has_api_access = True
oidc_client.save()
users = [User.objects.create(username='user-%s' % i) for i in range(10)]
for user in users[5:]:
user.delete()
deleted_subs = set(make_sub(oidc_client, user) for user in users[5:])
app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret))
status = 200
if oidc_client.identifier_policy not in (OIDCClient.POLICY_PAIRWISE_REVERSIBLE, OIDCClient.POLICY_UUID):
status = 401
response = app.post_json(
'/api/users/synchronization/',
params={'known_uuids': [make_sub(oidc_client, user) for user in users]},
status=status,
)
if status == 200:
assert response.json['result'] == 1
assert set(response.json['unknown_uuids']) == deleted_subs
def test_claim_default_value(oidc_settings, normal_oidc_client, simple_user, app):
oidc_settings.A2_IDP_OIDC_SCOPES = ['openid', 'profile', 'email', 'phone']
Attribute.objects.create(
name='phone',
label='phone',
kind='phone_number',
asked_on_registration=False,
required=False,
user_visible=False,
user_editable=False,
)
OIDCClaim.objects.create(
client=normal_oidc_client, name='phone', value='django_user_phone', scopes='phone'
)
normal_oidc_client.authorization_flow = normal_oidc_client.FLOW_AUTHORIZATION_CODE
normal_oidc_client.authorization_mode = normal_oidc_client.AUTHORIZATION_MODE_NONE
normal_oidc_client.save()
utils.login(app, simple_user)
simple_user.username = None
simple_user.save()
oidc_client = normal_oidc_client
redirect_uri = oidc_client.redirect_uris.split()[0]
params = {
'client_id': oidc_client.client_id,
'scope': 'openid email profile phone',
'redirect_uri': redirect_uri,
'state': 'xxx',
'nonce': 'yyy',
'response_type': 'code',
}
def sso():
authorize_url = make_url('oidc-authorize', params=params)
response = app.get(authorize_url)
location = urllib.parse.urlparse(response['Location'])
query = urllib.parse.parse_qs(location.query)
code = query['code'][0]
token_url = make_url('oidc-token')
response = app.post(
token_url,
params={
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': oidc_client.redirect_uris.split()[0],
},
headers=client_authentication_headers(oidc_client),
)
access_token = response.json['access_token']
id_token = response.json['id_token']
k = base64.b64encode(oidc_client.client_secret.encode('utf-8'))
key = JWK(kty='oct', k=force_text(k))
jwt = JWT(jwt=id_token, key=key)
claims = json.loads(jwt.claims)
user_info_url = make_url('oidc-user-info')
response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
return claims, response.json
claims, user_info = sso()
assert claims['sub'] == make_sub(oidc_client, simple_user)
assert claims['preferred_username'] == ''
assert claims['given_name'] == simple_user.first_name
assert claims['family_name'] == simple_user.last_name
assert claims['email'] == simple_user.email
assert claims['phone'] is None
assert claims['email_verified'] is False
assert user_info['sub'] == make_sub(oidc_client, simple_user)
assert user_info['preferred_username'] == ''
assert user_info['given_name'] == simple_user.first_name
assert user_info['family_name'] == simple_user.last_name
assert user_info['email'] == simple_user.email
assert user_info['phone'] is None
assert user_info['email_verified'] is False
params['scope'] = 'openid email'
claims, user_info = sso()
assert claims['sub'] == make_sub(oidc_client, simple_user)
assert claims['email'] == simple_user.email
assert claims['email_verified'] is False
assert 'phone' not in claims
assert 'preferred_username' not in claims
assert 'given_name' not in claims
assert 'family_name' not in claims
assert user_info['sub'] == make_sub(oidc_client, simple_user)
assert user_info['email'] == simple_user.email
assert user_info['email_verified'] is False
assert 'phone' not in user_info
assert 'preferred_username' not in user_info
assert 'given_name' not in user_info
assert 'family_name' not in user_info
def test_claim_templated(oidc_settings, normal_oidc_client, simple_user, app):
oidc_settings.A2_IDP_OIDC_SCOPES = ['openid', 'profile', 'email']
OIDCClaim.objects.filter(client=normal_oidc_client, name='given_name').delete()
OIDCClaim.objects.filter(client=normal_oidc_client, name='family_name').delete()
OIDCClaim.objects.create(
client=normal_oidc_client,
name='given_name',
value='{{ django_user_first_name|add:"ounet" }}',
scopes='profile',
)
OIDCClaim.objects.create(
client=normal_oidc_client,
name='family_name',
value='{{ "Von der "|add:django_user_last_name }}',
scopes='profile',
)
normal_oidc_client.authorization_flow = normal_oidc_client.FLOW_AUTHORIZATION_CODE
normal_oidc_client.authorization_mode = normal_oidc_client.AUTHORIZATION_MODE_NONE
normal_oidc_client.save()
utils.login(app, simple_user)
oidc_client = normal_oidc_client
redirect_uri = oidc_client.redirect_uris.split()[0]
params = {
'client_id': oidc_client.client_id,
'scope': 'openid email profile',
'redirect_uri': redirect_uri,
'state': 'xxx',
'nonce': 'yyy',
'response_type': 'code',
}
def sso():
authorize_url = make_url('oidc-authorize', params=params)
response = app.get(authorize_url)
location = urllib.parse.urlparse(response['Location'])
query = urllib.parse.parse_qs(location.query)
code = query['code'][0]
token_url = make_url('oidc-token')
response = app.post(
token_url,
params={
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': oidc_client.redirect_uris.split()[0],
},
headers=client_authentication_headers(oidc_client),
)
access_token = response.json['access_token']
id_token = response.json['id_token']
k = base64.b64encode(oidc_client.client_secret.encode('utf-8'))
key = JWK(kty='oct', k=force_text(k))
jwt = JWT(jwt=id_token, key=key)
claims = json.loads(jwt.claims)
user_info_url = make_url('oidc-user-info')
response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
return claims, response.json
claims, user_info = sso()
assert claims['given_name'].endswith('ounet')
assert claims['given_name'].startswith(simple_user.first_name)
assert claims['family_name'].startswith('Von der')
assert claims['family_name'].endswith(simple_user.last_name)
assert user_info['given_name'].endswith('ounet')
assert user_info['given_name'].startswith(simple_user.first_name)
assert user_info['family_name'].startswith('Von der')
assert user_info['family_name'].endswith(simple_user.last_name)
def test_client_validate_redirect_uri():
client = OIDCClient(
redirect_uris='''http://example.com
http://example2.com/
http://example3.com/toto
http://*example4.com/
http://example5.com/toto*
http://example6.com/#*
http://example7.com/?*
http://example8.com/?*#*
'''
)
# ok
for uri in [
'http://example.com',
'http://example.com/',
'http://example2.com',
'http://example2.com/',
'http://example3.com/toto',
'http://example3.com/toto/',
'http://example4.com/',
'http://example4.com',
'http://coin.example4.com',
'http://coin.example4.com/',
'http://example5.com/toto',
'http://example5.com/toto/',
'http://example5.com/toto/tata',
'http://example5.com/toto/tata/',
'http://example6.com/#some-fragment',
'http://example7.com/?foo=bar',
'http://example8.com/?foo=bar#some-fragment',
]:
client.validate_redirect_uri(uri)
# nok
for uri in [
'http://coin.example.com/',
'http://example.com/toto/',
'http://coin.example.com',
'http://example3.com/',
'http://example3.com',
'http://coinexample4.com',
'http://coinexample4.com/',
'http://example5.com/tototata/',
'http://example5.com/tototata',
'http://example6.com/?foo=bar',
'http://example7.com/#some-fragment',
]:
with pytest.raises(ValueError, match=r'is not declared'):
client.validate_redirect_uri(uri)
client.validate_redirect_uri('http://example5.com/toto/' + 'a' * 500)
with pytest.raises(ValueError, match=r'redirect_uri length >'):
client.validate_redirect_uri('http://example5.com/toto/' + 'a' * 1024)
def test_filter_api_users(app, oidc_client, admin, simple_user, role_random):
oidc_client.has_api_access = True
oidc_client.save()
if oidc_client.identifier_policy not in (oidc_client.POLICY_UUID, oidc_client.POLICY_PAIRWISE_REVERSIBLE):
return
app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret))
response = app.get('/api/users/')
count = len(response.json['results'])
assert count > 0
AuthorizedRole.objects.create(service=oidc_client, role=role_random)
response = app.get('/api/users/')
assert len(response.json['results']) == 0
role_random.members.add(simple_user)
response = app.get('/api/users/')
assert len(response.json['results']) == 1
AuthorizedRole.objects.all().delete()
response = app.get('/api/users/')
assert len(response.json['results']) == count
def test_credentials_grant(app, oidc_client, admin, simple_user):
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.scope = 'openid'
oidc_client.save()
token_url = make_url('oidc-token')
if oidc_client.idtoken_algo == OIDCClient.ALGO_HMAC:
k = base64url(oidc_client.client_secret.encode('utf-8'))
jwk = JWK(kty='oct', k=force_text(k))
elif oidc_client.idtoken_algo == OIDCClient.ALGO_RSA:
jwk = get_first_rsa_sig_key()
elif oidc_client.idtoken_algo == OIDCClient.ALGO_EC:
jwk = get_first_ec_sig_key()
# 1. test in-request client credentials
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
response = app.post(token_url, params=params)
assert 'id_token' in response.json
token = response.json['id_token']
header, payload, signature = token.split('.')
jwt = JWT()
# jwt deserialization implicitly checks the token signature:
jwt.deserialize(token, key=jwk)
claims = json.loads(jwt.claims)
assert set(claims) == set(['acr', 'aud', 'auth_time', 'exp', 'iat', 'iss', 'sub'])
assert all(claims.values())
# 2. test basic authz
params.pop('client_id')
params.pop('client_secret')
response = app.post(token_url, params=params, headers=client_authentication_headers(oidc_client))
assert 'id_token' in response.json
token = response.json['id_token']
header, payload, signature = token.split('.')
jwt = JWT()
# jwt deserialization implicitly checks the token signature:
jwt.deserialize(token, key=jwk)
claims = json.loads(jwt.claims)
assert set(claims) == set(['acr', 'aud', 'auth_time', 'exp', 'iat', 'iss', 'sub'])
assert all(claims.values())
def test_credentials_grant_ratelimitation_invalid_client(
app, oidc_client, admin, simple_user, oidc_settings, freezer
):
freezer.move_to('2020-01-01')
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
token_url = make_url('oidc-token')
params = {
'client_id': oidc_client.client_id,
'client_secret': 'notgood',
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
for i in range(int(oidc_settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT.split('/')[0])):
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'invalid_client'
assert 'Wrong client secret' in response.json['error_description']
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'invalid_request'
assert response.json['error_description'] == 'Rate limit exceeded for IP address "127.0.0.1"'
def test_credentials_grant_ratelimitation_valid_client(
app, oidc_client, admin, simple_user, oidc_settings, freezer
):
freezer.move_to('2020-01-01')
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
token_url = make_url('oidc-token')
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
for i in range(int(oidc_settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT.split('/')[0])):
app.post(token_url, params=params)
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'invalid_client'
assert response.json['error_description'] == 'Rate limit of 100/m exceeded for client "oidcclient"'
def test_credentials_grant_retrytimout(app, oidc_client, admin, simple_user, settings, freezer):
freezer.move_to('2020-01-01')
settings.A2_LOGIN_EXPONENTIAL_RETRY_TIMEOUT_DURATION = 2
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
token_url = make_url('oidc-token')
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': 'SurelyNotTheRightPassword',
}
attempts = 0
while attempts < 100:
response = app.post(token_url, params=params, status=400)
attempts += 1
if attempts >= 10:
assert response.json['error'] == 'invalid_request'
assert 'Too many attempts with erroneous RO password' in response.json['error_description']
# freeze some time after backoff delay expiration
freezer.move_to(datetime.timedelta(days=2))
# obtain a successful login
params['password'] = simple_user.username
response = app.post(token_url, params=params, status=200)
assert 'id_token' in response.json
def test_credentials_grant_invalid_flow(app, oidc_client, admin, simple_user, settings):
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': 'SurelyNotTheRightPassword',
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'unauthorized_client'
assert 'is not configured' in response.json['error_description']
def test_credentials_grant_invalid_client(app, oidc_client, admin, simple_user, settings):
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
params = {
'client_id': oidc_client.client_id,
'client_secret': 'tryingthis', # Nope, wrong secret
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'invalid_client'
assert response.json['error_description'] == 'Wrong client secret'
def test_credentials_grant_unauthz_client(app, oidc_client, admin, simple_user, settings):
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'unauthorized_client'
assert 'Client is not configured for resource owner' in response.json['error_description']
def test_credentials_grant_invalid_content_type(app, oidc_client, admin, simple_user, settings):
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params, content_type='multipart/form-data', status=400)
assert response.json['error'] == 'invalid_request'
assert 'Wrong content type' in response.json['error_description']
def test_credentials_grant_ou_selection_simple(
app, oidc_client, admin, user_ou1, user_ou2, ou1, ou2, settings
):
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'ou_slug': ou1.slug,
'username': user_ou1.username,
'password': user_ou1.username,
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params, status=200)
params['username'] = user_ou2.username
params['password'] = user_ou2.password
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'access_denied'
assert response.json['error_description'] == 'Invalid user credentials'
def test_credentials_grant_ou_selection_username_not_unique(
app, oidc_client, admin, user_ou1, admin_ou2, ou1, ou2, settings
):
settings.A2_USERNAME_IS_UNIQUE = False
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
admin_ou2.username = user_ou1.username
admin_ou2.set_password(user_ou1.username)
admin_ou2.save()
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'ou_slug': ou1.slug,
'username': user_ou1.username,
'password': user_ou1.username,
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params)
assert OIDCAccessToken.objects.get(uuid=response.json['access_token']).user == user_ou1
params['ou_slug'] = ou2.slug
response = app.post(token_url, params=params)
assert OIDCAccessToken.objects.get(uuid=response.json['access_token']).user == admin_ou2
def test_credentials_grant_ou_selection_username_not_unique_wrong_ou(
app, oidc_client, admin, user_ou1, admin_ou2, ou1, ou2, settings
):
settings.A2_USERNAME_IS_UNIQUE = False
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'ou_slug': ou2.slug,
'username': user_ou1.username,
'password': user_ou1.username,
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params, status=400)
params['ou_slug'] = ou1.slug
params['username'] = admin_ou2.username
params['password'] = admin_ou2.password
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'access_denied'
assert response.json['error_description'] == 'Invalid user credentials'
def test_credentials_grant_ou_selection_invalid_ou(app, oidc_client, admin, user_ou1, settings):
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'ou_slug': 'invalidslug',
'username': user_ou1.username,
'password': user_ou1.username,
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'invalid_request'
assert (
response.json['error_description']
== 'Parameter "ou_slug" does not match an existing organizational unit'
)
def test_oidc_client_clean():
OIDCClient(
redirect_uris='https://example.com/ https://example2.com/', identifier_policy=OIDCClient.POLICY_UUID
).clean()
with pytest.raises(ValidationError, match=r'same domain'):
OIDCClient(
redirect_uris='https://example.com/ https://example2.com/',
identifier_policy=OIDCClient.POLICY_PAIRWISE_REVERSIBLE,
).clean()
with pytest.raises(ValidationError, match=r'same domain'):
OIDCClient(
redirect_uris='https://example.com/ https://example2.com/',
identifier_policy=OIDCClient.POLICY_PAIRWISE,
).clean()
with pytest.raises(ValidationError, match=r'same domain'):
OIDCClient(
redirect_uris='https://example.com/ https://example2.com/',
identifier_policy=OIDCClient.POLICY_PAIRWISE,
).clean()
with pytest.raises(ValidationError, match=r'within an OU'):
OIDCClient(authorization_mode=OIDCClient.AUTHORIZATION_MODE_BY_OU, ou=None).clean()
OIDCClient(
redirect_uris='https://example.com/ https://example2.com/',
sector_identifier_uri='https://example.com/',
).clean()
def test_oidc_authorized_oauth_services_view(app, oidc_client, simple_user):
from django.contrib.contenttypes.models import ContentType
url = make_url('authorized-oauth-services')
response = app.get(url, status=302)
assert '/login/' in response.location
utils.login(app, simple_user)
response = app.get(url, status=200)
assert "You have not given any authorization to access your account profile data." in response.text
# create an ou authz
OU = get_ou_model()
ou1 = OU.objects.create(name='Orgunit1', slug='orgunit1')
OIDCAuthorization.objects.create(
client=ou1,
user=simple_user,
scopes='openid profile email',
expired=now() + datetime.timedelta(days=2),
)
# create service authzs
OIDCAuthorization.objects.create(
client=oidc_client, user=simple_user, scopes='openid', expired=now() + datetime.timedelta(days=2)
)
OIDCAuthorization.objects.create(
client=oidc_client,
user=simple_user,
scopes='openid profile',
expired=now() + datetime.timedelta(days=2),
)
OIDCAuthorization.objects.create(
client=oidc_client,
user=simple_user,
scopes='openid profile email',
expired=now() + datetime.timedelta(days=2),
)
response = app.get(url, status=200)
assert "You have given authorizations to access your account profile data." in response.text
assert len(response.html.find_all('button', {'class': 'authorized-oauth-services--revoke-button'})) == 4
# revoke two service authz
response = response.forms[1].submit()
response = response.follow()
assert len(response.html.find_all('button', {'class': 'authorized-oauth-services--revoke-button'})) == 3
assert (
OIDCAuthorization.objects.filter(client_ct=ContentType.objects.get_for_model(OIDCClient)).count() == 2
)
response = response.forms[1].submit()
response = response.follow()
assert len(response.html.find_all('button', {'class': 'authorized-oauth-services--revoke-button'})) == 2
assert (
OIDCAuthorization.objects.filter(client_ct=ContentType.objects.get_for_model(OIDCClient)).count() == 1
)
# revoke the only OU authz
response = response.forms[0].submit()
response = response.follow()
assert len(response.html.find_all('button', {'class': 'authorized-oauth-services--revoke-button'})) == 1
assert OIDCAuthorization.objects.filter(client_ct=ContentType.objects.get_for_model(OU)).count() == 0
def test_oidc_good_next_url_hook(app, oidc_client):
from django.test.client import RequestFactory
rf = RequestFactory()
request = rf.get('/')
assert good_next_url(request, 'https://example.com/')
@pytest.fixture
def access_token(client, simple_user):
return OIDCAccessToken.objects.create(
client=client,
user=simple_user,
scopes='openid profile email',
expired=now() + datetime.timedelta(seconds=3600),
)
def test_user_info(app, client, access_token, freezer):
def get_user_info(**kwargs):
return app.get(
'/idp/oidc/user_info/', headers=bearer_authentication_headers(access_token.uuid), **kwargs
)
response = app.get('/idp/oidc/user_info/', status=401)
assert (
response['WWW-Authenticate']
== 'Bearer error="invalid_request", error_description="Bearer authentication is mandatory"'
)
response = app.get('/idp/oidc/user_info/', headers={'Authorization': 'Bearer'}, status=401)
assert (
response['WWW-Authenticate']
== 'Bearer error="invalid_request", error_description="Invalid Bearer authentication"'
)
response = get_user_info(status=200)
assert dict(response.json, sub='') == {
'email': 'user@example.net',
'email_verified': False,
'family_name': 'Dôe',
'family_name_verified': True,
'given_name': 'Jôhn',
'given_name_verified': True,
'preferred_username': 'user',
'sub': '',
}
# token is expired
access_token.expired = now() - datetime.timedelta(seconds=1)
access_token.save()
response = get_user_info(status=401)
assert (
response['WWW-Authenticate']
== 'Bearer error="invalid_token", error_description="Token expired or user disconnected"'
)
# token is unknown
access_token.delete()
response = get_user_info(status=401)
assert response['WWW-Authenticate'] == 'Bearer error="invalid_token", error_description="Token unknown"'
utils.login(app, access_token.user)
access_token.expired = now() + datetime.timedelta(seconds=1)
access_token.session_key = app.session.session_key
access_token.save()
get_user_info(status=200)
app.session.flush()
response = get_user_info(status=401)
assert (
response['WWW-Authenticate']
== 'Bearer error="invalid_token", error_description="Token expired or user disconnected"'
)
@pytest.fixture
def session(settings, db, simple_user):
engine = import_module(settings.SESSION_ENGINE)
session = engine.SessionStore()
session['_auth_user_id'] = str(simple_user.id)
session.create()
return session
def test_access_token_is_valid_session(simple_oidc_client, simple_user, session):
token = OIDCAccessToken.objects.create(
client=simple_oidc_client, user=simple_user, scopes='openid', session_key=session.session_key
)
assert token.is_valid()
session.flush()
assert not token.is_valid()
def test_access_token_is_valid_expired(simple_oidc_client, simple_user, freezer):
start = now()
expired = start + datetime.timedelta(seconds=30)
token = OIDCAccessToken.objects.create(
client=simple_oidc_client, user=simple_user, scopes='openid', expired=expired
)
assert token.is_valid()
freezer.move_to(expired)
assert token.is_valid()
freezer.move_to(expired + datetime.timedelta(seconds=1))
assert not token.is_valid()
def test_access_token_is_valid_session_and_expired(simple_oidc_client, simple_user, session, freezer):
start = now()
expired = start + datetime.timedelta(seconds=30)
token = OIDCAccessToken.objects.create(
client=simple_oidc_client,
user=simple_user,
scopes='openid',
session_key=session.session_key,
expired=expired,
)
assert token.is_valid()
freezer.move_to(expired)
assert token.is_valid()
freezer.move_to(expired + datetime.timedelta(seconds=1))
assert not token.is_valid()
freezer.move_to(start)
assert token.is_valid()
session.flush()
assert not token.is_valid()