authentic/tests/test_auth_oidc.py

865 lines
33 KiB
Python

# -*- coding: utf-8 -*-
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import json
import os
import pytest
import random
import re
import time
from jwcrypto.common import base64url_encode, base64url_decode, json_encode
from jwcrypto.jwk import JWKSet, JWK
from jwcrypto.jws import JWS, InvalidJWSObject
from jwcrypto.jwt import JWT
from httmock import urlmatch, HTTMock
from django.contrib.auth import get_user_model
from django.db import IntegrityError, transaction
from django.urls import reverse
from django.utils.encoding import force_text, force_str
from django.http import QueryDict
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.timezone import now
from django.utils.timezone import utc
from django_rbac.utils import get_ou_model
from authentic2_auth_oidc.utils import (
parse_id_token, IDToken, get_providers, has_providers, register_issuer,
IDTokenError)
from authentic2_auth_oidc.models import OIDCProvider, OIDCClaimMapping, OIDCAccount
from authentic2.models import Attribute
from authentic2.models import AttributeValue
from authentic2.utils import last_authentication_event
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.custom_user.models import DeletedUser
from . import utils
# Apply the django_db marker to every test of this module.
pytestmark = pytest.mark.django_db
# User model returned by Django's get_user_model(), used by the tests below.
User = get_user_model()
def test_base64url_decode():
    """Check base64url_decode() on one rejected and one accepted input."""
    # 'x' is expected to be refused with a ValueError
    pytest.raises(ValueError, base64url_decode, 'x')
    # 'aa' must be decoded without raising
    base64url_decode('aa')
# Key identifiers given to the RSA and EC keys of the test key set.
KID_RSA = '1e9gdk7'
KID_EC = 'jb20Cg8'

# Decoded protected headers, one per supported signature algorithm.
header_rsa_decoded = dict(alg='RS256', kid=KID_RSA)
header_ec_decoded = dict(alg='ES256', kid=KID_EC)
header_hmac_decoded = dict(alg='HS256')

# Claims of the reference ID token used by the parsing tests.
payload_decoded = dict(
    sub='248289761001',
    iss='http://server.example.com',
    aud='s6BhdRkqt3',
    nonce='n-0S6_WzA2Mj',
    iat=1311280970,
    exp=2201094278,
)

# base64url serialization (without padding) of the headers above.
header_rsa = 'eyJhbGciOiJSUzI1NiIsImtpZCI6IjFlOWdkazcifQ'
header_ec = 'eyJhbGciOiJFUzI1NiIsImtpZCI6ImpiMjBDZzgifQ'
header_hmac = 'eyJhbGciOiJIUzI1NiJ9'

# base64url serialization of payload_decoded (JSON with sorted keys).
payload = ''.join((
    'eyJhdWQiOiJzNkJoZFJrcXQzIiwiZXhwIjoyMjAxMDk0Mjc4LCJpYXQiOjEzMTEyOD',
    'A5NzAsImlzcyI6Imh0dHA6Ly9zZXJ2ZXIuZXhhbXBsZS5jb20iLCJub25jZSI6Im4t',
    'MFM2X1d6QTJNaiIsInN1YiI6IjI0ODI4OTc2MTAwMSJ9',
))
def test_parse_id_token(code, oidc_provider, oidc_provider_jwkset):
    """parse_id_token() must reject malformed tokens and accept the reference one.

    Each malformed token is the reference token with exactly one of its three
    parts (header, payload, signature) corrupted.
    """
    header = _header(oidc_provider)
    signature = _signature(oidc_provider)
    with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code):
        malformed_tokens = (
            # corrupted header
            'x%s.%s.%s' % (header, payload, signature),
            '%s.%s.%s' % ('$', payload, signature),
            # corrupted payload
            '%s.x%s.%s' % (header, payload, signature),
            '%s.%s.%s' % (header, '$', signature),
            # corrupted signature
            '%s.%s.%s' % (header, payload, '-'),
        )
        for malformed in malformed_tokens:
            with pytest.raises(InvalidJWSObject):
                parse_id_token(malformed, oidc_provider)
        well_formed = '%s.%s.%s' % (header, payload, signature)
        assert parse_id_token(well_formed, oidc_provider)
def test_idtoken(oidc_provider):
    """Deserialize the reference ID token and compare its claims to payload_decoded."""
    signature = _signature(oidc_provider)
    header = _header(oidc_provider)
    serialized = '%s.%s.%s' % (header, payload, signature)
    token = IDToken(serialized)
    token.deserialize(oidc_provider)
    # string claims are exposed unchanged as attributes
    for claim in ('sub', 'iss', 'aud', 'nonce'):
        assert getattr(token, claim) == payload_decoded[claim]
    # numeric dates are exposed as timezone-aware UTC datetimes
    expected_iat = datetime.datetime(2011, 7, 21, 20, 42, 50, tzinfo=utc)
    expected_exp = datetime.datetime(2039, 10, 1, 15, 4, 38, tzinfo=utc)
    assert token.iat == expected_iat
    assert token.exp == expected_exp
@pytest.fixture
def oidc_provider_jwkset():
    """Return a fresh JWKSet holding one RSA key and one EC key.

    The keys are registered under the KID_RSA and KID_EC key identifiers, which
    are the ones referenced by the reference token headers of this module.
    """
    jwkset = JWKSet()
    # 1024 bits rather than 512: recent releases of the `cryptography` library
    # refuse to generate RSA keys shorter than 1024 bits, which would make this
    # fixture (and every test using it) fail with a ValueError.
    jwkset.add(JWK.generate(kty='RSA', size=1024, kid=KID_RSA))
    jwkset.add(JWK.generate(kty='EC', size=256, kid=KID_EC))
    return jwkset
# Parameters of the `oidc_provider` fixture; a missing key means "use the
# fixture default" (RSA signature, no support of the claims parameter).
OIDC_PROVIDER_PARAMS = [
    dict(),
    dict(idtoken_algo=OIDCProvider.ALGO_HMAC),
    dict(idtoken_algo=OIDCProvider.ALGO_EC),
    dict(claims_parameter_supported=True),
]
@pytest.fixture(params=OIDC_PROVIDER_PARAMS)
def oidc_provider(request, db, oidc_provider_jwkset):
    """Create one provider per entry of OIDC_PROVIDER_PARAMS.

    Options absent from the current parameter fall back to an RSA signed
    ID token and no support of the claims parameter.
    """
    options = request.param
    return make_oidc_provider(
        idtoken_algo=options.get('idtoken_algo', OIDCProvider.ALGO_RSA),
        jwkset=oidc_provider_jwkset,
        claims_parameter_supported=options.get('claims_parameter_supported', False),
    )
@pytest.fixture
def oidc_provider_rsa(request, db, oidc_provider_jwkset):
    """Create a single, non parametrized, provider signing its ID tokens with RSA."""
    provider_options = {
        'idtoken_algo': OIDCProvider.ALGO_RSA,
        'jwkset': oidc_provider_jwkset,
    }
    return make_oidc_provider(**provider_options)
def make_oidc_provider(
        name='Server',
        slug=None,
        issuer=None,
        max_auth_age=10,
        strategy=OIDCProvider.STRATEGY_CREATE,
        idtoken_algo=OIDCProvider._meta.get_field('idtoken_algo').default,
        jwkset=None,
        claims_parameter_supported=False):
    """Create an OIDCProvider in the default OU along with a set of claim mappings.

    When not given, `slug` is the lowercased `name` and `issuer` is
    ``https://<slug>.example.com``; every endpoint URL is derived from the
    issuer. `jwkset`, when given, is stored through its JSON export.

    Returns the created provider.
    """
    if not slug:
        slug = name.lower()
    if not issuer:
        issuer = 'https://%s.example.com' % slug
    jwkset_json = json.loads(jwkset.export()) if jwkset else None

    # model field -> path below the issuer URL
    endpoint_paths = (
        ('authorization_endpoint', 'authorize'),
        ('token_endpoint', 'token'),
        ('end_session_endpoint', 'logout'),
        ('userinfo_endpoint', 'user_info'),
        ('token_revocation_endpoint', 'revoke'),
    )
    endpoints = dict(
        (field, '%s/%s' % (issuer, path)) for field, path in endpoint_paths)

    provider = OIDCProvider.objects.create(
        ou=get_default_ou(),
        name=name,
        slug=slug,
        issuer=issuer,
        max_auth_age=max_auth_age,
        strategy=strategy,
        jwkset_json=jwkset_json,
        idtoken_algo=idtoken_algo,
        claims_parameter_supported=claims_parameter_supported,
        **endpoints
    )
    provider.full_clean()

    # Claim mappings, created in this exact order; note that the `email` claim
    # is mapped twice, once as optional and once as required.
    claim_mappings = (
        dict(claim='sub', attribute='username', idtoken_claim=True),
        dict(claim='email', attribute='email'),
        dict(claim='email', required=True, attribute='email'),
        dict(claim='given_name', required=True,
             verified=OIDCClaimMapping.ALWAYS_VERIFIED, attribute='first_name'),
        dict(claim='family_name', required=True,
             verified=OIDCClaimMapping.VERIFIED_CLAIM, attribute='last_name'),
        dict(claim='ou', attribute='ou__slug'),
    )
    for mapping in claim_mappings:
        OIDCClaimMapping.objects.create(provider=provider, **mapping)
    return provider
@pytest.fixture
def code():
    """Authorization code shared by the tests and the mocked token endpoint."""
    authorization_code = 'xxxx'
    return authorization_code
def _header(oidc_provider):
    """Return the serialized reference header matching the provider's algorithm.

    Returns None when the algorithm is not one of RSA, EC or HMAC.
    """
    algo = oidc_provider.idtoken_algo
    if algo == OIDCProvider.ALGO_RSA:
        return header_rsa
    if algo == OIDCProvider.ALGO_EC:
        return header_ec
    if algo == OIDCProvider.ALGO_HMAC:
        return header_hmac
    return None
def _signature(oidc_provider):
    """Sign payload_decoded for the provider and return the bare signature part.

    RSA and EC providers sign with the matching key of their key set, HMAC
    providers with a symmetric key built from their client secret.
    """
    algo = oidc_provider.idtoken_algo
    if algo == OIDCProvider.ALGO_RSA:
        signing_key = oidc_provider.jwkset.get_key(kid=KID_RSA)
        protected_header = header_rsa_decoded
    elif algo == OIDCProvider.ALGO_EC:
        signing_key = oidc_provider.jwkset.get_key(kid=KID_EC)
        protected_header = header_ec_decoded
    elif algo == OIDCProvider.ALGO_HMAC:
        secret = oidc_provider.client_secret.encode('utf-8')
        signing_key = JWK(kty='oct', k=base64url_encode(secret))
        protected_header = header_hmac_decoded
    signed = JWS(payload=json_encode(payload_decoded))
    signed.add_signature(key=signing_key, protected=protected_header)
    # the JSON serialization exposes the signature as its own member
    serialization = json.loads(signed.serialize())
    return serialization['signature']
def oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token=None,
                       extra_user_info=None, sub='john.doe', nonce=None,
                       provides_kid_header=False, kid=None):
    """Build an HTTMock simulating the provider's token, userinfo and revocation endpoints.

    - the token endpoint answers 200 with a signed ID token only when the
      posted `code` equals the `code` argument, otherwise 400;
    - `extra_id_token` / `extra_user_info` are merged over the default ID token
      claims / userinfo content;
    - `nonce`, when set, is added to the ID token;
    - when `provides_kid_header` is true the `kid` header of asymmetric tokens
      is forced to `kid` (possibly None) instead of the signing key's id.
    """
    token_url = urlparse.urlparse(oidc_provider.token_endpoint)
    userinfo_url = urlparse.urlparse(oidc_provider.userinfo_endpoint)
    revocation_url = urlparse.urlparse(oidc_provider.token_revocation_endpoint)

    def json_response(data, status_code):
        # common shape of the JSON answers returned by the mocks
        return {
            'content': json.dumps(data),
            'headers': {
                'content-type': 'application/json',
            },
            'status_code': status_code,
        }

    @urlmatch(netloc=token_url.netloc, path=token_url.path)
    def token_endpoint_mock(url, request):
        if urlparse.parse_qs(request.body).get('code') != [code]:
            return json_response({'error': 'invalid request'}, 400)

        expiration = now() + datetime.timedelta(seconds=10)
        id_token = {
            'iss': oidc_provider.issuer,
            'sub': sub,
            'iat': int(now().timestamp()),
            'aud': str(oidc_provider.client_id),
            'exp': int(expiration.timestamp()),
            'name': 'doe',
        }
        if nonce:
            id_token['nonce'] = nonce
        if extra_id_token:
            id_token.update(extra_id_token)

        algo = oidc_provider.idtoken_algo
        if algo in (OIDCProvider.ALGO_RSA, OIDCProvider.ALGO_EC):
            if algo == OIDCProvider.ALGO_RSA:
                alg, key_type = 'RS256', 'RSA'
            else:
                alg, key_type = 'ES256', 'EC'
            # first key of the set having the expected type, None if there is none
            jwk = next(
                (candidate for candidate in oidc_provider_jwkset['keys']
                 if candidate.key_type == key_type),
                None)
            header = {'alg': alg, 'kid': kid if provides_kid_header else jwk.key_id}
            jwt = JWT(header=header, claims=id_token)
            jwt.make_signed_token(jwk)
        else:  # hmac
            jwt = JWT(header={'alg': 'HS256'}, claims=id_token)
            secret = base64url_encode(oidc_provider.client_secret.encode('utf-8'))
            jwt.make_signed_token(JWK(kty='oct', k=force_text(secret)))

        return json_response({
            'access_token': '1234',
            # check token_type is case insensitive
            'token_type': random.choice(['B', 'b']) + 'earer',
            'id_token': jwt.serialize(),
        }, 200)

    @urlmatch(netloc=userinfo_url.netloc, path=userinfo_url.path)
    def user_info_endpoint_mock(url, request):
        user_info = {
            'sub': sub,
            'iss': oidc_provider.issuer,
            'given_name': 'John',
            'family_name': 'Doe',
            'email': 'john.doe@example.com',
            'phone_number': '0123456789',
            'nickname': 'Hefty',
        }
        if extra_user_info:
            user_info.update(extra_user_info)
        return json_response(user_info, 200)

    @urlmatch(netloc=revocation_url.netloc, path=revocation_url.path)
    def token_revocation_endpoint_mock(url, request):
        # a revocation request must carry the token to revoke
        assert 'token' in urlparse.parse_qs(request.body)
        return {
            'status_code': 200,
        }

    return HTTMock(token_endpoint_mock, user_info_endpoint_mock, token_revocation_endpoint_mock)
def login_callback_url(oidc_provider):
    """Return the URL of the OIDC login callback view; `oidc_provider` is not used."""
    callback_url = reverse('oidc-login-callback')
    return callback_url
def test_providers_on_login_page(oidc_provider, app):
    """Every configured provider must get its own block on the login page."""
    response = app.get('/login/')
    assert response.pyquery('p#oidc-p-server')

    # No explicit primary key is given: a hard-coded id can collide with the
    # row created by the `oidc_provider` fixture on database backends whose
    # sequences are not rolled back between tests.
    OIDCProvider.objects.create(
        ou=get_default_ou(),
        name='OIDCIDP 2',
        slug='oidcidp-2',
        issuer='https://idp2.example.com/',
        authorization_endpoint='https://idp2.example.com/authorize',
        token_endpoint='https://idp2.example.com/token',
        end_session_endpoint='https://idp2.example.com/logout',
        # host name fixed, it was mistyped as 'idp*é' instead of 'idp2'
        userinfo_endpoint='https://idp2.example.com/user_info',
        token_revocation_endpoint='https://idp2.example.com/revoke',
        max_auth_age=10,
        strategy=OIDCProvider.STRATEGY_CREATE,
        jwkset_json=None,
        idtoken_algo=OIDCProvider.ALGO_RSA,
        claims_parameter_supported=False
    )

    # two frontends should be present on login page
    response = app.get('/login/')
    assert response.pyquery('p#oidc-p-server')
    assert response.pyquery('p#oidc-p-oidcidp-2')
def test_login_with_conditional_authenticators(oidc_provider, app, settings, caplog):
    """Check how the `show_condition` setting hides or shows providers on the login page.

    Two providers exist: 'Server' (fixture, slug 'server') and 'My IDP'
    (slug 'myidp'). The condition is given either per provider slug or as a
    single expression for the whole frontend.
    """
    make_oidc_provider(name='My IDP', slug='myidp')

    def login_page(show_condition=None):
        # optionally replace the OIDC frontend settings, then fetch the page
        if show_condition is not None:
            settings.AUTH_FRONTENDS_KWARGS = {
                'oidc': {
                    'show_condition': show_condition,
                }
            }
        return app.get('/login/')

    # no condition at all: both providers are shown
    response = login_page()
    assert 'My IDP' in response
    assert 'Server' in response

    # condition on 'myidp' only, never true here
    response = login_page({'myidp': "remote_addr=='0.0.0.0'"})
    assert 'Server' in response
    assert 'My IDP' not in response

    # 'myid' matches no provider slug, so 'My IDP' is shown again
    response = login_page({
        'myid': "remote_addr=='0.0.0.0'",
        'server': "remote_addr=='127.0.0.1'",
    })
    assert 'Server' in response
    assert 'My IDP' in response

    response = login_page({
        'myidp': "remote_addr=='0.0.0.0'",
        'server': "remote_addr=='127.0.0.1'",
    })
    assert 'Server' in response
    assert 'My IDP' not in response

    # a single expression instead of a per-slug mapping
    response = login_page("remote_addr=='127.0.0.1'")
    assert 'Server' in response
    assert 'My IDP' in response

    # conditions depending on the login hint
    response = login_page({
        'myidp': "remote_addr=='127.0.0.1' and 'backoffice' not in login_hint",
        'server': "'backoffice' in login_hint",
    })
    assert 'Server' not in response
    assert 'My IDP' in response

    # As we do not create a session on each access to the login page, we need
    # to force its creation by making django-webtest believe a session exists.
    # use of force_str() can be removed with support for python2.
    cookie_name = force_str(settings.SESSION_COOKIE_NAME)
    app.set_cookie(cookie_name, force_str('initial'))
    session = app.session
    session['login-hint'] = ['backoffice']
    session.save()
    app.set_cookie(force_str(settings.SESSION_COOKIE_NAME), force_str(session.session_key))

    # with the 'backoffice' hint in session the visibility is inverted
    response = login_page()
    assert 'Server' in response
    assert 'My IDP' not in response
def test_login_autorun(oidc_provider, app, settings):
    """With the password block hidden, the login page must redirect to the provider."""
    assert 'Server' in app.get('/login/')

    # hide password block
    settings.AUTH_FRONTENDS_KWARGS = {
        'password': {
            'show_condition': "remote_addr=='0.0.0.0'",
        }
    }
    redirect = app.get('/login/', status=302)
    expected_location = '/accounts/oidc/login/%s/' % oidc_provider.pk
    assert redirect['Location'] == expected_location
def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
    """Walk through a complete login: authorization request, callbacks, logout.

    The callback is first replayed with a series of invalid answers from the
    provider, each expected to produce a given log message, then with valid
    answers to check user creation, attribute updates and token revocation.
    """
    cassis = get_ou_model().objects.create(name='Cassis', slug='cassis')

    response = app.get('/admin/').maybe_follow()
    assert oidc_provider.name in response.text
    response = response.click(oidc_provider.name)

    # the redirection must target the provider's authorization endpoint
    location = urlparse.urlparse(response.location)
    endpoint = urlparse.urlparse(oidc_provider.authorization_endpoint)
    for component in ('scheme', 'netloc', 'path'):
        assert getattr(location, component) == getattr(endpoint, component)

    query = QueryDict(location.query)
    state = query['state']
    expected_query = (
        ('response_type', 'code'),
        ('client_id', str(oidc_provider.client_id)),
        ('scope', 'openid'),
        ('redirect_uri', 'http://testserver' + reverse('oidc-login-callback')),
    )
    for key, value in expected_query:
        assert query[key] == value
    nonce = query['nonce']

    if oidc_provider.claims_parameter_supported:
        claims = json.loads(query['claims'])
        assert claims['id_token']['sub'] is None
        userinfo_claims = claims['userinfo']
        for claim in ('email', 'given_name', 'family_name'):
            assert userinfo_claims[claim]['essential']
        assert userinfo_claims['ou'] is None
    assert User.objects.count() == 0

    def callback(authorization_code=code, **mock_kwargs):
        # call the login callback while the provider endpoints are mocked
        with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, **mock_kwargs):
            return app.get(
                login_callback_url(oidc_provider),
                params={'code': authorization_code, 'state': state})

    # (expected log message, callback arguments) of the rejected callbacks
    rejected = (
        ('failed to contact the token_endpoint', {'authorization_code': 'yyyy'}),
        ('invalid id_token', {'extra_id_token': {'iss': None}}),
        ('invalid id_token', {'extra_id_token': {'sub': None}}),
        ('authentication is too old', {'extra_id_token': {'iat': 1}}),
        ('invalid id_token', {'extra_id_token': {'exp': 1}}),
        ('invalid id_token audience', {'extra_id_token': {'aud': 'zz'}}),
        ('expected nonce', {}),
    )
    for message, callback_kwargs in rejected:
        with utils.check_log(caplog, message):
            response = callback(**callback_kwargs)
    assert not hooks.auth_oidc_backend_modify_user

    # valid answer carrying the expected nonce: the user is created
    with utils.check_log(caplog, 'created user'):
        response = callback(nonce=nonce)
    assert len(hooks.auth_oidc_backend_modify_user) == 1
    hook_kwargs = set(hooks.auth_oidc_backend_modify_user[0]['kwargs'])
    assert hook_kwargs >= {'user', 'provider', 'user_info', 'id_token', 'access_token'}
    assert urlparse.urlparse(response['Location']).path == '/admin/'
    assert User.objects.count() == 1
    user = User.objects.get()
    assert user.ou == get_default_ou()
    expected_fields = (
        ('username', 'john.doe'),
        ('first_name', 'John'),
        ('last_name', 'Doe'),
        ('email', 'john.doe@example.com'),
    )
    for field, value in expected_fields:
        assert getattr(user, field) == value
    assert user.attributes.first_name == 'John'
    assert user.attributes.last_name == 'Doe'
    assert AttributeValue.objects.filter(content='John', verified=True).count() == 1
    assert AttributeValue.objects.filter(content='Doe', verified=False).count() == 1
    assert last_authentication_event(session=app.session)['nonce'] == nonce

    # the last name becomes verified once the provider says so
    response = callback(extra_user_info={'family_name_verified': True}, nonce=nonce)
    assert AttributeValue.objects.filter(content='Doe', verified=False).count() == 0
    assert AttributeValue.objects.filter(content='Doe', verified=True).count() == 1

    # the `ou` claim moves the user to the matching OU
    response = callback(extra_user_info={'ou': 'cassis'}, nonce=nonce)
    assert User.objects.count() == 1
    user = User.objects.get()
    assert user.ou == cassis

    # without the claim the user is back in the default OU
    response = callback(nonce=nonce)
    assert User.objects.count() == 1
    user = User.objects.get()
    assert user.ou == get_default_ou()
    last_modified = user.modified

    # a callback without nonce must leave the user untouched
    time.sleep(0.1)
    response = callback()
    assert User.objects.count() == 1
    user = User.objects.get()
    assert user.ou == get_default_ou()
    assert user.modified == last_modified

    # logout revokes the token and redirects to the provider
    response = app.get(reverse('account_management'))
    with utils.check_log(caplog, 'revoked token from OIDC'), \
            oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code):
        response = response.click(href='logout')
    assert response.location.startswith('https://server.example.com/logout?')
def test_show_on_login_page(app, oidc_provider):
    """A provider whose `show` flag is false must disappear from the login page."""
    marker = 'oidc-a-server'
    assert marker in app.get('/login/').text

    # do not show this provider on login page anymore
    oidc_provider.show = False
    oidc_provider.save()

    # we have a 5 seconds cache on list of providers, we have to work around it
    for cached_function in (get_providers, has_providers):
        cached_function.cache.clear()
    assert marker not in app.get('/login/').text
def test_strategy_find_uuid(app, caplog, code, oidc_provider, oidc_provider_jwkset, simple_user):
    """With the FIND_UUID strategy only a `sub` equal to a user's UUID can log in."""
    # no mapping please
    OIDCClaimMapping.objects.all().delete()
    oidc_provider.strategy = oidc_provider.STRATEGY_FIND_UUID
    oidc_provider.save()
    assert User.objects.count() == 1

    response = app.get('/').maybe_follow()
    assert oidc_provider.name in response.text
    response = response.click(oidc_provider.name)
    query = QueryDict(urlparse.urlparse(response.location).query)
    state, nonce = query['state'], query['nonce']

    def callback(**mock_kwargs):
        # call the login callback while the provider endpoints are mocked
        with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, **mock_kwargs):
            return app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})

    # sub=john.doe, MUST not work
    with utils.check_log(caplog, 'cannot create user'):
        response = callback(nonce=nonce)

    # sub=simple_user.uuid MUST work
    with utils.check_log(caplog, 'found user using UUID'):
        response = callback(sub=simple_user.uuid, nonce=nonce)
    assert urlparse.urlparse(response['Location']).path == '/'
    assert User.objects.count() == 1

    # verify user was not modified
    user = User.objects.get()
    unchanged_fields = (
        ('username', 'user'),
        ('first_name', u'Jôhn'),
        ('last_name', u'Dôe'),
        ('email', 'user@example.net'),
    )
    for field, value in unchanged_fields:
        assert getattr(user, field) == value
    assert user.attributes.first_name == u'Jôhn'
    assert user.attributes.last_name == u'Dôe'

    # logout revokes the token and redirects to the provider
    response = app.get(reverse('account_management'))
    with utils.check_log(caplog, 'revoked token from OIDC'), \
            oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
        response = response.click(href='logout')
    assert response.location.startswith('https://server.example.com/logout?')
def test_strategy_create(app, caplog, code, oidc_provider, oidc_provider_jwkset):
    """With unique emails in the OU, successive logins must never create a second user."""
    provider_ou = oidc_provider.ou
    provider_ou.email_is_unique = True
    provider_ou.save()
    User.objects.all().delete()

    response = app.get('/').maybe_follow()
    assert oidc_provider.name in response.text
    response = response.click(oidc_provider.name)
    query = QueryDict(urlparse.urlparse(response.location).query)
    state, nonce = query['state'], query['nonce']

    def callback(**mock_kwargs):
        # call the login callback while the provider endpoints are mocked
        with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce, **mock_kwargs):
            return app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})

    # sub=john.doe
    with utils.check_log(caplog, 'auth_oidc: created user'):
        response = callback()
    assert User.objects.count() == 1

    # second time
    response = callback()
    assert User.objects.count() == 1

    # different sub, same user
    with utils.check_log(caplog, 'auth_oidc: changed user'):
        response = callback(sub='other')
    assert User.objects.count() == 1
def test_register_issuer(db, app, caplog, oidc_provider_jwkset):
    """register_issuer() must accept the sample configuration, then an HS256-only one.

    The configuration is read from `openid_configuration.json`, next to this
    file; requests to its `jwks_uri` are answered with the test key set.
    """
    config_file = os.path.join(os.path.dirname(__file__), 'openid_configuration.json')
    with open(config_file) as fd:
        oidc_conf = json.load(fd)

    jwks_uri = urlparse.urlparse(oidc_conf['jwks_uri'])

    @urlmatch(netloc=jwks_uri.netloc, path=jwks_uri.path)
    def jwks_mock(url, request):
        return oidc_provider_jwkset.export()

    # configuration as found in the file
    with HTTMock(jwks_mock):
        register_issuer(
            name='test_issuer',
            issuer='https://default.issuer',
            openid_configuration=oidc_conf,
        )

    # same configuration restricted to HMAC signatures
    oidc_conf['id_token_signing_alg_values_supported'] = ['HS256']
    with HTTMock(jwks_mock):
        register_issuer(
            name='test_issuer_hmac_only',
            issuer='https://hmac_only.issuer',
            openid_configuration=oidc_conf,
        )
def test_required_keys(db, oidc_provider, caplog):
    """Deserializing a token lacking required claims must raise IDTokenError."""
    incomplete_claims = {
        'sub': '248289761001',
        'iss': 'http://server.example.com',
        'iat': 1311280970,
        'exp': 1311281970,  # Missing 'aud' and 'nonce' required claims
        'extra_stuff': 'hi there',  # Wrong claim
    }
    erroneous_payload = base64url_encode(json.dumps(incomplete_claims).encode('ascii'))

    with pytest.raises(IDTokenError), utils.check_log(caplog, 'missing field'):
        serialized = '{}.{}.{}'.format(
            _header(oidc_provider), erroneous_payload, _signature(oidc_provider))
        token = IDToken(serialized)
        token.deserialize(oidc_provider)
def test_invalid_kid(app, caplog, code, oidc_provider_rsa,
                     oidc_provider_jwkset, simple_user):
    """An ID token whose `kid` header is unknown or None must log a warning."""
    # no mapping please
    OIDCClaimMapping.objects.all().delete()
    assert User.objects.count() == 1

    response = app.get('/').maybe_follow()
    assert oidc_provider_rsa.name in response.text
    response = response.click(oidc_provider_rsa.name)
    query = QueryDict(urlparse.urlparse(response.location).query)
    state, nonce = query['state'], query['nonce']

    # (kid sent in the token header, expected warning)
    cases = (
        # test invalid kid
        ('coin', 'not in key set'),
        # test missing kid
        (None, 'Key ID None not in key set'),
    )
    for forced_kid, message in cases:
        with utils.check_log(caplog, message=message, levelname='WARNING'), \
                oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code,
                                   nonce=nonce, provides_kid_header=True,
                                   kid=forced_kid):
            response = app.get(
                login_callback_url(oidc_provider_rsa),
                params={'code': code, 'state': state})
def test_templated_claim_mapping(app, caplog, code, oidc_provider, oidc_provider_jwkset):
    """Claim mappings written as templates must be rendered into user attributes.

    A mapping whose template is malformed is expected to be stored verbatim.
    """
    Attribute.objects.create(
        name='pro_phone',
        label='professonial phone',
        kind='phone_number',
        asked_on_registration=True
    )

    # no default mapping
    OIDCClaimMapping.objects.all().delete()

    # (attribute, idtoken_claim, claim), created in this order
    templated_mappings = (
        ('username', False, '{{ given_name }} "{{ nickname }}" {{ family_name }}'),
        ('pro_phone', False, '(prefix +33) {{ phone_number }}'),
        ('email', False, '{{ given_name }}@foo.bar'),
        # last one, with an idtoken claim
        ('last_name', True, '{{ name|upper }}'),
        # typo in template string
        ('first_name', True, '{{ given_name'),
    )
    for attribute, idtoken_claim, claim in templated_mappings:
        OIDCClaimMapping.objects.create(
            provider=oidc_provider,
            attribute=attribute,
            idtoken_claim=idtoken_claim,
            claim=claim,
        )
    oidc_provider.save()
    assert User.objects.count() == 0

    response = app.get('/').maybe_follow()
    response = response.click(oidc_provider.name)
    query = QueryDict(urlparse.urlparse(response.location).query)
    state, nonce = query['state'], query['nonce']

    with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
        callback = app.get(
            login_callback_url(oidc_provider),
            params={'code': code, 'state': state})
        response = callback.maybe_follow()

    assert User.objects.count() == 1
    user = User.objects.first()
    assert user.username == 'John "Hefty" Doe'
    assert user.attributes.pro_phone == '(prefix +33) 0123456789'
    assert user.email == 'John@foo.bar'
    assert user.last_name == 'DOE'
    # typo in template string, no rendering
    assert user.first_name == '{{ given_name'
def test_lost_state(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
    """A callback whose state is unknown to the session must restart the login."""
    login_page = app.get('/login/?next=/whatever/')
    assert oidc_provider.name in login_page.text
    redirect = login_page.click(oidc_provider.name)
    authorization_query = urlparse.urlparse(redirect.location).query
    state = urlparse.parse_qs(authorization_query)['state']

    # reset the session to forget the state
    app.cookiejar.clear()
    caplog.clear()

    with utils.norequest:
        response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})

    # the loss of the state is logged
    last_message = caplog.records[-1].message
    assert re.match('^auth-oidc: state.*has been lost', last_message)
    # nobody is logged in
    assert '_auth_user_id' not in app.session
    # we are redirected to the provider login view, keeping our destination
    expected_location = '/accounts/oidc/login/%s/?next=/whatever/' % oidc_provider.pk
    assert response.location == expected_location
def test_multiple_accounts(db):
    """A user cannot get a second OIDC account, but a `sub` can be reused on another provider."""
    user1, user2 = User.objects.create(), User.objects.create()
    provider1 = make_oidc_provider(name='Provider1')
    provider2 = make_oidc_provider(name='Provider2')
    link_account = OIDCAccount.objects.create

    link_account(user=user1, provider=provider1, sub='1234')
    # second account for user1: refused by the database
    with pytest.raises(IntegrityError), transaction.atomic():
        link_account(user=user1, provider=provider2, sub='4567')
    # same sub, other user and other provider: accepted
    link_account(user=user2, provider=provider2, sub='1234')
def test_save_account_on_delete_user(db):
    """Cleaning up a deleted user must archive its OIDC accounts in DeletedUser.old_data."""
    provider = make_oidc_provider(name='Provider1')
    user = User.objects.create()
    OIDCAccount.objects.create(user=user, provider=provider, sub='1234')

    user.mark_as_deleted()
    cleanup_time = now() + datetime.timedelta(seconds=1)
    User.objects.cleanup(threshold=0, timestamp=cleanup_time)

    # the account is gone but remembered on the deleted user
    assert OIDCAccount.objects.count() == 0
    archived_accounts = DeletedUser.objects.get().old_data.get('oidc_accounts')
    assert archived_accounts == [
        {
            'issuer': 'https://provider1.example.com',
            'sub': '1234',
        },
    ]