authentic/tests/test_idp_oidc.py

1685 lines
66 KiB
Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import json
import datetime
import pytest
from jwcrypto.jwt import JWT
from jwcrypto.jwk import JWKSet, JWK
from . import utils
from django import VERSION as DJ_VERSION
from django.core.exceptions import ValidationError
from django.core.files import File
from django.test.utils import override_settings
from django.urls import reverse
from django.utils.encoding import force_text
from django.utils.timezone import now
from django.contrib.auth import get_user_model
from django.utils.six.moves.urllib import parse as urlparse
from authentic2.models import Attribute, AuthorizedRole
from authentic2_idp_oidc.models import OIDCClient, OIDCAuthorization, OIDCCode, OIDCAccessToken, OIDCClaim
from authentic2_idp_oidc.utils import base64url
from authentic2_idp_oidc.utils import get_first_rsa_sig_key
from authentic2_idp_oidc.utils import get_first_ec_sig_key
from authentic2_idp_oidc.utils import make_sub
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.utils import make_url
from authentic2_auth_oidc.utils import parse_timestamp
from django_rbac.utils import get_ou_model
from django_rbac.utils import get_role_model
User = get_user_model()
pytestmark = pytest.mark.django_db
JWKSET = {
"keys": [
{
"qi": "h_zifVD-ChelxZUVxhICNcgGkQz26b-EdIlLY9rN7SX_aD3sLI_JHEHV4Bz3kV5eW8O4qJ8SHhfUdHGK-gRH7FVOGoXnXACf47QoXowHzsPLL64wCuZENTl7hIRGLY-BInULkfTQfuiVSMoxPjsVNTMBzMiz0bNjMQyMyvW5xH4",
"kty": "RSA",
"d": "pUcL4-LDBy3rqJWip269h5Hd6nLvqjXltfkVe_mL-LwZPHmCrUaj_SX54SnCY3Wyf7kxhoMYUac62lQ71923uJPFFdiavAujbNrtZPq32i4C-1apWXW8OGJr8VoVDqalxj9SAq1G54wbbsaAPrZdyuqy-esNxDqDigfbM-cWgngBBYo5CSsfnmnd05N2cUS26L7QzWbNHwilnBTE9e_J7rK3xUCDKrobv6_LiI-AhMmBHJSrCxjexh0wzfBi_Ntj9BGCcPThDjG8SQvaV-aLNdLfIy2XO3i076RLBB6Hm_yHuAparrwp-pPE48eQdiYjrSAFalz4ojWQ3_ByLA6uAQ",
"q": "2FvfeWnIlWNUipan7DIBlJrmz5EinJNxrQ-BNwPHrAoIM8qvyC7jPy09YxZs5Y9CMMZSal6C4Nm2LHBFxHU9z1qd5XDzbk19G-y1lDqZizVXr876TpiAjuq03rcoMQm8dQru_pVjUdgxR64vKyJ9CaFMAqcpZeEMIqAvzhQG8uE",
"dp": "Kg4HPGpzenhK2ser6nfM1Yt-pkqBbWQotvqsxGptECXpbN7vweupvL5kJPeRrbsXKp9QE7DXTN1sG9puJxMSwtgiv4hr9Va9e9WOC6PMd2VY7tgw5uKMpPLMc5y82PusRhBoRh0SUUsjyQxK9PGtWYnGZXbAoaIYPdMyDlosfqU",
"dq": "QuUNEHYTjZTbo8n2-4FumarXKGBAalbwM8jyc7cYemnTpWfKt8M_gd4T99oMK2IC3h_DhZ3ZK3pE6DKCb76sMLtczH8C1RziTMsATWdc5_zDMtl07O4b-ZQ5_g51P8w515pc0JwRzFFi0z3Y2aZdMKgNX1id5SES5nXOshHhICE",
"n": "0lN6CiJGFD8BSPV_azLoEl6Nq-WlHkU743D5rqvzw1sOaxstMGxAhVk2YIhWwfvapV6XjO_yvc4778VBTELOdjRw6BGUdBJepdwkL__TPyjEVhqMQj9MKhEU4GUy9w0Lsilb5D01kfrOKpmdcYw4jhcDvb0H4-LZgh1Vk84vF4WaQCUg_AX4drVDQOjoU8kuWIM8gz9w6zEsbIw-gtMRpFwS8ncA0zDX5VfyC77iMxzFftDIP2gM5GvdevMzvP9IRkRRBhP9vV4JchBFPHSA9OPJcnySjJJNW6aAJn6P6JasN1z68khjufM09J8UzmLAZYOq7gUG95Ox1KsV-g337Q",
"e": "AQAB",
"p": "-Nyj_Sw3f2HUqSssCZv84y7b3blOtGGAhfYN_JtGfcTQv2bOtxrIUzeonCi-Z_1W4hO10tqxJcOB0ibtDqkDlLhnLaIYOBfriITRFK83EJG5sC-0KTmFzUXFTA2aMc1QgP-Fu6gUfQpPqLgWxhx8EFhkBlBZshKU5-C-385Sco0"
},
{
"kty": "EC",
"d": "wwULaR9UYWZW6U2oEbkz3sO1lhPSj6DyA6e7PiUfhog",
"use": "sig",
"crv": "P-256",
"x": "HZMHZkX-63heqA5pvWn-UR7bgcXZNEcQa5wfvG_BzTw",
"y": "SUCuwjjiyKvGq5Odr0sjDqjha_CBqks0JQFrR7Ei5OQ",
"alg": "ES256"
}
]
}
@pytest.fixture
def oidc_settings(settings):
settings.A2_IDP_OIDC_JWKSET = JWKSET
settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT = '100/m'
return settings
def test_get_jwkset(oidc_settings):
from authentic2_idp_oidc.utils import get_jwkset
get_jwkset()
OIDC_CLIENT_PARAMS = [
{
'authorization_flow': OIDCClient.FLOW_IMPLICIT,
},
{
'post_logout_redirect_uris': 'https://example.com/',
},
{
'identifier_policy': OIDCClient.POLICY_UUID,
'post_logout_redirect_uris': 'https://example.com/',
},
{
'identifier_policy': OIDCClient.POLICY_EMAIL,
},
{
'idtoken_algo': OIDCClient.ALGO_HMAC,
},
{
'idtoken_algo': OIDCClient.ALGO_EC,
},
{
'authorization_mode': OIDCClient.AUTHORIZATION_MODE_NONE,
},
{
'idtoken_duration': datetime.timedelta(hours=1),
},
{
'authorization_flow': OIDCClient.FLOW_IMPLICIT,
'idtoken_duration': datetime.timedelta(hours=1),
'post_logout_redirect_uris': 'https://example.com/',
},
{
'frontchannel_logout_uri': 'https://example.com/southpark/logout/',
},
{
'frontchannel_logout_uri': 'https://example.com/southpark/logout/',
'frontchannel_timeout': 3000,
},
{
'identifier_policy': OIDCClient.POLICY_PAIRWISE_REVERSIBLE,
},
]
@pytest.fixture(params=OIDC_CLIENT_PARAMS)
def oidc_client(request, superuser, app, simple_user, oidc_settings):
Attribute.objects.create(
name='cityscape_image',
label='cityscape',
kind='profile_image',
asked_on_registration=True,
required=False,
user_visible=True,
user_editable=True)
url = reverse('admin:authentic2_idp_oidc_oidcclient_add')
assert OIDCClient.objects.count() == 0
response = utils.login(app, superuser, path=url)
response.form.set('name', 'oidcclient')
response.form.set('slug', 'oidcclient')
response.form.set('ou', get_default_ou().pk)
response.form.set('unauthorized_url', 'https://example.com/southpark/')
response.form.set('redirect_uris', 'https://example.com/callbac%C3%A9')
for key, value in request.param.items():
response.form.set(key, value)
response = response.form.submit().follow()
assert OIDCClient.objects.count() == 1
client = OIDCClient.objects.get()
utils.logout(app)
return client
@pytest.fixture
def normal_oidc_client(superuser, app, simple_user):
url = reverse('admin:authentic2_idp_oidc_oidcclient_add')
assert OIDCClient.objects.count() == 0
response = utils.login(app, superuser, path=url)
response.form.set('name', 'oidcclient')
response.form.set('slug', 'oidcclient')
response.form.set('ou', get_default_ou().pk)
response.form.set('unauthorized_url', 'https://example.com/southpark/')
response.form.set('redirect_uris', 'https://example.com/callbac%C3%A9')
response = response.form.submit(name='_save').follow()
assert OIDCClient.objects.count() == 1
client = OIDCClient.objects.get()
utils.logout(app)
return client
def client_authentication_headers(oidc_client):
client_creds = '%s:%s' % (oidc_client.client_id, oidc_client.client_secret)
token = base64.b64encode(client_creds.encode('ascii'))
return {'Authorization': 'Basic %s' % str(token.decode('ascii'))}
def bearer_authentication_headers(access_token):
return {'Authorization': 'Bearer %s' % str(access_token)}
@pytest.mark.parametrize('do_not_ask_again', [(True,), (False,)])
@pytest.mark.parametrize('login_first', [(True,), (False,)])
def test_authorization_code_sso(login_first, do_not_ask_again, oidc_settings, oidc_client, simple_user, app):
redirect_uri = oidc_client.redirect_uris.split()[0]
params = {
'client_id': oidc_client.client_id,
'scope': 'openid profile email',
'redirect_uri': redirect_uri,
'state': 'xxx',
'nonce': 'yyy',
'login_hint': 'backoffice john@example.com',
}
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
params['response_type'] = 'code'
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
params['response_type'] = 'token id_token'
authorize_url = make_url('oidc-authorize', params=params)
if login_first:
utils.login(app, simple_user)
response = app.get(authorize_url)
if not login_first:
assert set(app.session['login-hint']) == set(['backoffice', 'john@example.com'])
response = response.follow()
assert response.request.path == reverse('auth_login')
response.form.set('username', simple_user.username)
response.form.set('password', simple_user.username)
response = response.form.submit(name='login-password-submit')
response = response.follow()
assert response.request.path == reverse('oidc-authorize')
if oidc_client.authorization_mode != OIDCClient.AUTHORIZATION_MODE_NONE:
assert 'a2-oidc-authorization-form' in response.text
assert OIDCAuthorization.objects.count() == 0
assert OIDCCode.objects.count() == 0
assert OIDCAccessToken.objects.count() == 0
response.form['do_not_ask_again'] = do_not_ask_again
response = response.form.submit('accept')
if do_not_ask_again:
assert OIDCAuthorization.objects.count() == 1
authz = OIDCAuthorization.objects.get()
assert authz.client == oidc_client
assert authz.user == simple_user
assert authz.scope_set() == set('openid profile email'.split())
assert authz.expired >= now()
else:
assert OIDCAuthorization.objects.count() == 0
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
assert OIDCCode.objects.count() == 1
code = OIDCCode.objects.get()
assert code.client == oidc_client
assert code.user == simple_user
assert code.scope_set() == set('openid profile email'.split())
assert code.state == 'xxx'
assert code.nonce == 'yyy'
assert code.redirect_uri == redirect_uri
assert code.session_key == app.session.session_key
assert code.auth_time <= now()
assert code.expired >= now()
assert response['Location'].startswith(redirect_uri)
location = urlparse.urlparse(response['Location'])
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
query = urlparse.parse_qs(location.query)
assert set(query.keys()) == set(['code', 'state'])
assert query['code'] == [code.uuid]
code = query['code'][0]
assert query['state'] == ['xxx']
token_url = make_url('oidc-token')
response = app.post(token_url, params={
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': oidc_client.redirect_uris.split()[0],
}, headers=client_authentication_headers(oidc_client))
assert 'error' not in response.json
assert 'access_token' in response.json
assert 'expires_in' in response.json
assert 'id_token' in response.json
assert response.json['token_type'] == 'Bearer'
access_token = response.json['access_token']
id_token = response.json['id_token']
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
assert location.fragment
query = urlparse.parse_qs(location.fragment)
assert OIDCAccessToken.objects.count() == 1
access_token = OIDCAccessToken.objects.get()
assert set(query.keys()) == set(['access_token', 'token_type', 'expires_in', 'id_token',
'state'])
assert query['access_token'] == [access_token.uuid]
assert query['token_type'] == ['Bearer']
assert query['state'] == ['xxx']
access_token = query['access_token'][0]
id_token = query['id_token'][0]
if oidc_client.idtoken_algo in (oidc_client.ALGO_RSA, oidc_client.ALGO_EC):
keyset = JWKSet.from_json(app.get(reverse('oidc-certs')).content)
for k in keyset.get('keys'):
if {
'RSA': oidc_client.ALGO_RSA,
'EC': oidc_client.ALGO_EC
}.get(k.key_type) == oidc_client.idtoken_algo:
algs=[{
oidc_client.ALGO_RSA: 'RS256',
oidc_client.ALGO_EC: 'ES256'
}.get(oidc_client.idtoken_algo)]
key = k
break
elif oidc_client.idtoken_algo == oidc_client.ALGO_HMAC:
k = base64.b64encode(oidc_client.client_secret.encode('utf-8'))
key = JWK(kty='oct', k=force_text(k))
algs = ['HS256']
else:
raise NotImplementedError
jwt = JWT(jwt=id_token, key=key, algs=algs)
claims = json.loads(jwt.claims)
assert set(claims) >= set(['iss', 'sub', 'aud', 'exp', 'iat', 'nonce', 'auth_time', 'acr'])
assert claims['nonce'] == 'yyy'
assert response.request.url.startswith(claims['iss'])
assert claims['aud'] == oidc_client.client_id
assert parse_timestamp(claims['iat']) <= now()
assert parse_timestamp(claims['auth_time']) <= now()
exp_delta = (parse_timestamp(claims['exp']) - now()).total_seconds()
assert exp_delta > 0
if oidc_client.idtoken_duration:
assert abs(exp_delta - oidc_client.idtoken_duration.total_seconds()) < 2
else:
assert abs(exp_delta - 30) < 2
if login_first:
assert claims['acr'] == '0'
else:
assert claims['acr'] == '1'
assert claims['sub'] == make_sub(oidc_client, simple_user)
assert claims['preferred_username'] == simple_user.username
assert claims['given_name'] == simple_user.first_name
assert claims['family_name'] == simple_user.last_name
assert claims['email'] == simple_user.email
assert claims['email_verified'] is False
user_info_url = make_url('oidc-user-info')
response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
assert response.json['sub'] == make_sub(oidc_client, simple_user)
assert response.json['preferred_username'] == simple_user.username
assert response.json['given_name'] == simple_user.first_name
assert response.json['family_name'] == simple_user.last_name
assert response.json['email'] == simple_user.email
assert response.json['email_verified'] is False
# when adding extra attributes
OIDCClaim.objects.create(client=oidc_client, name='ou', value='django_user_ou_name', scopes='profile')
OIDCClaim.objects.create(client=oidc_client, name='roles', value='a2_role_names', scopes='profile, role')
OIDCClaim.objects.create(client=oidc_client,
name='cityscape_image',
value='django_user_cityscape_image',
scopes='profile')
simple_user.roles.add(get_role_model().objects.create(
name='Whatever', slug='whatever', ou=get_default_ou()))
response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
assert response.json['ou'] == simple_user.ou.name
assert response.json['roles'][0] == 'Whatever'
assert response.json.get('cityscape_image') is None
with open('tests/200x200.jpg', 'rb') as fd:
simple_user.attributes.cityscape_image = File(fd)
response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
assert response.json['cityscape_image'].startswith('http://testserver/media/profile-image/')
# check against a user without username
simple_user.username = None
simple_user.save()
response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
assert response.json['preferred_username'] == ''
# Now logout
if oidc_client.post_logout_redirect_uris:
params = {
'post_logout_redirect_uri': oidc_client.post_logout_redirect_uris,
'state': 'xyz',
}
logout_url = make_url('oidc-logout', params=params)
response = app.get(logout_url)
assert response.status_code == 302
assert response.location == 'https://example.com/?state=xyz'
assert '_auth_user_id' not in app.session
else:
response = app.get(make_url('account_management'))
response = response.click('Logout')
if oidc_client.frontchannel_logout_uri:
iframes = response.pyquery('iframe[src="https://example.com/southpark/logout/"]')
assert iframes
if oidc_client.frontchannel_timeout:
assert iframes.attr('onload').endswith(', %d)' % oidc_client.frontchannel_timeout)
else:
assert iframes.attr('onload').endswith(', 300)')
def assert_oidc_error(response, error, error_description=None, fragment=False):
location = urlparse.urlparse(response['Location'])
query = location.fragment if fragment else location.query
query = urlparse.parse_qs(query)
assert query['error'] == [error]
if error_description:
assert len(query['error_description']) == 1
assert error_description in query['error_description'][0]
def assert_authorization_response(response, fragment=False, **kwargs):
location = urlparse.urlparse(response['Location'])
query = location.fragment if fragment else location.query
query = urlparse.parse_qs(query)
for key, value in kwargs.items():
if value is None:
assert key in query
elif isinstance(value, list):
assert query[key] == value
else:
assert value in query[key][0]
def test_invalid_request(caplog, oidc_settings, oidc_client, simple_user, app):
redirect_uri = oidc_client.redirect_uris.split()[0]
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
fragment = False
response_type = 'code'
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
fragment = True
response_type = 'id_token token'
else:
raise NotImplementedError
# missing client_id
authorize_url = make_url('oidc-authorize', params={})
response = app.get(authorize_url, status=302)
assert urlparse.urlparse(response['Location']).path == '/'
response = response.maybe_follow()
assert 'Authorization request is invalid' in response
# missing redirect_uri
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
})
response = app.get(authorize_url, status=302)
assert urlparse.urlparse(response['Location']).path == '/'
response = response.maybe_follow()
assert 'Authorization request is invalid' in response
# invalid client_id
authorize_url = make_url('oidc-authorize', params={
'client_id': 'xxx',
'redirect_uri': redirect_uri,
})
response = app.get(authorize_url, status=302)
assert urlparse.urlparse(response['Location']).path == '/'
response = response.maybe_follow()
assert 'Authorization request is invalid' in response
# invalid redirect_uri
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': 'xxx',
'response_type': 'code',
'scope': 'openid',
})
response = app.get(authorize_url, status=302)
assert urlparse.urlparse(response['Location']).path == '/'
response = response.maybe_follow()
assert 'Authorization request is invalid' in response
assert not 'invalid redirect_uri' in response
with override_settings(DEBUG=True):
response = app.get(authorize_url, status=302)
assert urlparse.urlparse(response['Location']).path == '/'
response = response.maybe_follow()
assert 'invalid redirect_uri' in response
# missing response_type
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
})
response = app.get(authorize_url)
if DJ_VERSION < (2, 0):
errmsg1 = 'missing parameter \'response_type\''
errmsg2 = 'missing parameter \'scope\''
else:
errmsg1 = 'missing parameter response_type'
errmsg2 = 'missing parameter scope'
assert_oidc_error(response, 'invalid_request', errmsg1,
fragment=fragment)
logrecord = [rec for rec in caplog.records if rec.funcName == 'authorization_error'][0]
assert logrecord.levelname == 'WARNING'
assert logrecord.redirect_uri == 'https://example.com/callbac%C3%A9'
assert errmsg1 in logrecord.message
# missing scope
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': 'code',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'invalid_request', errmsg2, fragment=fragment)
# invalid max_age
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': 'code',
'scope': 'openid',
'max_age': 'xxx',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'invalid_request', 'max_age is not', fragment=fragment)
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': 'code',
'scope': 'openid',
'max_age': '-1',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'invalid_request', 'max_age is not', fragment=fragment)
# unsupported response_type
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': 'xxx',
'scope': 'openid',
})
response = app.get(authorize_url)
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
assert_oidc_error(response, 'unsupported_response_type', 'only code is supported')
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
assert_oidc_error(response, 'unsupported_response_type',
'only "id_token token" or "id_token" are supported', fragment=fragment)
# openid scope is missing
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'profile',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'invalid_request', 'openid scope is missing', fragment=fragment)
# use of an unknown scope
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile zob',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'invalid_scope', fragment=fragment)
# restriction on scopes
oidc_settings.A2_IDP_OIDC_SCOPES = ['openid']
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'invalid_scope', fragment=fragment)
del oidc_settings.A2_IDP_OIDC_SCOPES
# cancel
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'cancel': '1',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'access_denied', error_description='user did not authenticate',
fragment=fragment)
# prompt=none
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'prompt': 'none',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'login_required', error_description='prompt is none',
fragment=fragment)
utils.login(app, simple_user)
# prompt=none max_age=0
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'max_age': '0',
'prompt': 'none',
})
response = app.get(authorize_url)
assert_oidc_error(response, 'login_required', error_description='prompt is none',
fragment=fragment)
# max_age=0
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'max_age': '0',
})
response = app.get(authorize_url)
assert urlparse.urlparse(response['Location']).path == reverse('auth_login')
# prompt=login
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'prompt': 'login',
})
response = app.get(authorize_url)
assert urlparse.urlparse(response['Location']).path == reverse('auth_login')
# user refuse authorization
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'prompt': 'none',
})
response = app.get(authorize_url)
if oidc_client.authorization_mode != oidc_client.AUTHORIZATION_MODE_NONE:
assert_oidc_error(response, 'consent_required', error_description='prompt is none',
fragment=fragment)
# user refuse authorization
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
})
response = app.get(authorize_url)
if oidc_client.authorization_mode != oidc_client.AUTHORIZATION_MODE_NONE:
response = response.form.submit('refuse')
assert_oidc_error(response, 'access_denied', error_description='user denied access',
fragment=fragment)
# authorization exists
authorize = OIDCAuthorization.objects.create(
client=oidc_client, user=simple_user, scopes='openid profile email',
expired=now() + datetime.timedelta(days=2))
response = app.get(authorize_url)
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
assert_authorization_response(response, code=None, fragment=fragment)
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
assert_authorization_response(response, access_token=None, id_token=None, expires_in=None,
token_type=None, fragment=fragment)
# client ask for explicit authorization
authorize_url = make_url('oidc-authorize', params={
'client_id': oidc_client.client_id,
'redirect_uri': redirect_uri,
'response_type': response_type,
'scope': 'openid email profile',
'prompt': 'consent',
})
response = app.get(authorize_url)
assert 'a2-oidc-authorization-form' in response.text
# check all authorization have been deleted, it's our policy
assert OIDCAuthorization.objects.count() == 0
if oidc_client.authorization_mode == oidc_client.AUTHORIZATION_MODE_NONE:
# authorization mode is none, but explicit consent is asked, we validate it
response = response.form.submit('accept')
# authorization has expired
OIDCCode.objects.all().delete()
authorize.expired = now() - datetime.timedelta(days=2)
authorize.save()
response = app.get(authorize_url)
assert 'a2-oidc-authorization-form' in response.text
authorize.expired = now() + datetime.timedelta(days=2)
authorize.scopes = 'openid profile'
authorize.save()
assert OIDCAuthorization.objects.count() == 1
response.form['do_not_ask_again'] = True
response = response.form.submit('accept')
assert OIDCAuthorization.objects.count() == 1
# old authorizations have been deleted
assert OIDCAuthorization.objects.get().pk != authorize.pk
# check expired codes
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
assert OIDCCode.objects.count() == 1
code = OIDCCode.objects.get()
assert code.is_valid()
# make code expire
code.expired = now() - datetime.timedelta(seconds=120)
assert not code.is_valid()
code.save()
location = urlparse.urlparse(response['Location'])
query = urlparse.parse_qs(location.query)
assert set(query.keys()) == set(['code'])
assert query['code'] == [code.uuid]
code = query['code'][0]
token_url = make_url('oidc-token')
response = app.post(token_url, params={
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': oidc_client.redirect_uris.split()[0],
}, headers=client_authentication_headers(oidc_client), status=400)
assert 'error' in response.json
assert response.json['error'] == 'invalid_request'
assert response.json['error_description'] == 'code has expired or user is disconnected'
# invalid logout
logout_url = make_url('oidc-logout', params={
'post_logout_redirect_uri': 'https://whatever.com/',
})
response = app.get(logout_url)
assert '_auth_user_id' in app.session
assert 'Location' in response.headers
# check code expiration after logout
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
code = OIDCCode.objects.get()
code.expired = now() + datetime.timedelta(seconds=120)
code.save()
assert code.is_valid()
utils.logout(app)
code = OIDCCode.objects.get()
assert not code.is_valid()
response = app.post(token_url, params={
'grant_type': 'authorization_code',
'code': code.uuid,
'redirect_uri': oidc_client.redirect_uris.split()[0],
}, headers=client_authentication_headers(oidc_client), status=400)
assert 'error' in response.json
assert response.json['error'] == 'invalid_request'
assert response.json['error_description'] == 'code has expired or user is disconnected'
def test_expired_manager(db, simple_user):
expired = now() - datetime.timedelta(seconds=1)
not_expired = now() + datetime.timedelta(days=1)
client = OIDCClient.objects.create(
name='client',
slug='client',
ou=get_default_ou(),
redirect_uris='https://example.com/')
OIDCAuthorization.objects.create(
client=client,
user=simple_user,
scopes='openid',
expired=expired)
OIDCAuthorization.objects.create(
client=client,
user=simple_user,
scopes='openid',
expired=not_expired)
assert OIDCAuthorization.objects.count() == 2
OIDCAuthorization.objects.cleanup()
assert OIDCAuthorization.objects.count() == 1
OIDCCode.objects.create(
client=client,
user=simple_user,
scopes='openid',
redirect_uri='https://example.com/',
session_key='xxx',
auth_time=now(),
expired=expired)
OIDCCode.objects.create(
client=client,
user=simple_user,
scopes='openid',
redirect_uri='https://example.com/',
session_key='xxx',
auth_time=now(),
expired=not_expired)
assert OIDCCode.objects.count() == 2
OIDCCode.objects.cleanup()
assert OIDCCode.objects.count() == 1
OIDCAccessToken.objects.create(
client=client,
user=simple_user,
scopes='openid',
session_key='xxx',
expired=expired)
OIDCAccessToken.objects.create(
client=client,
user=simple_user,
scopes='openid',
session_key='xxx',
expired=not_expired)
assert OIDCAccessToken.objects.count() == 2
OIDCAccessToken.objects.cleanup()
assert OIDCAccessToken.objects.count() == 1
@pytest.fixture
def simple_oidc_client(db):
return OIDCClient.objects.create(
name='client',
slug='client',
ou=get_default_ou(),
redirect_uris='https://example.com/')
def test_client_secret_post_authentication(oidc_settings, app, simple_oidc_client, simple_user):
utils.login(app, simple_user)
redirect_uri = simple_oidc_client.redirect_uris.split()[0]
params = {
'client_id': simple_oidc_client.client_id,
'scope': 'openid profile email',
'redirect_uri': redirect_uri,
'state': 'xxx',
'nonce': 'yyy',
'response_type': 'code',
}
authorize_url = make_url('oidc-authorize', params=params)
response = app.get(authorize_url)
response = response.form.submit('accept')
location = urlparse.urlparse(response['Location'])
query = urlparse.parse_qs(location.query)
code = query['code'][0]
token_url = make_url('oidc-token')
response = app.post(token_url, params={
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': redirect_uri,
'client_id': simple_oidc_client.client_id,
'client_secret': simple_oidc_client.client_secret,
})
assert 'error' not in response.json
assert 'access_token' in response.json
assert 'expires_in' in response.json
assert 'id_token' in response.json
assert response.json['token_type'] == 'Bearer'
@pytest.mark.parametrize('login_first', [(True,), (False,)])
def test_role_control_access(login_first, oidc_settings, oidc_client, simple_user, app):
# authorized_role
role_authorized = get_role_model().objects.create(
name='Goth Kids', slug='goth-kids', ou=get_default_ou())
oidc_client.add_authorized_role(role_authorized)
redirect_uri = oidc_client.redirect_uris.split()[0]
params = {
'client_id': oidc_client.client_id,
'scope': 'openid profile email',
'redirect_uri': redirect_uri,
'state': 'xxx',
'nonce': 'yyy',
}
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
params['response_type'] = 'code'
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
params['response_type'] = 'token id_token'
authorize_url = make_url('oidc-authorize', params=params)
if login_first:
utils.login(app, simple_user)
# user not authorized
response = app.get(authorize_url)
assert 'https://example.com/southpark/' in response.text
# user authorized
simple_user.roles.add(role_authorized)
simple_user.save()
response = app.get(authorize_url)
if not login_first:
response = response.follow()
response.form.set('username', simple_user.username)
response.form.set('password', simple_user.username)
response = response.form.submit(name='login-password-submit')
response = response.follow()
if oidc_client.authorization_mode != oidc_client.AUTHORIZATION_MODE_NONE:
response.form['do_not_ask_again'] = True
response = response.form.submit('accept')
assert OIDCAuthorization.objects.get()
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
code = OIDCCode.objects.get()
location = urlparse.urlparse(response['Location'])
if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
query = urlparse.parse_qs(location.query)
code = query['code'][0]
token_url = make_url('oidc-token')
response = app.post(token_url, params={
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': oidc_client.redirect_uris.split()[0],
}, headers=client_authentication_headers(oidc_client))
id_token = response.json['id_token']
elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
query = urlparse.parse_qs(location.fragment)
id_token = query['id_token'][0]
if oidc_client.idtoken_algo in (oidc_client.ALGO_RSA, oidc_client.ALGO_EC):
keyset = JWKSet.from_json(app.get(reverse('oidc-certs')).content)
for k in keyset.get('keys'):
if {
'RSA': oidc_client.ALGO_RSA,
'EC': oidc_client.ALGO_EC
}.get(k.key_type) == oidc_client.idtoken_algo:
algs=[{
oidc_client.ALGO_RSA: 'RS256',
oidc_client.ALGO_EC: 'ES256'
}.get(oidc_client.idtoken_algo)]
key = k
break
elif oidc_client.idtoken_algo == oidc_client.ALGO_HMAC:
k = base64.b64encode(oidc_client.client_secret.encode('utf-8'))
key = JWK(kty='oct', k=force_text(k))
algs = ['HS256']
else:
raise NotImplementedError
jwt = JWT(jwt=id_token, key=key, algs=algs)
claims = json.loads(jwt.claims)
if login_first:
assert claims['acr'] == '0'
else:
assert claims['acr'] == '1'
def test_registration_service_slug(oidc_settings, app, simple_oidc_client, simple_user, hooks,
mailoutbox):
redirect_uri = simple_oidc_client.redirect_uris.split()[0]
params = {
'client_id': simple_oidc_client.client_id,
'scope': 'openid profile email',
'redirect_uri': redirect_uri,
'state': 'xxx',
'nonce': 'yyy',
'response_type': 'code',
}
authorize_url = make_url('oidc-authorize', params=params)
response = app.get(authorize_url)
location = urlparse.urlparse(response['Location'])
query = urlparse.parse_qs(location.query)
assert query['service'] == ['default client']
response = response.follow().click('Register')
location = urlparse.urlparse(response.request.url)
query = urlparse.parse_qs(location.query)
assert query['service'] == ['default client']
response.form.set('email', 'john.doe@example.com')
response = response.form.submit()
assert len(mailoutbox) == 1
link = utils.get_link_from_mail(mailoutbox[0])
response = app.get(link)
response.form.set('first_name', 'John')
response.form.set('last_name', 'Doe')
response.form.set('password1', 'T0==toto')
response.form.set('password2', 'T0==toto')
response = response.form.submit()
assert hooks.event[0]['kwargs']['name'] == 'sso-request'
assert hooks.event[0]['kwargs']['service'].slug == 'client'
assert hooks.event[1]['kwargs']['name'] == 'registration'
assert hooks.event[1]['kwargs']['service'] == 'client'
assert hooks.event[2]['kwargs']['name'] == 'login'
assert hooks.event[2]['kwargs']['how'] == 'email'
assert hooks.event[2]['kwargs']['service'] == 'client'
def test_oidclient_claims_data_migration(migration):
app = 'authentic2_idp_oidc'
migrate_from = [(app, '0009_auto_20180313_1156')]
migrate_to = [(app, '0010_oidcclaim')]
old_apps = migration.before(migrate_from)
OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
client = OIDCClient(name='test', slug='test', redirect_uris='https://example.net/')
client.save()
new_apps = migration.apply(migrate_to)
OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
client = OIDCClient.objects.first()
assert OIDCClaim.objects.filter(client=client.id).count() == 5
def test_oidclient_preferred_username_as_identifier_data_migration(migration):
app = 'authentic2_idp_oidc'
migrate_from = [(app, '0010_oidcclaim')]
migrate_to = [(app, '0011_auto_20180808_1546')]
old_apps = migration.before(migrate_from)
OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
OIDCClaim = old_apps.get_model('authentic2_idp_oidc', 'OIDCClaim')
client1 = OIDCClient.objects.create(name='test', slug='test', redirect_uris='https://example.net/')
client2 = OIDCClient.objects.create(name='test1', slug='test1', redirect_uris='https://example.net/')
client3 = OIDCClient.objects.create(name='test2', slug='test2', redirect_uris='https://example.net/')
client4 = OIDCClient.objects.create(name='test3', slug='test3', redirect_uris='https://example.net/')
for client in (client1, client2, client3, client4):
if client.name == 'test1':
continue
if client.name == 'test3':
OIDCClaim.objects.create(client=client, name='preferred_username', value='django_user_full_name', scopes='profile')
else:
OIDCClaim.objects.create(client=client, name='preferred_username', value='django_user_username', scopes='profile')
OIDCClaim.objects.create(client=client, name='given_name', value='django_user_first_name', scopes='profile')
OIDCClaim.objects.create(client=client, name='family_name', value='django_user_last_name', scopes='profile')
if client.name == 'test2':
continue
OIDCClaim.objects.create(client=client, name='email', value='django_user_email', scopes='email')
OIDCClaim.objects.create(client=client, name='email_verified', value='django_user_email_verified', scopes='email')
new_apps = migration.apply(migrate_to)
OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
client = OIDCClient.objects.first()
for client in OIDCClient.objects.all():
claims = client.oidcclaim_set.all()
if client.name == 'test':
assert claims.count() == 5
assert sorted(claims.values_list('name', flat=True)) == [u'email', u'email_verified', u'family_name', u'given_name', u'preferred_username']
assert sorted(claims.values_list('value', flat=True)) == [u'django_user_email', u'django_user_email_verified', u'django_user_first_name', u'django_user_identifier', u'django_user_last_name']
elif client.name == 'test2':
assert claims.count() == 3
assert sorted(claims.values_list('name', flat=True)) == [u'family_name', u'given_name', u'preferred_username']
assert sorted(claims.values_list('value', flat=True)) == [u'django_user_first_name', u'django_user_last_name', u'django_user_username']
elif client.name == 'test3':
assert claims.count() == 5
assert sorted(claims.values_list('name', flat=True)) == [u'email', u'email_verified', u'family_name', u'given_name', u'preferred_username']
assert sorted(claims.values_list('value', flat=True)) == [u'django_user_email', u'django_user_email_verified', u'django_user_first_name', u'django_user_full_name', u'django_user_last_name']
else:
assert claims.count() == 0
def test_api_synchronization(app, oidc_client):
oidc_client.has_api_access = True
oidc_client.save()
users = [User.objects.create(username='user-%s' % i) for i in range(10)]
for user in users[5:]:
user.delete()
deleted_subs = set(make_sub(oidc_client, user) for user in users[5:])
app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret))
status = 200
if oidc_client.identifier_policy not in (OIDCClient.POLICY_PAIRWISE_REVERSIBLE, OIDCClient.POLICY_UUID):
status = 401
response = app.post_json('/api/users/synchronization/',
params={
'known_uuids': [make_sub(oidc_client, user) for user in users]},
status=status)
if status == 200:
assert response.json['result'] == 1
assert set(response.json['unknown_uuids']) == deleted_subs
def test_claim_default_value(oidc_settings, normal_oidc_client, simple_user, app):
oidc_settings.A2_IDP_OIDC_SCOPES = ['openid', 'profile', 'email', 'phone']
Attribute.objects.create(
name='phone',
label='phone',
kind='phone_number',
asked_on_registration=False,
required=False,
user_visible=False,
user_editable=False)
OIDCClaim.objects.create(client=normal_oidc_client, name='phone', value='django_user_phone', scopes='phone')
normal_oidc_client.authorization_flow = normal_oidc_client.FLOW_AUTHORIZATION_CODE
normal_oidc_client.authorization_mode = normal_oidc_client.AUTHORIZATION_MODE_NONE
normal_oidc_client.save()
utils.login(app, simple_user)
simple_user.username = None
simple_user.save()
oidc_client = normal_oidc_client
redirect_uri = oidc_client.redirect_uris.split()[0]
params = {
'client_id': oidc_client.client_id,
'scope': 'openid email profile phone',
'redirect_uri': redirect_uri,
'state': 'xxx',
'nonce': 'yyy',
'response_type': 'code',
}
def sso():
authorize_url = make_url('oidc-authorize', params=params)
response = app.get(authorize_url)
location = urlparse.urlparse(response['Location'])
query = urlparse.parse_qs(location.query)
code = query['code'][0]
token_url = make_url('oidc-token')
response = app.post(token_url, params={
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': oidc_client.redirect_uris.split()[0],
}, headers=client_authentication_headers(oidc_client))
access_token = response.json['access_token']
id_token = response.json['id_token']
k=base64.b64encode(oidc_client.client_secret.encode('utf-8'))
key = JWK(kty='oct', k=force_text(k))
jwt = JWT(jwt=id_token, key=key)
claims = json.loads(jwt.claims)
user_info_url = make_url('oidc-user-info')
response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
return claims, response.json
claims, user_info = sso()
assert claims['sub'] == make_sub(oidc_client, simple_user)
assert claims['preferred_username'] == ''
assert claims['given_name'] == simple_user.first_name
assert claims['family_name'] == simple_user.last_name
assert claims['email'] == simple_user.email
assert claims['phone'] is None
assert claims['email_verified'] is False
assert user_info['sub'] == make_sub(oidc_client, simple_user)
assert user_info['preferred_username'] == ''
assert user_info['given_name'] == simple_user.first_name
assert user_info['family_name'] == simple_user.last_name
assert user_info['email'] == simple_user.email
assert user_info['phone'] is None
assert user_info['email_verified'] is False
params['scope'] = 'openid email'
claims, user_info = sso()
assert claims['sub'] == make_sub(oidc_client, simple_user)
assert claims['email'] == simple_user.email
assert claims['email_verified'] is False
assert 'phone' not in claims
assert 'preferred_username' not in claims
assert 'given_name' not in claims
assert 'family_name' not in claims
assert user_info['sub'] == make_sub(oidc_client, simple_user)
assert user_info['email'] == simple_user.email
assert user_info['email_verified'] is False
assert 'phone' not in user_info
assert 'preferred_username' not in user_info
assert 'given_name' not in user_info
assert 'family_name' not in user_info
def test_claim_templated(oidc_settings, normal_oidc_client, simple_user, app):
oidc_settings.A2_IDP_OIDC_SCOPES = ['openid', 'profile', 'email']
OIDCClaim.objects.filter(
client=normal_oidc_client, name='given_name').delete()
OIDCClaim.objects.filter(
client=normal_oidc_client, name='family_name').delete()
claim1 = OIDCClaim.objects.create(
client=normal_oidc_client,
name='given_name',
value='{{ django_user_first_name|add:"ounet" }}',
scopes='profile')
claim2 = OIDCClaim.objects.create(
client=normal_oidc_client,
name='family_name',
value='{{ "Von der "|add:django_user_last_name }}',
scopes='profile')
normal_oidc_client.authorization_flow = normal_oidc_client.FLOW_AUTHORIZATION_CODE
normal_oidc_client.authorization_mode = normal_oidc_client.AUTHORIZATION_MODE_NONE
normal_oidc_client.save()
utils.login(app, simple_user)
oidc_client = normal_oidc_client
redirect_uri = oidc_client.redirect_uris.split()[0]
params = {
'client_id': oidc_client.client_id,
'scope': 'openid email profile',
'redirect_uri': redirect_uri,
'state': 'xxx',
'nonce': 'yyy',
'response_type': 'code',
}
def sso():
authorize_url = make_url('oidc-authorize', params=params)
response = app.get(authorize_url)
location = urlparse.urlparse(response['Location'])
query = urlparse.parse_qs(location.query)
code = query['code'][0]
token_url = make_url('oidc-token')
response = app.post(token_url, params={
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': oidc_client.redirect_uris.split()[0],
}, headers=client_authentication_headers(oidc_client))
access_token = response.json['access_token']
id_token = response.json['id_token']
k = base64.b64encode(oidc_client.client_secret.encode('utf-8'))
key = JWK(kty='oct', k=force_text(k))
jwt = JWT(jwt=id_token, key=key)
claims = json.loads(jwt.claims)
user_info_url = make_url('oidc-user-info')
response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
return claims, response.json
claims, user_info = sso()
assert claims['given_name'].endswith('ounet')
assert claims['given_name'].startswith(simple_user.first_name)
assert claims['family_name'].startswith('Von der')
assert claims['family_name'].endswith(simple_user.last_name)
assert user_info['given_name'].endswith('ounet')
assert user_info['given_name'].startswith(simple_user.first_name)
assert user_info['family_name'].startswith('Von der')
assert user_info['family_name'].endswith(simple_user.last_name)
def test_client_validate_redirect_uri():
client = OIDCClient(redirect_uris='''http://example.com
http://example2.com/
http://example3.com/toto
http://*example4.com/
http://example5.com/toto*
http://example6.com/#*
http://example7.com/?*
http://example8.com/?*#*
''')
# ok
for uri in [
'http://example.com',
'http://example.com/',
'http://example2.com',
'http://example2.com/',
'http://example3.com/toto',
'http://example3.com/toto/',
'http://example4.com/',
'http://example4.com',
'http://coin.example4.com',
'http://coin.example4.com/',
'http://example5.com/toto',
'http://example5.com/toto/',
'http://example5.com/toto/tata',
'http://example5.com/toto/tata/',
'http://example6.com/#some-fragment',
'http://example7.com/?foo=bar',
'http://example8.com/?foo=bar#some-fragment']:
client.validate_redirect_uri(uri)
# nok
for uri in [
'http://coin.example.com/',
'http://example.com/toto/',
'http://coin.example.com',
'http://example3.com/',
'http://example3.com',
'http://coinexample4.com',
'http://coinexample4.com/',
'http://example5.com/tototata/',
'http://example5.com/tototata',
'http://example6.com/?foo=bar',
'http://example7.com/#some-fragment']:
with pytest.raises(ValueError, match=r'is not declared'):
client.validate_redirect_uri(uri)
client.validate_redirect_uri('http://example5.com/toto/' + 'a' * 500)
with pytest.raises(ValueError, match=r'redirect_uri length >'):
client.validate_redirect_uri('http://example5.com/toto/' + 'a' * 1024)
def test_filter_api_users(app, oidc_client, admin, simple_user, role_random):
oidc_client.has_api_access = True
oidc_client.save()
if (oidc_client.identifier_policy not in
(oidc_client.POLICY_UUID, oidc_client.POLICY_PAIRWISE_REVERSIBLE)):
return
app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret))
response = app.get('/api/users/')
count = len(response.json['results'])
assert count > 0
AuthorizedRole.objects.create(service=oidc_client, role=role_random)
response = app.get('/api/users/')
assert len(response.json['results']) == 0
role_random.members.add(simple_user)
response = app.get('/api/users/')
assert len(response.json['results']) == 1
AuthorizedRole.objects.all().delete()
response = app.get('/api/users/')
assert len(response.json['results']) == count
def test_credentials_grant(app, oidc_client, admin, simple_user):
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.scope = 'openid'
oidc_client.save()
token_url = make_url('oidc-token')
if oidc_client.idtoken_algo == OIDCClient.ALGO_HMAC:
k=base64url(oidc_client.client_secret.encode('utf-8'))
jwk = JWK(kty='oct', k=force_text(k))
elif oidc_client.idtoken_algo == OIDCClient.ALGO_RSA:
jwk = get_first_rsa_sig_key()
elif oidc_client.idtoken_algo == OIDCClient.ALGO_EC:
jwk = get_first_ec_sig_key()
# 1. test in-request client credentials
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
response = app.post(token_url, params=params)
assert 'id_token' in response.json
token = response.json['id_token']
header, payload, signature = token.split('.')
jwt = JWT()
# jwt deserialization implicitly checks the token signature:
jwt.deserialize(token, key=jwk)
claims = json.loads(jwt.claims)
assert set(claims) == set(['acr', 'aud', 'auth_time', 'exp', 'iat', 'iss', 'sub'])
assert all(claims.values())
# 2. test basic authz
params.pop('client_id')
params.pop('client_secret')
response = app.post(token_url, params=params, headers=client_authentication_headers(oidc_client))
assert 'id_token' in response.json
token = response.json['id_token']
header, payload, signature = token.split('.')
jwt = JWT()
# jwt deserialization implicitly checks the token signature:
jwt.deserialize(token, key=jwk)
claims = json.loads(jwt.claims)
assert set(claims) == set(['acr', 'aud', 'auth_time', 'exp', 'iat', 'iss', 'sub'])
assert all(claims.values())
def test_credentials_grant_ratelimitation_invalid_client(
app, oidc_client, admin, simple_user, oidc_settings, freezer):
freezer.move_to('2020-01-01')
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
token_url = make_url('oidc-token')
params = {
'client_id': oidc_client.client_id,
'client_secret': 'notgood',
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
for i in range(int(oidc_settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT.split('/')[0])):
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'invalid_client'
assert 'client authentication failed' in response.json['error_description']
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'invalid_request'
assert 'reached rate limitation' in response.json['error_description']
def test_credentials_grant_ratelimitation_valid_client(
app, oidc_client, admin, simple_user, oidc_settings, freezer):
freezer.move_to('2020-01-01')
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
token_url = make_url('oidc-token')
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
for i in range(int(oidc_settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT.split('/')[0])):
app.post(token_url, params=params)
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'invalid_request'
assert 'reached rate limitation' in response.json['error_description']
def test_credentials_grant_retrytimout(
app, oidc_client, admin, simple_user, settings, freezer):
freezer.move_to('2020-01-01')
settings.A2_LOGIN_EXPONENTIAL_RETRY_TIMEOUT_DURATION = 2
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
token_url = make_url('oidc-token')
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': u'SurelyNotTheRightPassword',
}
attempts = 0
while attempts < 100:
response = app.post(token_url, params=params, status=400)
attempts += 1
if attempts >= 10:
assert response.json['error'] == 'invalid_request'
assert 'too many attempts with erroneous RO password' in response.json['error_description']
# freeze some time after backoff delay expiration
freezer.move_to(datetime.timedelta(days=2))
# obtain a successful login
params['password'] = simple_user.username
response = app.post(token_url, params=params, status=200)
assert 'id_token' in response.json
def test_credentials_grant_invalid_flow(
app, oidc_client, admin, simple_user, settings):
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': u'SurelyNotTheRightPassword',
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'unauthorized_client'
assert 'is not configured' in response.json['error_description']
def test_credentials_grant_invalid_client(
app, oidc_client, admin, simple_user, settings):
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
params = {
'client_id': oidc_client.client_id,
'client_secret': 'tryingthis', # Nope, wrong secret
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'invalid_client'
assert response.json['error_description'] == 'client authentication failed'
def test_credentials_grant_unauthz_client(
app, oidc_client, admin, simple_user, settings):
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params, status=400)
assert response.json['error'] == 'unauthorized_client'
assert 'client is not configured for resource owner'in response.json['error_description']
def test_credentials_grant_invalid_content_type(
app, oidc_client, admin, simple_user, settings):
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'username': simple_user.username,
'password': simple_user.username,
}
token_url = make_url('oidc-token')
response = app.post(
token_url, params=params,
content_type='multipart/form-data',
status=400)
assert response.json['error'] == 'invalid_request'
assert 'wrong content type' in response.json['error_description']
def test_credentials_grant_ou_selection_simple(
app, oidc_client, admin, user_ou1, user_ou2, ou1, ou2, settings):
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'ou_slug': ou1.slug,
'username': user_ou1.username,
'password': user_ou1.username,
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params)
params['username'] = user_ou2.username
params['password'] = user_ou2.password
response = app.post(token_url, params=params, status=400)
def test_credentials_grant_ou_selection_username_not_unique(
app, oidc_client, admin, user_ou1, admin_ou2, ou1, ou2, settings):
settings.A2_USERNAME_IS_UNIQUE = False
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
admin_ou2.username = user_ou1.username
admin_ou2.set_password(user_ou1.username)
admin_ou2.save()
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'ou_slug': ou1.slug,
'username': user_ou1.username,
'password': user_ou1.username,
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params)
assert OIDCAccessToken.objects.get(
uuid=response.json['access_token']).user == user_ou1
params['ou_slug'] = ou2.slug
response = app.post(token_url, params=params)
assert OIDCAccessToken.objects.get(
uuid=response.json['access_token']).user == admin_ou2
def test_credentials_grant_ou_selection_username_not_unique_wrong_ou(
app, oidc_client, admin, user_ou1, admin_ou2, ou1, ou2, settings):
settings.A2_USERNAME_IS_UNIQUE = False
oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
oidc_client.save()
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'ou_slug': ou2.slug,
'username': user_ou1.username,
'password': user_ou1.username,
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params, status=400)
params['ou_slug'] = ou1.slug
params['username'] = admin_ou2.username
params['password'] = admin_ou2.password
response = app.post(token_url, params=params, status=400)
def test_credentials_grant_ou_selection_invalid_ou(
app, oidc_client, admin, user_ou1, settings):
params = {
'client_id': oidc_client.client_id,
'client_secret': oidc_client.client_secret,
'grant_type': 'password',
'ou_slug': 'invalidslug',
'username': user_ou1.username,
'password': user_ou1.username,
}
token_url = make_url('oidc-token')
response = app.post(token_url, params=params, status=400)
def test_oidc_client_clean():
OIDCClient(
redirect_uris='https://example.com/ https://example2.com/',
identifier_policy=OIDCClient.POLICY_UUID).clean()
with pytest.raises(ValidationError, match=r'same domain'):
OIDCClient(
redirect_uris='https://example.com/ https://example2.com/',
identifier_policy=OIDCClient.POLICY_PAIRWISE_REVERSIBLE).clean()
with pytest.raises(ValidationError, match=r'same domain'):
OIDCClient(
redirect_uris='https://example.com/ https://example2.com/',
identifier_policy=OIDCClient.POLICY_PAIRWISE).clean()
with pytest.raises(ValidationError, match=r'same domain'):
OIDCClient(
redirect_uris='https://example.com/ https://example2.com/',
identifier_policy=OIDCClient.POLICY_PAIRWISE).clean()
with pytest.raises(ValidationError, match=r'within an OU'):
OIDCClient(
authorization_mode=OIDCClient.AUTHORIZATION_MODE_BY_OU,
ou=None).clean()
OIDCClient(
redirect_uris='https://example.com/ https://example2.com/',
sector_identifier_uri='https://example.com/').clean()
def test_oidc_authorized_oauth_services_view(app, oidc_client, simple_user):
from django.contrib.contenttypes.models import ContentType
url = make_url('authorized-oauth-services')
response = app.get(url, status=302)
assert '/login/' in response.location
utils.login(app, simple_user)
response = app.get(url, status=200)
assert "You have not given any authorization to access your account profile data." in response.text
# create an ou authz
OU = get_ou_model()
ou1 = OU.objects.create(name='Orgunit1', slug='orgunit1')
OIDCAuthorization.objects.create(
client=ou1, user=simple_user, scopes='openid profile email',
expired=now() + datetime.timedelta(days=2))
# create service authzs
OIDCAuthorization.objects.create(
client=oidc_client, user=simple_user, scopes='openid',
expired=now() + datetime.timedelta(days=2))
OIDCAuthorization.objects.create(
client=oidc_client, user=simple_user, scopes='openid profile',
expired=now() + datetime.timedelta(days=2))
OIDCAuthorization.objects.create(
client=oidc_client, user=simple_user, scopes='openid profile email',
expired=now() + datetime.timedelta(days=2))
response = app.get(url, status=200)
assert "You have given authorizations to access your account profile data." in response.text
assert len(response.html.find_all(
'button', {'class': 'authorized-oauth-services--revoke-button'})) == 4
# revoke two service authz
response = response.forms[1].submit()
response = response.follow()
assert len(response.html.find_all(
'button', {'class': 'authorized-oauth-services--revoke-button'})) == 3
assert OIDCAuthorization.objects.filter(
client_ct=ContentType.objects.get_for_model(OIDCClient)).count() == 2
response = response.forms[1].submit()
response = response.follow()
assert len(response.html.find_all(
'button', {'class': 'authorized-oauth-services--revoke-button'})) == 2
assert OIDCAuthorization.objects.filter(
client_ct=ContentType.objects.get_for_model(OIDCClient)).count() == 1
# revoke the only OU authz
response = response.forms[0].submit()
response = response.follow()
assert len(response.html.find_all(
'button', {'class': 'authorized-oauth-services--revoke-button'})) == 1
assert OIDCAuthorization.objects.filter(
client_ct=ContentType.objects.get_for_model(OU)).count() == 0