auth_oidc: use a signed state (#47825)

State is no more stored in the session, it's made using signing.dumps()
instead, to be more resilient. It's associated to a cookie scoped to the
callback path and the nonce created from the state id using an HMAC
construction with settings.SECRET_KEY.
This commit is contained in:
Benjamin Dauvergne 2020-10-18 12:54:50 +02:00
parent 6cd84ac407
commit 7b002f861f
3 changed files with 203 additions and 125 deletions

View File

@ -14,12 +14,15 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import uuid
import logging
import hashlib
import json
import logging
import uuid
import requests
from django.conf import settings
from django.core import signing
from django.urls import reverse
from django.utils.translation import get_language, ugettext as _
from django.contrib import messages
@ -34,22 +37,36 @@ from authentic2.utils import redirect, login, good_next_url, authenticate
from . import app_settings, models
from .utils import get_provider, get_provider_by_issuer
logger = logging.getLogger(__name__)
def make_nonce(state):
return hashlib.sha256(state.encode() + settings.SECRET_KEY.encode()).hexdigest()
@setting_enabled('ENABLE', settings=app_settings)
def oidc_login(request, pk, next_url=None, *args, **kwargs):
logger = logging.getLogger(__name__)
provider = get_provider(pk)
scopes = set(provider.scopes.split()) | set(['openid'])
state = str(uuid.uuid4())
nonce = request.GET.get('nonce') or str(uuid.uuid4())
state_id = str(uuid.uuid4())
next_url = next_url or request.GET.get(REDIRECT_FIELD_NAME, '')
if next_url and not good_next_url(request, next_url):
next_url = None
nonce = make_nonce(state_id)
display = set()
prompt = set()
state_content = {
'state_id': state_id,
'issuer': provider.issuer,
}
if next_url:
state_content['next'] = next_url
params = {
'client_id': provider.client_id,
'scope': ' '.join(scopes),
'response_type': 'code',
'redirect_uri': request.build_absolute_uri(reverse('oidc-login-callback')),
'state': state,
'state': signing.dumps(state_content),
'nonce': nonce,
}
if provider.claims_parameter_supported:
@ -70,15 +87,13 @@ def oidc_login(request, pk, next_url=None, *args, **kwargs):
# FIXME: id_token_hint ?
# FIXME: acr_values ?
# save request state
saved_state = request.session.setdefault('auth_oidc', {}).setdefault(state, {})
saved_state['request'] = params
saved_state['issuer'] = provider.issuer
next_url = next_url or request.GET.get(REDIRECT_FIELD_NAME, '')
if good_next_url(request, next_url):
saved_state['next_url'] = next_url
request.session.modified = True # necessary if auth_oidc already exists
logger.debug('auth_oidc: sent request to authorization endpoint %r', params)
return redirect(request, provider.authorization_endpoint, params=params, resolve=False)
logger.debug('auth_oidc: sent request %s to authorization endpoint "%s"',
params, provider.authorization_endpoint)
response = redirect(request, provider.authorization_endpoint, params=params, resolve=False)
response.set_cookie(
'oidc-state', value=state_id, path=reverse('oidc-login-callback'),
httponly=True, secure=request.is_secure())
return response
@setting_enabled('ENABLE', settings=app_settings)
@ -89,78 +104,83 @@ def login_initiate(request, *args, **kwargs):
try:
provider = get_provider_by_issuer(issuer)
except models.OIDCProvider.DoesNotExist:
return HttpResponseBadRequest(u'unknown issuer %s' % issuer, content_type='text/plain')
return HttpResponseBadRequest('unknown issuer %s' % issuer, content_type='text/plain')
return oidc_login(request, pk=provider.pk, next_url=request.GET.get('target_link_uri'))
class LoginCallback(View):
def continue_to_next_url(self):
return redirect(self.request,
self.oidc_state.get('next_url', settings.LOGIN_REDIRECT_URL),
resolve=False)
next_url = None
def continue_to_next_url(self, request):
if self.next_url:
return redirect(request, self.next_url, resolve=False)
else:
return redirect(request, settings.LOGIN_REDIRECT_URL)
def get(self, request, *args, **kwargs):
logger = logging.getLogger(__name__)
response = self.handle_authorization_response(request)
# clean the state cookie in all cases
if 'oidc-state' in request.COOKIES:
response.delete_cookie('oidc-state')
return response
def handle_authorization_response(self, request):
code = request.GET.get('code')
state = request.GET.get('state')
oidc_state = self.oidc_state = request.session.get('auth_oidc', {}).get(state)
if not state or not oidc_state or 'request' not in oidc_state:
messages.warning(request, _('Login with OpenIDConnect failed, state lost.'))
logger.warning('auth_oidc: state lost')
raw_state = request.GET.get('state')
if not raw_state:
return redirect(request, settings.LOGIN_REDIRECT_URL)
oidc_request = oidc_state.get('request')
assert isinstance(oidc_request, dict), 'state is not properly initialized'
nonce = oidc_request.get('nonce')
try:
issuer = oidc_state.get('issuer')
state_content = signing.loads(raw_state)
except signing.BadSignature:
return redirect(request, settings.LOGIN_REDIRECT_URL)
state = state_content['state_id']
issuer = state_content['issuer']
nonce = make_nonce(state)
self.next_url = state_content.get('next')
try:
provider = get_provider_by_issuer(issuer)
except models.OIDCProvider.DoesNotExist:
messages.warning(request, _('Unknown OpenID connect issuer'))
messages.warning(request, _('Unknown OpenID connect issuer: "%s"') % issuer)
logger.warning('auth_oidc: unknown issuer, %s', issuer)
return self.continue_to_next_url()
return self.continue_to_next_url(request)
# Check state
if 'oidc-state' not in request.COOKIES or request.COOKIES['oidc-state'] != state:
logger.warning('auth-oidc: state %s for issuer %s has been lost', state, issuer)
params = {}
if self.next_url:
params['next'] = self.next_url
response = redirect(request, 'oidc-login', kwargs={'pk': str(provider.pk)}, params=params)
return response
# FIXME is idp initiated SSO allowed ? in this case state is maybe not mandatory
if 'error' in request.GET: # error code path
error_description = request.GET.get('error_description')
error_url = request.GET.get('error_url')
msg = u'auth_oidc: error received '
if error_description:
msg += u'%s (%s)' % (error_description, request.GET['error'])
else:
msg += request.GET['error']
if error_url:
msg += u' see %s' % error_url
logger.warning(msg)
if provider:
messages.warning(request, _('Login with %(name)s failed, report %(request_id)s '
'to an administrator.')
% {
'name': provider.name,
'request_id': request.request_id,
})
else:
messages.warning(request, _('Login with OpenIDConnect failed, report %s to an '
'administrator') % request.request_id)
return self.continue_to_next_url()
if not code:
return self.handle_error(request, provider)
elif not code:
messages.warning(request, _('Missing code, report %s to an administrator') %
request.request_id)
logger.warning('auth_oidc: missing code, %r', request.GET)
return self.continue_to_next_url()
return self.continue_to_next_url(request)
else:
return self.handle_code(request, provider, nonce, code)
def handle_code(self, request, provider, nonce, code):
try:
token_endpoint_request = {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': request.build_absolute_uri(request.path),
}
logger.debug('auth_oidc: sent request to token endpoint %r', token_endpoint_request)
logger.debug('auth_oidc: sent request %s to token endpoint "%s"',
token_endpoint_request, token_endpoint_request)
response = requests.post(provider.token_endpoint, data=token_endpoint_request,
auth=(provider.client_id, provider.client_secret), timeout=10)
response.raise_for_status()
except requests.RequestException as e:
logger.warning(
'auth_oidc: failed to contact the token_endpoint for %(issuer)s, %(exception)s' % {
'issuer': issuer,
'issuer': provider.issuer,
'exception': e,
})
messages.warning(request, _('Provider %(name)s is down, report %(request_id)s to '
@ -169,11 +189,11 @@ class LoginCallback(View):
'name': provider.name,
'request_id': request.request_id,
})
return self.continue_to_next_url()
return self.continue_to_next_url(request)
try:
result = response.json()
except ValueError as e:
logger.warning(u'auth_oidc: response from %s is not a JSON document, %s, %r' %
logger.warning('auth_oidc: response from %s is not a JSON document, %s, %r' %
(provider.token_endpoint, e, response.content))
messages.warning(request, _('Provider %(name)s is down, report %(request_id)s to '
'an administrator. ') %
@ -181,13 +201,13 @@ class LoginCallback(View):
'name': provider.name,
'request_id': request.request_id,
})
return self.continue_to_next_url()
return self.continue_to_next_url(request)
# token_type is case insensitive, https://tools.ietf.org/html/rfc6749#section-4.2.2
if ('access_token' not in result
or 'token_type' not in result
or result['token_type'].lower() != 'bearer'
or 'id_token' not in result):
logger.warning(u'auth_oidc: invalid token endpoint response from %s: %r' % (
logger.warning('auth_oidc: invalid token endpoint response from %s: %r' % (
provider.token_endpoint, result))
messages.warning(request, _('Provider %(name)s is down, report %(request_id)s to '
'an administrator. ') %
@ -195,22 +215,49 @@ class LoginCallback(View):
'name': provider.name,
'request_id': request.request_id,
})
return self.continue_to_next_url()
logger.info(u'got token response %s', result)
return self.continue_to_next_url(request)
logger.debug('auth_oidc: got token response %s', result)
access_token = result.get('access_token')
user = authenticate(request, access_token=access_token, nonce=nonce, id_token=result['id_token'], provider=provider)
user = authenticate(
request,
access_token=access_token,
nonce=nonce,
id_token=result['id_token'],
provider=provider)
if user:
# remember last tokens for logout
login(request, user, 'oidc', nonce=nonce)
tokens = request.session.setdefault('auth_oidc', {}).setdefault('tokens', [])
tokens.append({
'token_response': result,
'provider_pk': provider.pk,
})
request.session.modified = True
login(request, user, 'oidc', nonce=nonce)
else:
messages.warning(request, _('No user found'))
return self.continue_to_next_url()
return self.continue_to_next_url(request)
def handle_error(self, request, provider):
error_description = request.GET.get('error_description')
error_url = request.GET.get('error_url')
msg = 'auth_oidc: error received '
if error_description:
msg += '%s (%s)' % (error_description, request.GET['error'])
else:
msg += request.GET['error']
if error_url:
msg += ' see %s' % error_url
logger.warning(msg)
if provider:
messages.warning(request, _('Login with %(name)s failed, report %(request_id)s '
'to an administrator.')
% {
'name': provider.name,
'request_id': request.request_id,
})
else:
messages.warning(request, _('Login with OpenIDConnect failed, report %s to an '
'administrator') % request.request_id)
return self.continue_to_next_url(request)
login_callback = setting_enabled('ENABLE', settings=app_settings)(LoginCallback.as_view())

View File

@ -16,25 +16,27 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import json
import os
import pytest
import json
import time
import random
import re
import time
from jwcrypto.jwk import JWKSet, JWK
from jwcrypto.jwt import JWT
from jwcrypto.jws import JWS, InvalidJWSObject
from jwcrypto.common import base64url_encode, base64url_decode, json_encode
from jwcrypto.jwk import JWKSet, JWK
from jwcrypto.jws import JWS, InvalidJWSObject
from jwcrypto.jwt import JWT
from httmock import urlmatch, HTTMock
from django.urls import reverse
from django.utils.timezone import utc
from django.contrib.auth import get_user_model
from django.urls import reverse
from django.utils.encoding import force_text, force_str
from django.utils.timezone import now
from django.http import QueryDict
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.timezone import now
from django.utils.timezone import utc
from django_rbac.utils import get_ou_model
@ -261,8 +263,7 @@ def oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token
if extra_id_token:
id_token.update(extra_id_token)
if oidc_provider.idtoken_algo in (OIDCProvider.ALGO_RSA,
OIDCProvider.ALGO_EC):
if oidc_provider.idtoken_algo in (OIDCProvider.ALGO_RSA, OIDCProvider.ALGO_EC):
alg = {
OIDCProvider.ALGO_RSA: 'RS256',
OIDCProvider.ALGO_EC: 'ES256',
@ -270,9 +271,9 @@ def oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token
jwk = None
for key in oidc_provider_jwkset['keys']:
if key.key_type == {
OIDCProvider.ALGO_RSA: 'RSA',
OIDCProvider.ALGO_EC: 'EC',
}.get(oidc_provider.idtoken_algo):
OIDCProvider.ALGO_RSA: 'RSA',
OIDCProvider.ALGO_EC: 'EC',
}.get(oidc_provider.idtoken_algo):
jwk = key
break
if provides_kid_header:
@ -281,7 +282,7 @@ def oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token
header = {'alg': alg, 'kid': jwk.key_id}
jwt = JWT(header=header, claims=id_token)
jwt.make_signed_token(jwk)
else: # hmac
else: # hmac
jwt = JWT(header={'alg': 'HS256'},
claims=id_token)
k = base64url_encode(oidc_provider.client_secret.encode('utf-8'))
@ -346,13 +347,6 @@ def login_callback_url(oidc_provider):
return reverse('oidc-login-callback')
def check_simple_qs(qs):
for k in qs:
assert len(qs[k]) == 1
qs[k] = qs[k][0]
return qs
def test_providers_on_login_page(oidc_provider, app):
response = app.get('/login/')
# two frontends should be present on login page
@ -381,7 +375,7 @@ def test_providers_on_login_page(oidc_provider, app):
def test_login_with_conditional_authenticators(oidc_provider, app, settings, caplog):
oidc2_provider = OIDCProvider.objects.create(
OIDCProvider.objects.create(
id=2,
ou=get_default_ou(),
name='My IDP',
@ -482,7 +476,6 @@ def test_login_autorun(oidc_provider, app, settings):
assert response['Location'] == '/accounts/oidc/login/%s/' % oidc_provider.pk
def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
OU = get_ou_model()
cassis = OU.objects.create(name='Cassis', slug='cassis')
@ -495,14 +488,13 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
assert location.scheme == endpoint.scheme
assert location.netloc == endpoint.netloc
assert location.path == endpoint.path
query = check_simple_qs(urlparse.parse_qs(location.query))
assert query['state'] in app.session['auth_oidc']
query = QueryDict(location.query)
state = query['state']
assert query['response_type'] == 'code'
assert query['client_id'] == str(oidc_provider.client_id)
assert query['scope'] == 'openid'
assert query['redirect_uri'] == 'http://testserver' + reverse('oidc-login-callback')
# get the nonce
nonce = app.session['auth_oidc'][query['state']]['request']['nonce']
nonce = query['nonce']
if oidc_provider.claims_parameter_supported:
claims = json.loads(query['claims'])
@ -517,34 +509,34 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
with utils.check_log(caplog, 'failed to contact the token_endpoint'):
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code):
response = app.get(login_callback_url(oidc_provider), params={'code': 'yyyy', 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': 'yyyy', 'state': state})
with utils.check_log(caplog, 'invalid id_token'):
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
extra_id_token={'iss': None}):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
with utils.check_log(caplog, 'invalid id_token'):
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
extra_id_token={'sub': None}):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
with utils.check_log(caplog, 'authentication is too old'):
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
extra_id_token={'iat': 1}):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
with utils.check_log(caplog, 'invalid id_token'):
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
extra_id_token={'exp': 1}):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
with utils.check_log(caplog, 'invalid id_token audience'):
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
extra_id_token={'aud': 'zz'}):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
with utils.check_log(caplog, 'expected nonce'):
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
assert not hooks.auth_oidc_backend_modify_user
with utils.check_log(caplog, 'created user'):
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
assert len(hooks.auth_oidc_backend_modify_user) == 1
assert set(hooks.auth_oidc_backend_modify_user[0]['kwargs']) >= set(
['user', 'provider', 'user_info', 'id_token', 'access_token'])
@ -564,19 +556,19 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
extra_user_info={'family_name_verified': True}, nonce=nonce):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
assert AttributeValue.objects.filter(content='Doe', verified=False).count() == 0
assert AttributeValue.objects.filter(content='Doe', verified=True).count() == 1
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
extra_user_info={'ou': 'cassis'}, nonce=nonce):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
assert User.objects.count() == 1
user = User.objects.get()
assert user.ou == cassis
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
assert User.objects.count() == 1
user = User.objects.get()
assert user.ou == get_default_ou()
@ -585,7 +577,7 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
time.sleep(0.1)
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
assert User.objects.count() == 1
user = User.objects.get()
assert user.ou == get_default_ou()
@ -626,18 +618,19 @@ def test_strategy_find_uuid(app, caplog, code, oidc_provider, oidc_provider_jwks
assert oidc_provider.name in response.text
response = response.click(oidc_provider.name)
location = urlparse.urlparse(response.location)
query = check_simple_qs(urlparse.parse_qs(location.query))
nonce = app.session['auth_oidc'][query['state']]['request']['nonce']
query = QueryDict(location.query)
state = query['state']
nonce = query['nonce']
# sub=john.doe, MUST not work
with utils.check_log(caplog, 'cannot create user'):
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
# sub=simple_user.uuid MUST work
with utils.check_log(caplog, 'found user using UUID'):
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, sub=simple_user.uuid, nonce=nonce):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
assert urlparse.urlparse(response['Location']).path == '/'
assert User.objects.count() == 1
@ -668,24 +661,25 @@ def test_strategy_create(app, caplog, code, oidc_provider, oidc_provider_jwkset)
assert oidc_provider.name in response.text
response = response.click(oidc_provider.name)
location = urlparse.urlparse(response.location)
query = check_simple_qs(urlparse.parse_qs(location.query))
nonce = app.session['auth_oidc'][query['state']]['request']['nonce']
query = QueryDict(location.query)
state = query['state']
nonce = query['nonce']
# sub=john.doe
with utils.check_log(caplog, 'auth_oidc: created user'):
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
assert User.objects.count() == 1
# second time
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
assert User.objects.count() == 1
# different sub, same user
with utils.check_log(caplog, 'auth_oidc: changed user'):
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, sub='other', nonce=nonce):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
assert User.objects.count() == 1
@ -742,18 +736,25 @@ def test_invalid_kid(app, caplog, code, oidc_provider_rsa,
assert oidc_provider_rsa.name in response.text
response = response.click(oidc_provider_rsa.name)
location = urlparse.urlparse(response.location)
query = check_simple_qs(urlparse.parse_qs(location.query))
nonce = app.session['auth_oidc'][query['state']]['request']['nonce']
query = QueryDict(location.query)
state = query['state']
nonce = query['nonce']
# test invalid kid
with utils.check_log(caplog, message='not in key set', levelname='WARNING'):
with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code, nonce=nonce, provides_kid_header=True, kid='coin'):
response = app.get(login_callback_url(oidc_provider_rsa), params={'code': code, 'state': query['state']})
with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code,
nonce=nonce, provides_kid_header=True,
kid='coin'):
response = app.get(login_callback_url(oidc_provider_rsa),
params={'code': code, 'state': state})
# test missing kid
with utils.check_log(caplog, message='Key ID None not in key set', levelname='WARNING'):
with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code, nonce=nonce, provides_kid_header=True, kid=None):
response = app.get(login_callback_url(oidc_provider_rsa), params={'code': code, 'state': query['state']})
with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code,
nonce=nonce, provides_kid_header=True,
kid=None):
response = app.get(login_callback_url(oidc_provider_rsa),
params={'code': code, 'state': state})
def test_templated_claim_mapping(app, caplog, code, oidc_provider, oidc_provider_jwkset):
@ -807,11 +808,13 @@ def test_templated_claim_mapping(app, caplog, code, oidc_provider, oidc_provider
response = app.get('/').maybe_follow()
response = response.click(oidc_provider.name)
location = urlparse.urlparse(response.location)
query = check_simple_qs(urlparse.parse_qs(location.query))
nonce = app.session['auth_oidc'][query['state']]['request']['nonce']
query = QueryDict(location.query)
state = query['state']
nonce = query['nonce']
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}).maybe_follow()
response = app.get(login_callback_url(oidc_provider),
params={'code': code, 'state': state}).maybe_follow()
assert User.objects.count() == 1
user = User.objects.first()
@ -822,3 +825,24 @@ def test_templated_claim_mapping(app, caplog, code, oidc_provider, oidc_provider
assert user.last_name == 'DOE'
# typo in template string, no rendering
assert user.first_name == '{{ given_name'
def test_lost_state(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
response = app.get('/login/?next=/whatever/')
assert oidc_provider.name in response.text
response = response.click(oidc_provider.name)
qs = urlparse.parse_qs(urlparse.urlparse(response.location).query)
state = qs['state']
# reset the session to forget the state
app.cookiejar.clear()
caplog.clear()
with utils.norequest:
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
# not logged
assert re.match('^auth-oidc: state.*has been lost', caplog.records[-1].message)
# event is recorded
assert '_auth_user_id' not in app.session
# we are automatically redirected to our destination
assert response.location == '/accounts/oidc/login/%s/?next=/whatever/' % oidc_provider.pk

View File

@ -20,6 +20,7 @@ import base64
import socket
from contextlib import contextmanager, closing
import httmock
from lxml import etree
from django.core.management import call_command as django_call_command
@ -277,3 +278,9 @@ def assert_event(event_type_name, user=None, session=None, service=None, **data)
assert event.data.get(key) == value, (
'event.data[%s] != data[%s] (%s != %s)' % (key, key, event.data.get(key), value)
)
@httmock.HTTMock
@httmock.urlmatch()
def norequest(request, url):
assert False, 'no request should be done'