# authentic/tests/idp_oidc/test_misc.py (1896 lines, 69 KiB, Python)

# authentic2 - versatile identity manager
# Copyright (C) 2010-2021 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import datetime
import functools
import json
import urllib.parse
from unittest import mock
import pytest
from django.contrib.auth import get_user_model
from django.core.exceptions import ValidationError
from django.core.files import File
from django.core.files.uploadedfile import SimpleUploadedFile
from django.http import QueryDict
from django.test.utils import override_settings
from django.urls import reverse
from django.utils.dateparse import parse_datetime
from django.utils.encoding import force_str
from django.utils.timezone import now
from jwcrypto.jwk import JWK, JWKSet
from jwcrypto.jwt import JWT
from authentic2.a2_rbac.models import OrganizationalUnit, Role
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.models import Attribute, AuthorizedRole
from authentic2.utils.misc import good_next_url, make_url
from authentic2_auth_oidc.utils import parse_timestamp
from authentic2_idp_oidc.models import OIDCAccessToken, OIDCAuthorization, OIDCClaim, OIDCClient, OIDCCode
from authentic2_idp_oidc.utils import (
base64url,
get_first_ec_sig_key,
get_first_rsa_sig_key,
get_session_id,
make_sub,
)
from .. import utils
from .conftest import bearer_authentication_headers, client_authentication_headers
# User model class as returned by Django's get_user_model().
User = get_user_model()
# Module-level pytest marker: applies the django_db mark to every test in this module.
pytestmark = pytest.mark.django_db
def test_get_jwkset(oidc_settings):
    """Smoke test: get_jwkset() runs without raising while the oidc_settings fixture is active."""
    from authentic2_idp_oidc import utils as idp_oidc_utils

    idp_oidc_utils.get_jwkset()
# Parameter sets handed to the `oidc_client` fixture through indirect
# parametrization (see the `indirect=True` parametrize decorators in this
# module). Each dict names a few OIDCClient attributes; how the fixture applies
# them is defined outside this file — presumably as overrides of its defaults,
# verify in conftest.
OIDC_CLIENT_PARAMS = [
    # implicit flow
    {
        'authorization_flow': OIDCClient.FLOW_IMPLICIT,
    },
    # post-logout redirect URI configured
    {
        'post_logout_redirect_uris': 'https://example.com/',
    },
    # UUID identifier policy, with a post-logout redirect URI
    {
        'identifier_policy': OIDCClient.POLICY_UUID,
        'post_logout_redirect_uris': 'https://example.com/',
    },
    # email identifier policy
    {
        'identifier_policy': OIDCClient.POLICY_EMAIL,
    },
    # HMAC-signed ID tokens
    {
        'idtoken_algo': OIDCClient.ALGO_HMAC,
    },
    # EC-signed ID tokens
    {
        'idtoken_algo': OIDCClient.ALGO_EC,
    },
    # no authorization (consent) step
    {
        'authorization_mode': OIDCClient.AUTHORIZATION_MODE_NONE,
    },
    # explicit one-hour ID token duration
    {
        'idtoken_duration': datetime.timedelta(hours=1),
    },
    # implicit flow combined with duration, post-logout redirect and home URL
    {
        'authorization_flow': OIDCClient.FLOW_IMPLICIT,
        'idtoken_duration': datetime.timedelta(hours=1),
        'post_logout_redirect_uris': 'https://example.com/',
        'home_url': 'https://example.com/',
    },
    # front-channel logout URI only
    {
        'frontchannel_logout_uri': 'https://example.com/southpark/logout/',
    },
    # front-channel logout URI with an explicit timeout and a colour
    {
        'frontchannel_logout_uri': 'https://example.com/southpark/logout/',
        'frontchannel_timeout': 3000,
        'colour': '#ff00ff',
    },
    # pairwise reversible identifier policy
    {
        'identifier_policy': OIDCClient.POLICY_PAIRWISE_REVERSIBLE,
    },
]
def test_login_from_client_with_home_url(oidc_client, app, simple_user):
    """Check the service banner (link, colour, logo) displayed during an OIDC login.

    The page reached by following the authorize redirect is inspected three
    times: with nothing configured, after setting home URL / colour / logo on
    the client's OU, and after setting them on the client itself. With the
    client-level values in place, the registration page and the authorization
    page are inspected as well.
    """
    redirect_uri = oidc_client.redirect_uris.split()[0]
    params = {
        'client_id': oidc_client.client_id,
        'scope': 'openid profile email',
        'redirect_uri': redirect_uri,
        'state': 'xxx',
        'nonce': 'yyy',
        'login_hint': 'backoffice john@example.com',
        'response_type': 'code',
    }
    authorize_url = make_url('oidc-authorize', params=params)

    # nothing configured: text only, no link
    response = app.get(authorize_url).follow()
    assert not response.pyquery.find('.service-message--link')
    assert response.pyquery.find('.service-message--text')

    # home URL, colour and logo set on the OU
    ou = oidc_client.ou
    ou.home_url = 'https://ou.example.net'
    ou.colour = '#8c00ec'
    with open('tests/200x200.jpg', 'rb') as fd:
        ou.logo = SimpleUploadedFile(name='200x200.jpg', content=fd.read())
    ou.save()

    response = app.get(authorize_url).follow()
    assert response.pyquery.find('.service-message')
    assert not response.pyquery.find('.service-message--link')
    assert 'color: #8c00ec' in response.text
    assert (
        response.pyquery.find('img.service-message--logo')[0].attrib['src']
        == '/media/services/logos/200x200.jpg'
    )

    # home URL, colour and logo set on the client itself
    oidc_client.home_url = 'https://service.example.net'
    oidc_client.colour = '#ec008c'
    with open('tests/201x201.jpg', 'rb') as fd:
        oidc_client.logo = SimpleUploadedFile(name='201x201.jpg', content=fd.read())
    oidc_client.save()

    response = app.get(authorize_url).follow()
    link = response.pyquery.find('a.service-message--link')[0]
    assert link.attrib['href'] == 'https://service.example.net'
    assert 'color: #ec008c' in response.text
    assert (
        response.pyquery.find('img.service-message--logo')[0].attrib['src']
        == '/media/services/logos/201x201.jpg'
    )

    # check registration page
    response = response.click('Register!')
    # look the link up again: the element found earlier belongs to the previous page
    link = response.pyquery.find('a.service-message--link')[0]
    assert link.attrib['href'] == 'https://service.example.net'
    assert (
        response.pyquery.find('img.service-message--logo')[0].attrib['src']
        == '/media/services/logos/201x201.jpg'
    )

    # check authorization page
    response = utils.login(app, simple_user)
    response = app.get(authorize_url)
    assert response.pyquery.find('.service-message')
    assert response.pyquery.find('a.service-message--link')
    assert (
        response.pyquery.find('img.service-message--logo')[0].attrib['src']
        == '/media/services/logos/201x201.jpg'
    )
    link = response.pyquery.find('a.service-message--link')[0]
    assert link.attrib['href'] == 'https://service.example.net'
@pytest.mark.parametrize('oidc_client', OIDC_CLIENT_PARAMS, indirect=True)
# NOTE(review): with a single argname, pytest passes each list item through
# unchanged, so these 1-tuples reach the test as `(True,)` / `(False,)` — both
# truthy — and the `not login_first` / `else` branches below never run.
# Switching to bare `[True, False]` would enable them; do that together with a
# test run, since those branches have presumably never been exercised.
@pytest.mark.parametrize('do_not_ask_again', [(True,), (False,)])
@pytest.mark.parametrize('login_first', [(True,), (False,)])
def test_authorization_code_sso(
    login_first, do_not_ask_again, oidc_client, oidc_settings, simple_user, app, caplog, rf
):
    """Run a complete SSO against the authorize endpoint and verify its results.

    Covers the authorization code and implicit flows: login (before or during
    the authorize request), the optional consent form, retrieval of the
    access token and ID token, validation of the ID token claims, the
    user-info endpoint (including extra claims) and finally logout.
    """
    redirect_uri = oidc_client.redirect_uris.split()[0]
    params = {
        'client_id': oidc_client.client_id,
        'scope': 'openid profile email',
        'redirect_uri': redirect_uri,
        'state': 'xxx',
        'nonce': 'yyy',
        'login_hint': 'backoffice john@example.com',
    }

    if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
        params['response_type'] = 'code'
    elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
        params['response_type'] = 'token id_token'
    authorize_url = make_url('oidc-authorize', params=params)

    if login_first:
        utils.login(app, simple_user)
    response = app.get(authorize_url)
    if not login_first:
        assert set(app.session['login-hint']) == {'backoffice', 'john@example.com'}
        response = response.follow()
        assert response.request.path == reverse('auth_login')
        response.form.set('username', simple_user.username)
        response.form.set('password', simple_user.username)
        response = response.form.submit(name='login-password-submit')
        response = response.follow()
        assert response.request.path == reverse('oidc-authorize')

    if oidc_client.authorization_mode != OIDCClient.AUTHORIZATION_MODE_NONE:
        # consent form: nothing must be persisted before the user accepts
        response = response.maybe_follow()
        assert 'a2-oidc-authorization-form' in response.text
        assert OIDCAuthorization.objects.count() == 0
        assert OIDCCode.objects.count() == 0
        assert OIDCAccessToken.objects.count() == 0
        response.form['do_not_ask_again'] = do_not_ask_again
        response = response.form.submit('accept')
        if do_not_ask_again:
            assert OIDCAuthorization.objects.count() == 1
            authz = OIDCAuthorization.objects.get()
            assert authz.client == oidc_client
            assert authz.user == simple_user
            assert authz.scope_set() == set('openid profile email'.split())
            assert authz.expired >= now()
        else:
            assert OIDCAuthorization.objects.count() == 0
        utils.assert_event(
            'user.service.sso.authorization',
            session=app.session,
            user=simple_user,
            service=oidc_client,
            scopes=['email', 'openid', 'profile'],
        )
    utils.assert_event(
        'user.service.sso',
        session=app.session,
        user=simple_user,
        service=oidc_client,
        how='password-on-https',
    )

    if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
        assert OIDCCode.objects.count() == 1
        code = OIDCCode.objects.get()
        assert code.client == oidc_client
        assert code.user == simple_user
        assert code.scope_set() == set('openid profile email'.split())
        assert code.state == 'xxx'
        assert code.nonce == 'yyy'
        assert code.redirect_uri == redirect_uri
        assert code.session_key == app.session.session_key
        assert code.auth_time <= now()
        assert code.expired >= now()

    assert response['Location'].startswith(redirect_uri)
    location = urllib.parse.urlparse(response['Location'])
    if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
        # the code comes back in the query string and is exchanged at the token endpoint
        query = urllib.parse.parse_qs(location.query)
        assert set(query.keys()) == {'code', 'state'}
        assert query['code'] == [code.uuid]
        code = query['code'][0]
        assert query['state'] == ['xxx']

        token_url = make_url('oidc-token')
        response = app.post(
            token_url,
            params={
                'grant_type': 'authorization_code',
                'code': code,
                'redirect_uri': oidc_client.redirect_uris.split()[0],
            },
            headers=client_authentication_headers(oidc_client),
        )
        assert 'error' not in response.json
        assert 'access_token' in response.json
        assert 'expires_in' in response.json
        assert 'id_token' in response.json
        assert response.json['token_type'] == 'Bearer'
        access_token = response.json['access_token']
        id_token = response.json['id_token']
    elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
        # tokens come back directly in the URL fragment
        assert location.fragment
        query = urllib.parse.parse_qs(location.fragment)
        assert OIDCAccessToken.objects.count() == 1
        access_token = OIDCAccessToken.objects.get()
        assert set(query.keys()) == {'access_token', 'token_type', 'expires_in', 'id_token', 'state'}
        assert query['access_token'] == [access_token.uuid]
        assert query['token_type'] == ['Bearer']
        assert query['state'] == ['xxx']
        access_token = query['access_token'][0]
        id_token = query['id_token'][0]

    # pick the verification key matching the client's signature algorithm
    if oidc_client.idtoken_algo in (oidc_client.ALGO_RSA, oidc_client.ALGO_EC):
        key = JWKSet.from_json(app.get(reverse('oidc-certs')).content)
        algs = ['RS256', 'ES256']
    elif oidc_client.idtoken_algo == oidc_client.ALGO_HMAC:
        # The "k" member of an "oct" JWK must be base64url encoded (RFC 7518,
        # section 6.4.1); use the project's base64url helper, not standard base64.
        k = base64url(oidc_client.client_secret.encode('utf-8'))
        key = JWK(kty='oct', k=force_str(k))
        algs = ['HS256']
    else:
        raise NotImplementedError
    jwt = JWT(jwt=id_token, key=key, algs=algs)
    claims = json.loads(jwt.claims)
    assert set(claims) >= {'iss', 'sub', 'aud', 'exp', 'iat', 'nonce', 'auth_time', 'acr'}
    assert claims['nonce'] == 'yyy'
    assert response.request.url.startswith(claims['iss'])
    assert claims['aud'] == oidc_client.client_id
    assert parse_timestamp(claims['iat']) <= now()
    assert parse_timestamp(claims['auth_time']) <= now()
    exp_delta = (parse_timestamp(claims['exp']) - now()).total_seconds()
    assert exp_delta > 0
    if oidc_client.idtoken_duration:
        assert abs(exp_delta - oidc_client.idtoken_duration.total_seconds()) < 2
    else:
        # no duration on the client: the token is expected to last about 30 seconds
        assert abs(exp_delta - 30) < 2
    if login_first:
        assert claims['acr'] == '0'
    else:
        assert claims['acr'] == '1'
    assert claims['sub'] == make_sub(oidc_client, simple_user)
    assert claims['preferred_username'] == simple_user.username
    assert claims['given_name'] == simple_user.first_name
    assert claims['family_name'] == simple_user.last_name
    assert claims['email'] == simple_user.email
    assert claims['email_verified'] is False

    user_info_url = make_url('oidc-user-info')
    response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
    assert response.json['sub'] == make_sub(oidc_client, simple_user)
    assert response.json['preferred_username'] == simple_user.username
    assert response.json['given_name'] == simple_user.first_name
    assert response.json['family_name'] == simple_user.last_name
    assert response.json['email'] == simple_user.email
    assert response.json['email_verified'] is False

    # when adding extra attributes
    OIDCClaim.objects.create(client=oidc_client, name='ou', value='django_user_ou_name', scopes='profile')
    OIDCClaim.objects.create(client=oidc_client, name='roles', value='a2_role_names', scopes='profile, role')
    OIDCClaim.objects.create(
        client=oidc_client, name='cityscape_image', value='django_user_cityscape_image', scopes='profile'
    )
    OIDCClaim.objects.create(
        client=oidc_client, name='date_joined', value='django_user_date_joined', scopes='profile'
    )
    simple_user.roles.add(Role.objects.create(name='Whatever', slug='whatever', ou=get_default_ou()))
    response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
    assert response.json['ou'] == simple_user.ou.name
    assert response.json['roles'][0] == 'Whatever'
    assert parse_datetime(response.json['date_joined'])
    assert response.json.get('cityscape_image') is None
    with open('tests/200x200.jpg', 'rb') as fd:
        simple_user.attributes.cityscape_image = File(fd)
    response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
    assert response.json['cityscape_image'].startswith('http://testserver/media/profile-image/')

    # check against a user without username
    simple_user.username = None
    simple_user.save()
    response = app.get(user_info_url, headers=bearer_authentication_headers(access_token))
    assert response.json['preferred_username'] == ''

    # Now logout
    if oidc_client.post_logout_redirect_uris:
        params = {
            'post_logout_redirect_uri': oidc_client.post_logout_redirect_uris,
            'state': 'xyz',
        }
        logout_url = make_url('oidc-logout', params=params)
        response = app.get(logout_url)
        assert response.status_code == 302
        assert response.location == 'https://example.com/?state=xyz'
        assert '_auth_user_id' not in app.session
    else:
        response = app.get(make_url('account_management'))
        response = response.click('Logout')
        if oidc_client.frontchannel_logout_uri:
            # the logout page must embed an iframe targeting the client's front-channel URI
            iframes = response.pyquery('iframe[src^="https://example.com/southpark/logout/"]')
            assert iframes
            src = iframes.attr('src')
            assert '?' in src
            src_qd = QueryDict(src.split('?', 1)[1])
            assert 'iss' in src_qd and src_qd['iss'] == 'http://testserver/'
            assert 'sid' in src_qd and src_qd['sid'] == get_session_id(
                mock.Mock(session=app.session), oidc_client
            )
            if oidc_client.frontchannel_timeout:
                assert iframes.attr('onload').endswith(', %d)' % oidc_client.frontchannel_timeout)
            else:
                assert iframes.attr('onload').endswith(', 300)')
def check_authorize_error(
    response, error, error_description, fragment, caplog, check_next=True, redirect_uri=None, message=True
):
    """Assert that `response` reports an authorize-endpoint error, and that it was logged.

    With `message` set, the response must redirect to '/continue/'; when
    `check_next` is also set, the error parameters are looked up in the `next`
    URL carried by that redirect. The continue page is then fetched and must
    display `error_description`. Without `message`, and with `check_next`
    set, the response must redirect straight to `redirect_uri` with the
    error parameters. They are read from the URL fragment when `fragment` is
    true, from the query string otherwise.

    The last record of `caplog` must mention the error and its description,
    at level WARNING when `message` is set and INFO otherwise.

    Returns the continue page response when `message` is set, else None.
    """
    continue_response = None

    if message:
        # the error is shown to the user on the continue page
        target = urllib.parse.urlparse(response.location)
        assert target.path == '/continue/'
        if check_next:
            target_params = QueryDict(target.query or '')
            assert 'next' in target_params
            assert target_params['next'].startswith(redirect_uri)
            next_target = urllib.parse.urlparse(target_params['next'])
            raw_error_params = next_target.fragment if fragment else next_target.query
            error_params = QueryDict(raw_error_params)
            assert error_params['error'] == error
            assert error_params['error_description'] == error_description
        # check continue page
        continue_response = response.follow()
        assert error_description in continue_response.pyquery('.error').text()
    elif check_next:
        # the error is sent directly to the relying party
        assert response.location.startswith(redirect_uri)
        target = urllib.parse.urlparse(response.location)
        raw_error_params = target.fragment if fragment else target.query
        error_params = QueryDict(raw_error_params)
        assert error_params['error'] == error
        assert error_params['error_description'] == error_description

    # check logs
    record = caplog.records[-1]
    expected_level = 'WARNING' if message else 'INFO'
    assert record.levelname == expected_level
    assert 'error "%s" in authorize endpoint' % error in record.message
    assert error_description in record.message

    return continue_response
def assert_authorization_response(response, fragment=False, **kwargs):
    """Assert that the redirect in `response` carries exactly the parameters named in `kwargs`.

    Parameters are read from the fragment of `response.location` when
    `fragment` is true, from its query string otherwise. For each keyword:
    None only requires the parameter to be present, a list must match the
    set of received values, and any other value must be contained in the
    received value.
    """
    target = urllib.parse.urlparse(response.location)
    raw_params = target.fragment if fragment else target.query
    received = QueryDict(raw_params)

    assert set(received) == set(kwargs)
    for name, expected in kwargs.items():
        if expected is None:
            assert name in received
            continue
        if isinstance(expected, list):
            assert set(received.getlist(name)) == set(expected)
            continue
        assert expected in received[name]
@pytest.mark.parametrize('oidc_client', OIDC_CLIENT_PARAMS, indirect=True)
def test_invalid_request(oidc_client, caplog, oidc_settings, simple_user, app, make_client, app_factory):
    """Walk through the error cases of the authorize, token and logout endpoints.

    The steps run in sequence and share state (session, authorizations,
    codes): malformed authorize requests, scope errors, cancel and prompt
    handling, consent refusal, existing / expired authorizations, then, for
    the authorization code flow, invalid token requests and code expiration
    after logout.
    """
    redirect_uri = oidc_client.redirect_uris.split()[0]
    if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
        fragment = False
        response_type = 'code'
    elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
        fragment = True
        response_type = 'id_token token'
    else:
        raise NotImplementedError

    # bind the arguments shared by every check below
    assert_authorize_error = functools.partial(
        check_authorize_error, caplog=caplog, fragment=fragment, redirect_uri=redirect_uri
    )

    # missing client_id
    response = app.get(make_url('oidc-authorize', params={}))
    assert_authorize_error(response, 'invalid_request', 'Missing parameter "client_id"', check_next=False)

    # missing redirect_uri
    response = app.get(
        make_url(
            'oidc-authorize',
            params={
                'client_id': oidc_client.client_id,
            },
        )
    )
    assert_authorize_error(response, 'invalid_request', 'Missing parameter "redirect_uri"', check_next=False)

    # invalid client_id
    # keep the result in `response`: the assertion below must inspect this request
    response = app.get(
        make_url(
            'oidc-authorize',
            params={
                'client_id': 'xxx',
                'redirect_uri': redirect_uri,
            },
        )
    )
    assert_authorize_error(response, 'invalid_request', 'Unknown client identifier: "xxx"', check_next=False)

    # invalid redirect_uri
    response = app.get(
        make_url(
            'oidc-authorize',
            params={
                'client_id': oidc_client.client_id,
                'redirect_uri': 'xxx',
                'response_type': 'code',
                'scope': 'openid',
            },
        ),
        status=302,
    )
    continue_response = assert_authorize_error(
        response, 'invalid_request', 'Redirect URI "xxx" is unknown.', check_next=False
    )
    assert 'Known' not in continue_response.pyquery('.error').text()

    # invalid redirect_uri with DEBUG=True, list of redirect_uris is shown
    with override_settings(DEBUG=True):
        response = app.get(
            make_url(
                'oidc-authorize',
                params={
                    'client_id': oidc_client.client_id,
                    'redirect_uri': 'xxx',
                    'response_type': 'code',
                    'scope': 'openid',
                },
            ),
            status=302,
        )
        continue_response = assert_authorize_error(
            response, 'invalid_request', 'Redirect URI "xxx" is unknown.', check_next=False
        )
        assert (
            'Known redirect URIs are: https://example.com/callbac%C3%A9'
            in continue_response.pyquery('.error').text()
        )

    # missing response_type
    response = app.get(
        make_url(
            'oidc-authorize',
            params={
                'client_id': oidc_client.client_id,
                'redirect_uri': redirect_uri,
            },
        )
    )
    assert_authorize_error(response, 'invalid_request', 'Missing parameter "response_type"')

    # unsupported response_type
    response = app.get(
        make_url(
            'oidc-authorize',
            params={
                'client_id': oidc_client.client_id,
                'redirect_uri': redirect_uri,
                'response_type': 'xxx',
            },
        )
    )
    if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
        assert_authorize_error(response, 'unsupported_response_type', 'Response type must be "code"')
    elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
        assert_authorize_error(
            response, 'unsupported_response_type', 'Response type must be "id_token token" or "id_token"'
        )

    # missing scope
    response = app.get(
        make_url(
            'oidc-authorize',
            params={
                'client_id': oidc_client.client_id,
                'redirect_uri': redirect_uri,
                'response_type': response_type,
            },
        )
    )
    assert_authorize_error(response, 'invalid_request', 'Missing parameter "scope"')

    # invalid max_age : not an integer
    response = app.get(
        make_url(
            'oidc-authorize',
            params={
                'client_id': oidc_client.client_id,
                'redirect_uri': redirect_uri,
                'response_type': response_type,
                'scope': 'openid',
                'max_age': 'xxx',
            },
        )
    )
    assert_authorize_error(response, 'invalid_request', 'Parameter "max_age" must be a positive integer')

    # invalid max_age : not positive
    response = app.get(
        make_url(
            'oidc-authorize',
            params={
                'client_id': oidc_client.client_id,
                'redirect_uri': redirect_uri,
                'response_type': response_type,
                'scope': 'openid',
                'max_age': '-1',
            },
        )
    )
    assert_authorize_error(response, 'invalid_request', 'Parameter "max_age" must be a positive integer')

    # openid scope is missing
    authorize_url = make_url(
        'oidc-authorize',
        params={
            'client_id': oidc_client.client_id,
            'redirect_uri': redirect_uri,
            'response_type': response_type,
            'scope': 'profile',
        },
    )
    response = app.get(authorize_url)
    assert_authorize_error(response, 'invalid_scope', 'Scope must contain "openid", received "profile"')

    # use of an unknown scope
    authorize_url = make_url(
        'oidc-authorize',
        params={
            'client_id': oidc_client.client_id,
            'redirect_uri': redirect_uri,
            'response_type': response_type,
            'scope': 'openid email profile zob',
        },
    )
    response = app.get(authorize_url)
    assert_authorize_error(
        response,
        'invalid_scope',
        'Scope may contain "email, openid, profile" scope(s), received "email, openid, profile, zob"',
    )

    # restriction on scopes
    with override_settings(A2_IDP_OIDC_SCOPES=['openid']):
        response = app.get(
            make_url(
                'oidc-authorize',
                params={
                    'client_id': oidc_client.client_id,
                    'redirect_uri': redirect_uri,
                    'response_type': response_type,
                    'scope': 'openid email',
                },
            )
        )
        assert_authorize_error(
            response, 'invalid_scope', 'Scope may contain "openid" scope(s), received "email, openid"'
        )

    # cancel
    response = app.get(
        make_url(
            'oidc-authorize',
            params={
                'client_id': oidc_client.client_id,
                'redirect_uri': redirect_uri,
                'response_type': response_type,
                'scope': 'openid email profile',
                'cancel': '1',
            },
        )
    )
    assert_authorize_error(response, 'access_denied', 'Authentication cancelled by user', message=False)

    # prompt=none
    response = app.get(
        make_url(
            'oidc-authorize',
            params={
                'client_id': oidc_client.client_id,
                'redirect_uri': redirect_uri,
                'response_type': response_type,
                'scope': 'openid email profile',
                'prompt': 'none',
            },
        )
    )
    assert_authorize_error(
        response,
        'login_required',
        error_description='Login is required but prompt parameter is "none"',
        message=False,
    )

    # every step below runs with an authenticated session
    utils.login(app, simple_user)

    # prompt=none max_age=0
    response = app.get(
        make_url(
            'oidc-authorize',
            params={
                'client_id': oidc_client.client_id,
                'redirect_uri': redirect_uri,
                'response_type': response_type,
                'scope': 'openid email profile',
                'max_age': '0',
                'prompt': 'none',
            },
        )
    )
    assert_authorize_error(
        response,
        'login_required',
        error_description='Login is required because of max_age, but prompt parameter is "none"',
        message=False,
    )

    # max_age=0
    response = app.get(
        make_url(
            'oidc-authorize',
            params={
                'client_id': oidc_client.client_id,
                'redirect_uri': redirect_uri,
                'response_type': response_type,
                'scope': 'openid email profile',
                'max_age': '0',
            },
        )
    )
    assert response.location.startswith(reverse('auth_login') + '?')

    # prompt=login
    authorize_url = make_url(
        'oidc-authorize',
        params={
            'client_id': oidc_client.client_id,
            'redirect_uri': redirect_uri,
            'response_type': response_type,
            'scope': 'openid email profile',
            'prompt': 'login',
        },
    )
    response = app.get(authorize_url)
    assert urllib.parse.urlparse(response['Location']).path == reverse('auth_login')

    if oidc_client.authorization_mode != oidc_client.AUTHORIZATION_MODE_NONE:
        # prompt is none, but consent is required
        response = app.get(
            make_url(
                'oidc-authorize',
                params={
                    'client_id': oidc_client.client_id,
                    'redirect_uri': redirect_uri,
                    'response_type': response_type,
                    'scope': 'openid email profile',
                    'prompt': 'none',
                },
            )
        )
        assert_authorize_error(
            response, 'consent_required', 'Consent is required but prompt parameter is "none"', message=False
        )

        # user do not consent
        response = app.get(
            make_url(
                'oidc-authorize',
                params={
                    'client_id': oidc_client.client_id,
                    'redirect_uri': redirect_uri,
                    'response_type': response_type,
                    'scope': 'openid email profile',
                },
            )
        )
        response = response.form.submit('refuse')
        utils.assert_event(
            'user.service.sso.refusal',
            session=app.session,
            user=simple_user,
            service=oidc_client,
            scopes=['email', 'openid', 'profile'],
        )
        assert_authorize_error(response, 'access_denied', 'User did not consent', message=False)

    # authorization exists
    authorize = OIDCAuthorization.objects.create(
        client=oidc_client,
        user=simple_user,
        scopes='openid profile email',
        expired=now() + datetime.timedelta(days=2),
    )
    response = app.get(
        make_url(
            'oidc-authorize',
            params={
                'client_id': oidc_client.client_id,
                'redirect_uri': redirect_uri,
                'response_type': response_type,
                'scope': 'openid email profile',
            },
        )
    )
    if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
        assert_authorization_response(response, code=None)
    elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT:
        assert_authorization_response(
            response, access_token=None, id_token=None, expires_in=None, token_type=None, fragment=True
        )

    # client ask for explicit authorization
    authorize_url = make_url(
        'oidc-authorize',
        params={
            'client_id': oidc_client.client_id,
            'redirect_uri': redirect_uri,
            'response_type': response_type,
            'scope': 'openid email profile',
            'prompt': 'consent',
        },
    )
    response = app.get(authorize_url)
    assert 'a2-oidc-authorization-form' in response.text
    # check all authorization have been deleted, it's our policy
    assert OIDCAuthorization.objects.count() == 0
    if oidc_client.authorization_mode == oidc_client.AUTHORIZATION_MODE_NONE:
        # authorization mode is none, but explicit consent is asked, we validate it
        response = response.form.submit('accept')

    # authorization has expired
    OIDCCode.objects.all().delete()
    authorize.expired = now() - datetime.timedelta(days=2)
    authorize.save()
    response = app.get(authorize_url)
    assert 'a2-oidc-authorization-form' in response.text
    authorize.expired = now() + datetime.timedelta(days=2)
    authorize.scopes = 'openid profile'
    authorize.save()
    assert OIDCAuthorization.objects.count() == 1
    response.form['do_not_ask_again'] = True
    response = response.form.submit('accept')
    assert OIDCAuthorization.objects.count() == 1
    # old authorizations have been deleted
    assert OIDCAuthorization.objects.get().pk != authorize.pk

    # check expired codes
    if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
        # token requests go through a separate app, standing for the relying party
        rp_app = app_factory()
        assert OIDCCode.objects.count() == 1
        oidc_code = OIDCCode.objects.get()
        assert oidc_code.is_valid()
        location = urllib.parse.urlparse(response['Location'])
        query = urllib.parse.parse_qs(location.query)
        assert set(query.keys()) == {'code'}
        assert query['code'] == [oidc_code.uuid]
        code = query['code'][0]
        token_url = make_url('oidc-token')

        # missing code parameter
        params = {
            'grant_type': 'authorization_code',
            'redirect_uri': oidc_client.redirect_uris.split()[0],
        }
        response = rp_app.post(
            token_url, params=params, headers=client_authentication_headers(oidc_client), status=400
        )
        assert response.json['error'] == 'invalid_request'
        assert response.json['error_description'] == 'Missing parameter "code"'

        params['code'] = code

        # wrong redirect_uri
        response = rp_app.post(
            token_url,
            params=dict(params, redirect_uri='https://example.com/'),
            headers=client_authentication_headers(oidc_client),
            status=400,
        )
        assert 'error' in response.json
        assert response.json['error'] == 'invalid_grant', response.json
        assert response.json['error_description'] == 'Redirect_uri does not match the code.'
        assert response.json['client_id'] == '1234'

        # unknown code
        response = rp_app.post(
            token_url,
            params=dict(params, code='xyz'),
            headers=client_authentication_headers(oidc_client),
            status=400,
        )
        assert 'error' in response.json
        assert response.json['error'] == 'invalid_grant'
        assert response.json['error_description'] == 'Code is unknown or has expired.'
        assert response.json['client_id'] == '1234'

        # code from another client
        other_client = make_client(rp_app, params={'slug': 'other', 'name': 'other', 'client_id': 'abcd'})
        other_oidc_code = OIDCCode.objects.create(
            client=other_client,
            user=oidc_code.user,
            profile=None,
            scopes='',
            state='1234',
            nonce='1234',
            expired=now() + datetime.timedelta(hours=1),
            redirect_uri=oidc_code.redirect_uri,
            auth_time=now(),
            session_key=oidc_code.session_key,
        )
        response = rp_app.post(
            token_url,
            params=dict(params, code=other_oidc_code.uuid),
            headers=client_authentication_headers(oidc_client),
            status=400,
        )
        assert 'error' in response.json
        assert response.json['error'] == 'invalid_grant'
        assert response.json['error_description'] == 'Code was issued to a different client.', response.json
        assert response.json['client_id'] == '1234'
        other_oidc_code.delete()
        other_client.delete()

        # simulate expired session
        from django.contrib.sessions.models import Session

        session = Session.objects.get(session_key=oidc_code.session_key)
        Session.objects.filter(pk=session.pk).delete()
        response = rp_app.post(
            token_url, params=params, headers=client_authentication_headers(oidc_client), status=400
        )
        assert 'error' in response.json
        assert response.json['error'] == 'invalid_grant'
        assert response.json['error_description'] == 'User is disconnected or session was lost.'
        assert response.json['client_id'] == '1234'
        # put the session row back for the remaining steps
        session.save()

        # make code expire
        oidc_code.expired = now() - datetime.timedelta(seconds=120)
        assert not oidc_code.is_valid()
        oidc_code.save()

        # expired code
        response = rp_app.post(
            token_url, params=params, headers=client_authentication_headers(oidc_client), status=400
        )
        assert 'error' in response.json
        assert response.json['error'] == 'invalid_grant'
        assert response.json['error_description'] == 'Code is unknown or has expired.'
        assert response.json['client_id'] == '1234'

    # invalid logout
    logout_url = make_url(
        'oidc-logout',
        params={
            'post_logout_redirect_uri': 'https://whatever.com/',
        },
    )
    response = app.get(logout_url)
    assert '_auth_user_id' in app.session
    assert 'Location' in response.headers

    # check code expiration after logout
    if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE:
        code = OIDCCode.objects.get()
        code.expired = now() + datetime.timedelta(seconds=120)
        code.save()
        assert code.is_valid()
        utils.logout(app)
        code = OIDCCode.objects.get()
        assert not code.is_valid()
        response = app.post(
            token_url,
            params={
                'grant_type': 'authorization_code',
                'code': code.uuid,
                'redirect_uri': oidc_client.redirect_uris.split()[0],
            },
            headers=client_authentication_headers(oidc_client),
            status=400,
        )
        assert 'error' in response.json
        assert response.json['error'] == 'invalid_grant'
def test_client_secret_post_authentication(oidc_settings, app, simple_oidc_client, simple_user):
    """Exchange an authorization code while sending the client credentials as POST parameters.

    No authentication header is sent to the token endpoint: `client_id` and
    `client_secret` travel in the request parameters, and the request must
    still yield a bearer access token and an ID token.
    """
    utils.login(app, simple_user)

    client = simple_oidc_client
    redirect_uri = client.redirect_uris.split()[0]
    authorize_url = make_url(
        'oidc-authorize',
        params={
            'client_id': client.client_id,
            'scope': 'openid profile email',
            'redirect_uri': redirect_uri,
            'state': 'xxx',
            'nonce': 'yyy',
            'response_type': 'code',
        },
    )

    # accept the consent form, then read the code from the callback redirect
    consent_page = app.get(authorize_url)
    callback = consent_page.form.submit('accept')
    callback_params = urllib.parse.parse_qs(urllib.parse.urlparse(callback['Location']).query)
    code = callback_params['code'][0]

    token_response = app.post(
        make_url('oidc-token'),
        params={
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': redirect_uri,
            'client_id': client.client_id,
            'client_secret': client.client_secret,
        },
    )
    payload = token_response.json
    assert 'error' not in payload
    for expected_key in ('access_token', 'expires_in', 'id_token'):
        assert expected_key in payload
    assert payload['token_type'] == 'Bearer'
@pytest.mark.parametrize('login_first', [(True,), (False,)])
def test_role_control_access(login_first, oidc_settings, oidc_client, simple_user, app):
    '''Check that a client restricted to a role only serves users holding it.

    Without the role, the authorization endpoint answers with a page mentioning
    ``https://example.com/southpark/``. Once the role is granted the flow is
    completed and the ``acr`` claim of the resulting ID token is checked.
    '''
    # NOTE(review): with a single argname pytest appears to pass each listed
    # value unchanged, so ``login_first`` would be the tuple ``(True,)`` or
    # ``(False,)`` and therefore always truthy - confirm whether the
    # "login during the authorization request" path is ever exercised.
    client = oidc_client

    # restrict the client to the members of a dedicated role
    goth_kids = Role.objects.create(name='Goth Kids', slug='goth-kids', ou=get_default_ou())
    client.add_authorized_role(goth_kids)

    uses_code_flow = client.authorization_flow == client.FLOW_AUTHORIZATION_CODE
    uses_implicit_flow = client.authorization_flow == client.FLOW_IMPLICIT

    query_params = {
        'client_id': client.client_id,
        'scope': 'openid profile email',
        'redirect_uri': client.redirect_uris.split()[0],
        'state': 'xxx',
        'nonce': 'yyy',
    }
    if uses_code_flow:
        query_params['response_type'] = 'code'
    elif uses_implicit_flow:
        query_params['response_type'] = 'token id_token'
    authorize_url = make_url('oidc-authorize', params=query_params)

    if login_first:
        utils.login(app, simple_user)

    # the user does not hold the role yet
    page = app.get(authorize_url)
    assert 'https://example.com/southpark/' in page.text

    # grant the role and try again
    simple_user.roles.add(goth_kids)
    simple_user.save()
    page = app.get(authorize_url)

    if not login_first:
        login_page = page.follow()
        login_page.form.set('username', simple_user.username)
        login_page.form.set('password', simple_user.username)
        page = login_page.form.submit(name='login-password-submit').follow()
    if client.authorization_mode != client.AUTHORIZATION_MODE_NONE:
        page.form['do_not_ask_again'] = True
        page = page.form.submit('accept')
        assert OIDCAuthorization.objects.get()
    if uses_code_flow:
        # the lookup itself is the check: a single code is expected in database
        OIDCCode.objects.get()
    callback = urllib.parse.urlparse(page['Location'])

    # retrieve the ID token, from the token endpoint or from the fragment
    if uses_code_flow:
        token_response = app.post(
            make_url('oidc-token'),
            params={
                'grant_type': 'authorization_code',
                'code': urllib.parse.parse_qs(callback.query)['code'][0],
                'redirect_uri': client.redirect_uris.split()[0],
            },
            headers=client_authentication_headers(client),
        )
        id_token = token_response.json['id_token']
    elif uses_implicit_flow:
        id_token = urllib.parse.parse_qs(callback.fragment)['id_token'][0]

    # pick the verification key matching the signature algorithm of the client
    if client.idtoken_algo == client.ALGO_HMAC:
        encoded_secret = base64.b64encode(client.client_secret.encode('utf-8'))
        key = JWK(kty='oct', k=force_str(encoded_secret))
        algs = ['HS256']
    elif client.idtoken_algo in (client.ALGO_RSA, client.ALGO_EC):
        key = JWKSet.from_json(app.get(reverse('oidc-certs')).content)
        algs = ['RS256', 'ES256']
    else:
        raise NotImplementedError

    claims = json.loads(JWT(jwt=id_token, key=key, algs=algs).claims)
    expected_acr = '0' if login_first else '1'
    assert claims['acr'] == expected_acr
def test_registration_service_slug(oidc_settings, app, simple_oidc_client, simple_user, hooks, mailoutbox):
    '''Register a new account starting from an OIDC authorization request.

    The page reached through the registration mail must expose the service and
    its organizational unit as ``data-home-*`` attributes on ``<body>``, and
    the first three recorded hook events must all reference the service.
    '''
    client = simple_oidc_client
    callback = client.redirect_uris.split()[0]
    organizational_unit = client.ou
    organizational_unit.home_url = 'https://portal/'
    organizational_unit.save()

    authorize_url = make_url(
        'oidc-authorize',
        params={
            'client_id': client.client_id,
            'scope': 'openid profile email',
            'redirect_uri': callback,
            'state': 'xxx',
            'nonce': 'yyy',
            'response_type': 'code',
        },
    )

    # from the login page to the registration form, then ask for the mail
    page = app.get(authorize_url).follow().click('Register')
    page.form.set('email', 'john.doe@example.com')
    page = page.form.submit()
    assert len(mailoutbox) == 1

    # follow the link received by mail
    page = app.get(utils.get_link_from_mail(mailoutbox[0]))
    body = page.pyquery('body')[0]
    expected_attributes = {
        'data-home-ou-slug': 'default',
        'data-home-ou-name': 'Default organizational unit',
        'data-home-service-slug': 'client',
        'data-home-service-name': 'client',
        'data-home-url': 'https://portal/',
    }
    for attribute, value in expected_attributes.items():
        assert body.attrib[attribute] == value

    # complete the registration
    for field, value in (
        ('first_name', 'John'),
        ('last_name', 'Doe'),
        ('password1', 'T0==toto'),
        ('password2', 'T0==toto'),
    ):
        page.form.set(field, value)
    page = page.form.submit()

    # every recorded event carries the service
    expected_events = (
        {'name': 'sso-request'},
        {'name': 'registration'},
        {'name': 'login', 'how': 'email'},
    )
    for index, expected in enumerate(expected_events):
        kwargs = hooks.event[index]['kwargs']
        for key, value in expected.items():
            assert kwargs[key] == value
        assert kwargs['service'].slug == 'client'
def test_claim_default_value(oidc_settings, normal_oidc_client, simple_user, app):
    '''Check the claims served for a user without username, per requested scope.

    A ``phone`` claim bound to the ``phone`` scope is declared on the client.
    The claims of the ID token and of the user-info endpoint are compared to
    the user's attributes, first with all scopes then with ``openid email``
    only.
    '''
    oidc_settings.A2_IDP_OIDC_SCOPES = ['openid', 'profile', 'email', 'phone']
    Attribute.objects.create(
        name='phone',
        label='phone',
        kind='phone_number',
        asked_on_registration=False,
        required=False,
        user_visible=False,
        user_editable=False,
    )
    OIDCClaim.objects.create(
        client=normal_oidc_client, name='phone', value='django_user_phone', scopes='phone'
    )
    client = normal_oidc_client
    client.authorization_flow = client.FLOW_AUTHORIZATION_CODE
    client.authorization_mode = client.AUTHORIZATION_MODE_NONE
    client.save()

    utils.login(app, simple_user)

    # the username is removed once the session is opened
    simple_user.username = None
    simple_user.save()

    callback = client.redirect_uris.split()[0]

    def single_sign_on(scope):
        '''Run the code flow for ``scope``; return (ID token claims, user-info payload).'''
        authorize_url = make_url(
            'oidc-authorize',
            params={
                'client_id': client.client_id,
                'scope': scope,
                'redirect_uri': callback,
                'state': 'xxx',
                'nonce': 'yyy',
                'response_type': 'code',
            },
        )
        redirection = app.get(authorize_url)
        returned = urllib.parse.parse_qs(urllib.parse.urlparse(redirection['Location']).query)
        token_response = app.post(
            make_url('oidc-token'),
            params={
                'grant_type': 'authorization_code',
                'code': returned['code'][0],
                'redirect_uri': client.redirect_uris.split()[0],
            },
            headers=client_authentication_headers(client),
        )
        access_token = token_response.json['access_token']
        id_token = token_response.json['id_token']

        # the ID token is verified with a key derived from the client secret
        encoded_secret = base64.b64encode(client.client_secret.encode('utf-8'))
        verification_key = JWK(kty='oct', k=force_str(encoded_secret))
        id_token_claims = json.loads(JWT(jwt=id_token, key=verification_key).claims)

        user_info_response = app.get(
            make_url('oidc-user-info'), headers=bearer_authentication_headers(access_token)
        )
        return id_token_claims, user_info_response.json

    # all scopes: every claim is present, the missing username becomes ''
    for payload in single_sign_on('openid email profile phone'):
        assert payload['sub'] == make_sub(client, simple_user)
        assert payload['preferred_username'] == ''
        assert payload['given_name'] == simple_user.first_name
        assert payload['family_name'] == simple_user.last_name
        assert payload['email'] == simple_user.email
        assert payload['phone'] == simple_user.phone
        assert payload['email_verified'] is False

    # restricted scopes: profile and phone claims are left out
    for payload in single_sign_on('openid email'):
        assert payload['sub'] == make_sub(client, simple_user)
        assert payload['email'] == simple_user.email
        assert payload['email_verified'] is False
        for absent in ('phone', 'preferred_username', 'given_name', 'family_name'):
            assert absent not in payload
def test_claim_templated(oidc_settings, normal_oidc_client, simple_user, app):
    '''Check claims whose value is written as a template.

    ``given_name`` and ``family_name`` are redefined with templates adding a
    suffix and a prefix to the user's names; both the ID token and the
    user-info endpoint must serve the rendered values.
    '''
    oidc_settings.A2_IDP_OIDC_SCOPES = ['openid', 'profile', 'email']
    client = normal_oidc_client

    # replace the existing name claims by templated ones
    templated_claims = (
        ('given_name', '{{ django_user_first_name|add:"ounet" }}'),
        ('family_name', '{{ "Von der "|add:django_user_last_name }}'),
    )
    for claim_name, _ in templated_claims:
        OIDCClaim.objects.filter(client=client, name=claim_name).delete()
    for claim_name, template in templated_claims:
        OIDCClaim.objects.create(
            client=client,
            name=claim_name,
            value=template,
            scopes='profile',
        )

    client.authorization_flow = client.FLOW_AUTHORIZATION_CODE
    client.authorization_mode = client.AUTHORIZATION_MODE_NONE
    client.save()
    utils.login(app, simple_user)

    authorize_params = {
        'client_id': client.client_id,
        'scope': 'openid email profile',
        'redirect_uri': client.redirect_uris.split()[0],
        'state': 'xxx',
        'nonce': 'yyy',
        'response_type': 'code',
    }

    # code flow: authorization, code exchange, then user-info request
    redirection = app.get(make_url('oidc-authorize', params=authorize_params))
    returned = urllib.parse.parse_qs(urllib.parse.urlparse(redirection['Location']).query)
    token_response = app.post(
        make_url('oidc-token'),
        params={
            'grant_type': 'authorization_code',
            'code': returned['code'][0],
            'redirect_uri': client.redirect_uris.split()[0],
        },
        headers=client_authentication_headers(client),
    )
    access_token = token_response.json['access_token']
    id_token = token_response.json['id_token']

    # the ID token is verified with a key derived from the client secret
    encoded_secret = base64.b64encode(client.client_secret.encode('utf-8'))
    verification_key = JWK(kty='oct', k=force_str(encoded_secret))
    claims = json.loads(JWT(jwt=id_token, key=verification_key).claims)

    user_info = app.get(
        make_url('oidc-user-info'), headers=bearer_authentication_headers(access_token)
    ).json

    for payload in (claims, user_info):
        given_name = payload['given_name']
        family_name = payload['family_name']
        assert given_name.endswith('ounet')
        assert given_name.startswith(simple_user.first_name)
        assert family_name.startswith('Von der')
        assert family_name.endswith(simple_user.last_name)
def test_client_validate_redirect_uri():
    '''Exercise ``OIDCClient.validate_redirect_uri`` against declared patterns.

    Declared URIs include exact values and ``*`` wildcards in the host, path,
    query and fragment. Accepted URIs must not raise; the others must raise a
    ``ValueError``, as must an over-long URI.
    '''
    client = OIDCClient(
        redirect_uris='''http://example.com
http://example2.com/
http://example3.com/toto
http://*example4.com/
http://example5.com/toto*
http://example6.com/#*
http://example7.com/?*
http://example8.com/?*#*
'''
    )

    accepted = (
        'http://example.com',
        'http://example.com/',
        'http://example2.com',
        'http://example2.com/',
        'http://example3.com/toto',
        'http://example3.com/toto/',
        'http://example4.com/',
        'http://example4.com',
        'http://coin.example4.com',
        'http://coin.example4.com/',
        'http://example5.com/toto',
        'http://example5.com/toto/',
        'http://example5.com/toto/tata',
        'http://example5.com/toto/tata/',
        'http://example6.com/#some-fragment',
        'http://example7.com/?foo=bar',
        'http://example8.com/?foo=bar#some-fragment',
    )
    rejected = (
        'http://coin.example.com/',
        'http://example.com/toto/',
        'http://coin.example.com',
        'http://example3.com/',
        'http://example3.com',
        'http://coinexample4.com',
        'http://coinexample4.com/',
        'http://example5.com/tototata/',
        'http://example5.com/tototata',
        'http://example6.com/?foo=bar',
        'http://example7.com/#some-fragment',
    )

    # declared URIs: the call must simply not raise
    for candidate in accepted:
        client.validate_redirect_uri(candidate)

    # undeclared URIs
    for candidate in rejected:
        with pytest.raises(ValueError, match=r'is not declared'):
            client.validate_redirect_uri(candidate)

    # length check: 500 extra characters pass, 1024 do not
    wildcard_prefix = 'http://example5.com/toto/'
    client.validate_redirect_uri(wildcard_prefix + 'a' * 500)
    with pytest.raises(ValueError, match=r'redirect_uri length >'):
        client.validate_redirect_uri(wildcard_prefix + 'a' * 1024)
def test_filter_api_users(app, oidc_client, admin, simple_user, role_random):
    '''Check how authorized roles narrow the users listed on ``/api/users/``.

    The client calls the API with its own credentials. Declaring an authorized
    role empties the listing until a user is added to the role; removing the
    authorized roles restores the initial listing.
    '''
    oidc_client.has_api_access = True
    oidc_client.save()

    # nothing is checked for the other identifier policies
    supported_policies = (oidc_client.POLICY_UUID, oidc_client.POLICY_PAIRWISE_REVERSIBLE)
    if oidc_client.identifier_policy not in supported_policies:
        return

    app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret))

    def listed_users():
        '''Return the ``results`` array of the users API.'''
        return app.get('/api/users/').json['results']

    initial_count = len(listed_users())
    assert initial_count > 0

    # an authorized role without member hides everybody
    AuthorizedRole.objects.create(service=oidc_client, role=role_random)
    assert len(listed_users()) == 0

    # members of the role become visible
    role_random.members.add(simple_user)
    assert len(listed_users()) == 1

    # without authorized role the listing is back to its initial size
    AuthorizedRole.objects.all().delete()
    assert len(listed_users()) == initial_count
def test_credentials_grant(app, oidc_client, admin, simple_user):
    '''Check the resource owner password credentials grant.

    The token endpoint is called twice, with the client credentials in the
    request body and then in the client authentication headers. Each time the
    returned ID token must verify against the key matching the client's
    signature algorithm and carry exactly the expected, non-empty claims.
    '''
    oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
    oidc_client.scope = 'openid'
    oidc_client.save()
    token_url = make_url('oidc-token')

    if oidc_client.idtoken_algo == OIDCClient.ALGO_HMAC:
        k = base64url(oidc_client.client_secret.encode('utf-8'))
        jwk = JWK(kty='oct', k=force_str(k))
    elif oidc_client.idtoken_algo == OIDCClient.ALGO_RSA:
        jwk = get_first_rsa_sig_key()
    elif oidc_client.idtoken_algo == OIDCClient.ALGO_EC:
        jwk = get_first_ec_sig_key()
    else:
        # fail explicitly instead of hitting a NameError on ``jwk`` below
        raise NotImplementedError

    def check_id_token(response):
        '''Verify the signature and the claim set of the returned ID token.'''
        assert 'id_token' in response.json
        token = response.json['id_token']
        assert len(token.split('.')) == 3
        jwt = JWT()
        # jwt deserialization implicitly checks the token signature:
        jwt.deserialize(token, key=jwk)
        claims = json.loads(jwt.claims)
        assert set(claims) == {'acr', 'aud', 'auth_time', 'exp', 'iat', 'iss', 'sub'}
        assert all(claims.values())

    user_credentials = {
        'grant_type': 'password',
        'username': simple_user.username,
        'password': simple_user.username,
    }

    # 1. test in-request client credentials
    params = {
        'client_id': oidc_client.client_id,
        'client_secret': oidc_client.client_secret,
    }
    params.update(user_credentials)
    check_id_token(app.post(token_url, params=params))

    # 2. test basic authz
    check_id_token(
        app.post(token_url, params=user_credentials, headers=client_authentication_headers(oidc_client))
    )
def test_credentials_grant_ratelimitation_invalid_client(
    app, oidc_client, admin, simple_user, oidc_settings, freezer
):
    '''Check the rate limit applied to requests bearing a wrong client secret.

    As many requests as the first component of
    ``A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT`` are answered ``invalid_client``;
    the next one is answered with a rate limit error naming the IP address.
    '''
    freezer.move_to('2020-01-01')
    oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
    oidc_client.save()

    token_url = make_url('oidc-token')
    bad_secret_params = {
        'client_id': oidc_client.client_id,
        'client_secret': 'notgood',
        'grant_type': 'password',
        'username': simple_user.username,
        'password': simple_user.username,
    }

    allowed_requests = int(oidc_settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT.split('/')[0])
    for _ in range(allowed_requests):
        refused = app.post(token_url, params=bad_secret_params, status=400).json
        assert refused['error'] == 'invalid_client'
        assert 'Wrong client secret' in refused['error_description']

    # one request too many
    throttled = app.post(token_url, params=bad_secret_params, status=400).json
    assert throttled['error'] == 'invalid_request'
    assert throttled['error_description'] == 'Rate limit exceeded for IP address "127.0.0.1"'
def test_credentials_grant_ratelimitation_valid_client(
    app, oidc_client, admin, simple_user, oidc_settings, freezer
):
    '''Check the per-client rate limit applied to valid password grant requests.

    As many requests as the first component of
    ``A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT`` go through; the next one is
    refused with a rate limit error naming the client.
    '''
    freezer.move_to('2020-01-01')
    oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
    oidc_client.save()

    token_url = make_url('oidc-token')
    valid_params = {
        'client_id': oidc_client.client_id,
        'client_secret': oidc_client.client_secret,
        'grant_type': 'password',
        'username': simple_user.username,
        'password': simple_user.username,
    }

    allowed_requests = int(oidc_settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT.split('/')[0])
    for _ in range(allowed_requests):
        app.post(token_url, params=valid_params)

    # one request too many
    throttled = app.post(token_url, params=valid_params, status=400).json
    assert throttled['error'] == 'invalid_client'
    assert throttled['error_description'] == 'Rate limit of 100/m exceeded for client "oidcclient"'
def test_credentials_grant_retrytimout(app, oidc_client, admin, simple_user, settings, freezer):
    '''Check the retry timeout applied after repeated wrong user passwords.

    A hundred requests with a wrong password are all refused; from the tenth
    one on the error must be the "too many attempts" one. Two days later the
    right password is accepted.
    '''
    freezer.move_to('2020-01-01')
    settings.A2_LOGIN_EXPONENTIAL_RETRY_TIMEOUT_DURATION = 2
    oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
    oidc_client.save()

    token_url = make_url('oidc-token')
    params = {
        'client_id': oidc_client.client_id,
        'client_secret': oidc_client.client_secret,
        'grant_type': 'password',
        'username': simple_user.username,
        'password': 'SurelyNotTheRightPassword',
    }

    for attempt in range(1, 101):
        refused = app.post(token_url, params=params, status=400)
        if attempt < 10:
            continue
        assert refused.json['error'] == 'invalid_request'
        assert 'Too many attempts with erroneous RO password' in refused.json['error_description']

    # freeze some time after backoff delay expiration
    freezer.move_to(datetime.timedelta(days=2))

    # obtain a successful login
    params['password'] = simple_user.username
    granted = app.post(token_url, params=params, status=200)
    assert 'id_token' in granted.json
def test_credentials_grant_invalid_flow(app, oidc_client, admin, simple_user, settings):
    '''Check that a password grant is refused when the client flow is left untouched.

    The client fixture is used as provided, without switching it to the
    resource owner credentials flow.
    '''
    refused = app.post(
        make_url('oidc-token'),
        params={
            'client_id': oidc_client.client_id,
            'client_secret': oidc_client.client_secret,
            'grant_type': 'password',
            'username': simple_user.username,
            'password': 'SurelyNotTheRightPassword',
        },
        status=400,
    )
    error = refused.json
    assert error['error'] == 'unauthorized_client'
    assert 'is not configured' in error['error_description']
def test_credentials_grant_invalid_client(app, oidc_client, admin, simple_user, settings, caplog):
    '''Check the answer given to a password grant bearing a wrong client secret.

    The received secret must not appear in the response, but is expected in
    the first captured log message.
    '''
    oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
    oidc_client.save()

    refused = app.post(
        make_url('oidc-token'),
        params={
            'client_id': oidc_client.client_id,
            'client_secret': 'tryingthis',  # Nope, wrong secret
            'grant_type': 'password',
            'username': simple_user.username,
            'password': simple_user.username,
        },
        status=400,
    )
    error = refused.json
    assert error['error'] == 'invalid_client'
    assert error['error_description'] == 'Wrong client secret'
    assert 'tryingthis' not in str(error)
    assert 'received tryingthis' in caplog.messages[0]
def test_credentials_grant_invalid_client_identifier(app, oidc_client, admin, simple_user, settings, caplog):
    '''Check the errors returned for an unknown then an empty client identifier.'''
    oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
    oidc_client.save()

    token_url = make_url('oidc-token')
    params = {
        'client_id': 'xxx',
        'client_secret': 'tryingthis',
        'grant_type': 'password',
        'username': simple_user.username,
        'password': simple_user.username,
    }

    # unknown identifier
    unknown = app.post(token_url, params=params, status=400).json
    assert unknown['error'] == 'invalid_client'
    assert unknown['error_description'] == 'Wrong client identifier: xxx'

    # empty identifier
    params['client_id'] = ''
    empty = app.post(token_url, params=params, status=400).json
    assert empty['error_description'] == 'Empty client identifier'
def test_credentials_grant_unauthz_client(app, oidc_client, admin, simple_user, settings):
    '''Check that valid credentials are refused when the client flow is left untouched.

    The client fixture is used as provided, without switching it to the
    resource owner credentials flow.
    '''
    credentials = {
        'client_id': oidc_client.client_id,
        'client_secret': oidc_client.client_secret,
        'grant_type': 'password',
        'username': simple_user.username,
        'password': simple_user.username,
    }
    error = app.post(make_url('oidc-token'), params=credentials, status=400).json
    assert error['error'] == 'unauthorized_client'
    assert 'Client is not configured for resource owner' in error['error_description']
def test_credentials_grant_invalid_content_type(app, oidc_client, admin, simple_user, settings):
    '''Check that a password grant posted as ``multipart/form-data`` is refused.'''
    oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
    oidc_client.save()

    credentials = {
        'client_id': oidc_client.client_id,
        'client_secret': oidc_client.client_secret,
        'grant_type': 'password',
        'username': simple_user.username,
        'password': simple_user.username,
    }
    refused = app.post(
        make_url('oidc-token'),
        params=credentials,
        content_type='multipart/form-data',
        status=400,
    )
    error = refused.json
    assert error['error'] == 'invalid_request'
    assert 'Wrong content type' in error['error_description']
def test_credentials_grant_ou_selection_simple(
    app, oidc_client, admin, user_ou1, user_ou2, ou1, ou2, settings
):
    '''Check that ``ou_slug`` restricts the password grant to one organizational unit.

    The user of the first OU obtains a token when ``ou_slug`` names that OU.
    The credentials of the user of the second OU, although valid, must be
    refused for the first OU.
    '''
    oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
    oidc_client.save()

    # ``user.password`` holds the stored hash, not something a client could
    # send: give the second user a known clear-text password, distinct from
    # anything valid for the first user, so that the refusal asserted below
    # cannot be explained by wrong credentials
    user_ou2_password = 'user-ou2-clear-text-password'
    user_ou2.set_password(user_ou2_password)
    user_ou2.save()

    params = {
        'client_id': oidc_client.client_id,
        'client_secret': oidc_client.client_secret,
        'grant_type': 'password',
        'ou_slug': ou1.slug,
        'username': user_ou1.username,
        'password': user_ou1.username,
    }
    token_url = make_url('oidc-token')

    # right user for the selected OU
    response = app.post(token_url, params=params, status=200)

    # valid credentials, but of a user of the other OU
    params['username'] = user_ou2.username
    params['password'] = user_ou2_password
    response = app.post(token_url, params=params, status=400)
    assert response.json['error'] == 'access_denied'
    assert response.json['error_description'] == 'Invalid user credentials'
def test_credentials_grant_ou_selection_username_not_unique(
    app, oidc_client, admin, user_ou1, admin_ou2, ou1, ou2, settings
):
    '''Check that ``ou_slug`` picks the right user when two users share credentials.

    ``admin_ou2`` is given the username and the password of ``user_ou1``; the
    access token must then belong to the user matching the requested
    ``ou_slug``.
    '''
    settings.A2_USERNAME_IS_UNIQUE = False
    oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
    oidc_client.save()

    # same username and password for both users
    shared_identifier = user_ou1.username
    admin_ou2.username = shared_identifier
    admin_ou2.set_password(shared_identifier)
    admin_ou2.save()

    token_url = make_url('oidc-token')
    params = {
        'client_id': oidc_client.client_id,
        'client_secret': oidc_client.client_secret,
        'grant_type': 'password',
        'ou_slug': ou1.slug,
        'username': user_ou1.username,
        'password': user_ou1.username,
    }

    def token_owner():
        '''Request a token with the current parameters and return its user.'''
        granted = app.post(token_url, params=params)
        return OIDCAccessToken.objects.get(uuid=granted.json['access_token']).user

    assert token_owner() == user_ou1

    params['ou_slug'] = ou2.slug
    assert token_owner() == admin_ou2
def test_credentials_grant_ou_selection_username_not_unique_wrong_ou(
    app, oidc_client, admin, user_ou1, admin_ou2, ou1, ou2, settings
):
    '''Check that valid credentials are refused when ``ou_slug`` names another OU.

    Both directions are tried: the first user against the second OU, then
    ``admin_ou2`` against the first OU.
    '''
    settings.A2_USERNAME_IS_UNIQUE = False
    oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
    oidc_client.save()

    # ``user.password`` holds the stored hash, not something a client could
    # send: give ``admin_ou2`` a known clear-text password so that the refusal
    # asserted below cannot be explained by wrong credentials
    admin_ou2_password = 'admin-ou2-clear-text-password'
    admin_ou2.set_password(admin_ou2_password)
    admin_ou2.save()

    params = {
        'client_id': oidc_client.client_id,
        'client_secret': oidc_client.client_secret,
        'grant_type': 'password',
        'ou_slug': ou2.slug,
        'username': user_ou1.username,
        'password': user_ou1.username,
    }
    token_url = make_url('oidc-token')

    # first user, wrong OU
    response = app.post(token_url, params=params, status=400)

    # second user, wrong OU
    params['ou_slug'] = ou1.slug
    params['username'] = admin_ou2.username
    params['password'] = admin_ou2_password
    response = app.post(token_url, params=params, status=400)
    assert response.json['error'] == 'access_denied'
    assert response.json['error_description'] == 'Invalid user credentials'
def test_credentials_grant_ou_selection_invalid_ou(app, oidc_client, admin, user_ou1, settings):
    '''Check the error returned when ``ou_slug`` is the unknown slug ``invalidslug``.'''
    oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED
    oidc_client.save()

    refused = app.post(
        make_url('oidc-token'),
        params={
            'client_id': oidc_client.client_id,
            'client_secret': oidc_client.client_secret,
            'grant_type': 'password',
            'ou_slug': 'invalidslug',
            'username': user_ou1.username,
            'password': user_ou1.username,
        },
        status=400,
    )
    error = refused.json
    assert error['error'] == 'invalid_request'
    expected_description = 'Parameter "ou_slug" does not match an existing organizational unit'
    assert error['error_description'] == expected_description
def test_oidc_client_clean():
    '''Check ``OIDCClient.clean()`` with redirect URIs spread over two domains.

    The UUID identifier policy accepts them and both pairwise policies raise a
    ``ValidationError`` mentioning "same domain". A client declaring a sector
    identifier URI, built without an explicit policy, is accepted.
    '''
    two_domains = 'https://example.com/ https://example2.com/'

    OIDCClient(redirect_uris=two_domains, identifier_policy=OIDCClient.POLICY_UUID).clean()

    # each pairwise policy is checked once
    for policy in (OIDCClient.POLICY_PAIRWISE_REVERSIBLE, OIDCClient.POLICY_PAIRWISE):
        with pytest.raises(ValidationError, match=r'same domain'):
            OIDCClient(
                redirect_uris=two_domains,
                identifier_policy=policy,
            ).clean()

    OIDCClient(
        redirect_uris=two_domains,
        sector_identifier_uri='https://example.com/',
    ).clean()
def test_consents_view(app, oidc_client, simple_user):
    '''Exercise the consents page: listing then revocation of authorizations.

    The page requires a login. Once one authorization bound to an
    organizational unit and three bound to the client exist, four revoke
    buttons are shown and the forms are submitted one by one to revoke them.
    '''
    url = '/accounts/consents/'

    def revoke_button_count(page):
        '''Count the revoke buttons displayed on ``page``.'''
        return len(page.html.find_all('button', {'class': 'consents--revoke-button'}))

    def authorization_count(model):
        '''Count the authorizations whose client content type is ``model``.'''
        return OIDCAuthorization.objects.filter(client_ct__model=model).count()

    # anonymous users are sent to the login page
    redirection = app.get(url, status=302)
    assert '/login/' in redirection.location

    utils.login(app, simple_user)
    page = app.get(url, status=200)
    assert "You have not given any authorization to access your account profile data." in page.text

    # create an ou authz
    ou1 = OrganizationalUnit.objects.create(name='Orgunit1', slug='orgunit1')
    OIDCAuthorization.objects.create(
        client=ou1,
        user=simple_user,
        scopes='openid profile email',
        expired=now() + datetime.timedelta(days=2),
    )

    # create service authzs
    for scopes in ('openid', 'openid profile', 'openid profile email'):
        OIDCAuthorization.objects.create(
            client=oidc_client,
            user=simple_user,
            scopes=scopes,
            expired=now() + datetime.timedelta(days=2),
        )

    page = app.get(url, status=200)
    assert "You have given authorizations to access your account profile data." in page.text
    assert revoke_button_count(page) == 4

    # revoke two service authz
    page = page.forms[1].submit().follow()
    assert revoke_button_count(page) == 3
    assert authorization_count('oidcclient') == 2
    utils.assert_event(
        'user.service.sso.unauthorization',
        session=app.session,
        user=simple_user,
        service=oidc_client,
    )

    page = page.forms[1].submit().follow()
    assert revoke_button_count(page) == 2
    assert authorization_count('oidcclient') == 1

    # revoke the only OU authz
    page = page.forms[0].submit().follow()
    assert revoke_button_count(page) == 1
    assert authorization_count('organizationalunit') == 0
def test_oidc_good_next_url_hook(app, oidc_client):
    '''Check that ``good_next_url`` accepts ``https://example.com/`` when an OIDC client exists.'''
    from django.test.client import RequestFactory

    # presumably the URL is accepted because it belongs to the client fixture - verify
    plain_request = RequestFactory().get('/')
    assert good_next_url(plain_request, 'https://example.com/')
def test_authorize_with_prompt_none_and_view_restriction(
    oidc_settings, app, simple_oidc_client, simple_user, cgu_attribute
):
    '''Check that ``prompt=none`` yields ``interaction_required`` with the ``cgu_attribute`` fixture.

    The error is read from the query string of the redirection.
    '''
    client = simple_oidc_client
    authorize_url = make_url(
        'oidc-authorize',
        params={
            'client_id': client.client_id,
            'scope': 'openid profile email',
            'redirect_uri': client.redirect_uris.split()[0],
            'state': 'xxx',
            'nonce': 'yyy',
            'response_type': 'code',
            'prompt': 'none',
        },
    )

    # login first
    utils.login(app, simple_user)

    redirection = app.get(authorize_url)
    target = urllib.parse.urlparse(redirection['Location'])
    assert QueryDict(target.query)['error'] == 'interaction_required'
def test_authorize_with_view_restriction(oidc_settings, app, simple_oidc_client, simple_user, cgu_attribute):
    '''Check that the authorization request redirects to ``/accounts/edit/required/``.

    The ``cgu_attribute`` fixture is loaded and the user is logged in before
    the request.
    '''
    client = simple_oidc_client
    authorize_url = make_url(
        'oidc-authorize',
        params={
            'client_id': client.client_id,
            'scope': 'openid profile email',
            'redirect_uri': client.redirect_uris.split()[0],
            'state': 'xxx',
            'nonce': 'yyy',
            'response_type': 'code',
        },
    )

    # login first
    utils.login(app, simple_user)

    redirection = app.get(authorize_url)
    assert redirection.location.startswith('/accounts/edit/required/?')
def test_token_endpoint_code_timeout(oidc_client, oidc_settings, simple_user, app, caplog, rf, freezer):
'''Verify codes are valid during 30 seconds'''
utils.login(app, simple_user)
oidc_client.authorization_mode = oidc_client.AUTHORIZATION_MODE_NONE
oidc_client.save()
redirect_uri = oidc_client.redirect_uris.split()[0]
params = {
'client_id': oidc_client.client_id,
'scope': 'openid profile email',
'redirect_uri': redirect_uri,
'state': 'xxx',
'nonce': 'yyy',
'login_hint': 'backoffice john@example.com',
'response_type': 'code',
}
authorize_url = make_url('oidc-authorize', params=params)
response = app.get(authorize_url)
location = urllib.parse.urlparse(response['Location'])
query = urllib.parse.parse_qs(location.query)
code = query['code'][0]
def resolve_code(**kwargs):
token_url = make_url('oidc-token')
return app.post(
token_url,
params={
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': oidc_client.redirect_uris.split()[0],
},
headers=client_authentication_headers(oidc_client),
**kwargs,
)
response = resolve_code()
assert 'access_token' in response.json
freezer.move_to(datetime.timedelta(seconds=29))
response = resolve_code()
assert 'access_token' in response.json
# code should expire after 30 seconds
freezer.move_to(datetime.timedelta(seconds=1.1))
response = resolve_code(status=400)
assert 'access_token' not in response.json