auth_oidc: use a signed state (#47825)
State is no more stored in the session, it's made using signing.dumps() instead, to be more resilient. It's associated to a cookie scoped to the callback path and the nonce created from the state id using an HMAC construction with settings.SECRET_KEY.
This commit is contained in:
parent
6cd84ac407
commit
7b002f861f
|
@ -14,12 +14,15 @@
|
|||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import uuid
|
||||
import logging
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import requests
|
||||
|
||||
from django.conf import settings
|
||||
from django.core import signing
|
||||
from django.urls import reverse
|
||||
from django.utils.translation import get_language, ugettext as _
|
||||
from django.contrib import messages
|
||||
|
@ -34,22 +37,36 @@ from authentic2.utils import redirect, login, good_next_url, authenticate
|
|||
from . import app_settings, models
|
||||
from .utils import get_provider, get_provider_by_issuer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def make_nonce(state):
|
||||
return hashlib.sha256(state.encode() + settings.SECRET_KEY.encode()).hexdigest()
|
||||
|
||||
|
||||
@setting_enabled('ENABLE', settings=app_settings)
|
||||
def oidc_login(request, pk, next_url=None, *args, **kwargs):
|
||||
logger = logging.getLogger(__name__)
|
||||
provider = get_provider(pk)
|
||||
scopes = set(provider.scopes.split()) | set(['openid'])
|
||||
state = str(uuid.uuid4())
|
||||
nonce = request.GET.get('nonce') or str(uuid.uuid4())
|
||||
state_id = str(uuid.uuid4())
|
||||
next_url = next_url or request.GET.get(REDIRECT_FIELD_NAME, '')
|
||||
if next_url and not good_next_url(request, next_url):
|
||||
next_url = None
|
||||
nonce = make_nonce(state_id)
|
||||
display = set()
|
||||
prompt = set()
|
||||
state_content = {
|
||||
'state_id': state_id,
|
||||
'issuer': provider.issuer,
|
||||
}
|
||||
if next_url:
|
||||
state_content['next'] = next_url
|
||||
params = {
|
||||
'client_id': provider.client_id,
|
||||
'scope': ' '.join(scopes),
|
||||
'response_type': 'code',
|
||||
'redirect_uri': request.build_absolute_uri(reverse('oidc-login-callback')),
|
||||
'state': state,
|
||||
'state': signing.dumps(state_content),
|
||||
'nonce': nonce,
|
||||
}
|
||||
if provider.claims_parameter_supported:
|
||||
|
@ -70,15 +87,13 @@ def oidc_login(request, pk, next_url=None, *args, **kwargs):
|
|||
# FIXME: id_token_hint ?
|
||||
# FIXME: acr_values ?
|
||||
# save request state
|
||||
saved_state = request.session.setdefault('auth_oidc', {}).setdefault(state, {})
|
||||
saved_state['request'] = params
|
||||
saved_state['issuer'] = provider.issuer
|
||||
next_url = next_url or request.GET.get(REDIRECT_FIELD_NAME, '')
|
||||
if good_next_url(request, next_url):
|
||||
saved_state['next_url'] = next_url
|
||||
request.session.modified = True # necessary if auth_oidc already exists
|
||||
logger.debug('auth_oidc: sent request to authorization endpoint %r', params)
|
||||
return redirect(request, provider.authorization_endpoint, params=params, resolve=False)
|
||||
logger.debug('auth_oidc: sent request %s to authorization endpoint "%s"',
|
||||
params, provider.authorization_endpoint)
|
||||
response = redirect(request, provider.authorization_endpoint, params=params, resolve=False)
|
||||
response.set_cookie(
|
||||
'oidc-state', value=state_id, path=reverse('oidc-login-callback'),
|
||||
httponly=True, secure=request.is_secure())
|
||||
return response
|
||||
|
||||
|
||||
@setting_enabled('ENABLE', settings=app_settings)
|
||||
|
@ -89,78 +104,83 @@ def login_initiate(request, *args, **kwargs):
|
|||
try:
|
||||
provider = get_provider_by_issuer(issuer)
|
||||
except models.OIDCProvider.DoesNotExist:
|
||||
return HttpResponseBadRequest(u'unknown issuer %s' % issuer, content_type='text/plain')
|
||||
return HttpResponseBadRequest('unknown issuer %s' % issuer, content_type='text/plain')
|
||||
return oidc_login(request, pk=provider.pk, next_url=request.GET.get('target_link_uri'))
|
||||
|
||||
|
||||
class LoginCallback(View):
|
||||
def continue_to_next_url(self):
|
||||
return redirect(self.request,
|
||||
self.oidc_state.get('next_url', settings.LOGIN_REDIRECT_URL),
|
||||
resolve=False)
|
||||
next_url = None
|
||||
|
||||
def continue_to_next_url(self, request):
|
||||
if self.next_url:
|
||||
return redirect(request, self.next_url, resolve=False)
|
||||
else:
|
||||
return redirect(request, settings.LOGIN_REDIRECT_URL)
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
logger = logging.getLogger(__name__)
|
||||
response = self.handle_authorization_response(request)
|
||||
# clean the state cookie in all cases
|
||||
if 'oidc-state' in request.COOKIES:
|
||||
response.delete_cookie('oidc-state')
|
||||
return response
|
||||
|
||||
def handle_authorization_response(self, request):
|
||||
code = request.GET.get('code')
|
||||
state = request.GET.get('state')
|
||||
oidc_state = self.oidc_state = request.session.get('auth_oidc', {}).get(state)
|
||||
if not state or not oidc_state or 'request' not in oidc_state:
|
||||
messages.warning(request, _('Login with OpenIDConnect failed, state lost.'))
|
||||
logger.warning('auth_oidc: state lost')
|
||||
raw_state = request.GET.get('state')
|
||||
if not raw_state:
|
||||
return redirect(request, settings.LOGIN_REDIRECT_URL)
|
||||
oidc_request = oidc_state.get('request')
|
||||
assert isinstance(oidc_request, dict), 'state is not properly initialized'
|
||||
nonce = oidc_request.get('nonce')
|
||||
try:
|
||||
issuer = oidc_state.get('issuer')
|
||||
state_content = signing.loads(raw_state)
|
||||
except signing.BadSignature:
|
||||
return redirect(request, settings.LOGIN_REDIRECT_URL)
|
||||
|
||||
state = state_content['state_id']
|
||||
issuer = state_content['issuer']
|
||||
nonce = make_nonce(state)
|
||||
self.next_url = state_content.get('next')
|
||||
|
||||
try:
|
||||
provider = get_provider_by_issuer(issuer)
|
||||
except models.OIDCProvider.DoesNotExist:
|
||||
messages.warning(request, _('Unknown OpenID connect issuer'))
|
||||
messages.warning(request, _('Unknown OpenID connect issuer: "%s"') % issuer)
|
||||
logger.warning('auth_oidc: unknown issuer, %s', issuer)
|
||||
return self.continue_to_next_url()
|
||||
return self.continue_to_next_url(request)
|
||||
|
||||
# Check state
|
||||
if 'oidc-state' not in request.COOKIES or request.COOKIES['oidc-state'] != state:
|
||||
logger.warning('auth-oidc: state %s for issuer %s has been lost', state, issuer)
|
||||
params = {}
|
||||
if self.next_url:
|
||||
params['next'] = self.next_url
|
||||
response = redirect(request, 'oidc-login', kwargs={'pk': str(provider.pk)}, params=params)
|
||||
return response
|
||||
|
||||
# FIXME is idp initiated SSO allowed ? in this case state is maybe not mandatory
|
||||
if 'error' in request.GET: # error code path
|
||||
error_description = request.GET.get('error_description')
|
||||
error_url = request.GET.get('error_url')
|
||||
msg = u'auth_oidc: error received '
|
||||
if error_description:
|
||||
msg += u'%s (%s)' % (error_description, request.GET['error'])
|
||||
else:
|
||||
msg += request.GET['error']
|
||||
if error_url:
|
||||
msg += u' see %s' % error_url
|
||||
logger.warning(msg)
|
||||
if provider:
|
||||
messages.warning(request, _('Login with %(name)s failed, report %(request_id)s '
|
||||
'to an administrator.')
|
||||
% {
|
||||
'name': provider.name,
|
||||
'request_id': request.request_id,
|
||||
})
|
||||
else:
|
||||
messages.warning(request, _('Login with OpenIDConnect failed, report %s to an '
|
||||
'administrator') % request.request_id)
|
||||
return self.continue_to_next_url()
|
||||
if not code:
|
||||
return self.handle_error(request, provider)
|
||||
elif not code:
|
||||
messages.warning(request, _('Missing code, report %s to an administrator') %
|
||||
request.request_id)
|
||||
logger.warning('auth_oidc: missing code, %r', request.GET)
|
||||
return self.continue_to_next_url()
|
||||
return self.continue_to_next_url(request)
|
||||
else:
|
||||
return self.handle_code(request, provider, nonce, code)
|
||||
|
||||
def handle_code(self, request, provider, nonce, code):
|
||||
try:
|
||||
token_endpoint_request = {
|
||||
'grant_type': 'authorization_code',
|
||||
'code': code,
|
||||
'redirect_uri': request.build_absolute_uri(request.path),
|
||||
}
|
||||
logger.debug('auth_oidc: sent request to token endpoint %r', token_endpoint_request)
|
||||
logger.debug('auth_oidc: sent request %s to token endpoint "%s"',
|
||||
token_endpoint_request, token_endpoint_request)
|
||||
response = requests.post(provider.token_endpoint, data=token_endpoint_request,
|
||||
auth=(provider.client_id, provider.client_secret), timeout=10)
|
||||
response.raise_for_status()
|
||||
except requests.RequestException as e:
|
||||
logger.warning(
|
||||
'auth_oidc: failed to contact the token_endpoint for %(issuer)s, %(exception)s' % {
|
||||
'issuer': issuer,
|
||||
'issuer': provider.issuer,
|
||||
'exception': e,
|
||||
})
|
||||
messages.warning(request, _('Provider %(name)s is down, report %(request_id)s to '
|
||||
|
@ -169,11 +189,11 @@ class LoginCallback(View):
|
|||
'name': provider.name,
|
||||
'request_id': request.request_id,
|
||||
})
|
||||
return self.continue_to_next_url()
|
||||
return self.continue_to_next_url(request)
|
||||
try:
|
||||
result = response.json()
|
||||
except ValueError as e:
|
||||
logger.warning(u'auth_oidc: response from %s is not a JSON document, %s, %r' %
|
||||
logger.warning('auth_oidc: response from %s is not a JSON document, %s, %r' %
|
||||
(provider.token_endpoint, e, response.content))
|
||||
messages.warning(request, _('Provider %(name)s is down, report %(request_id)s to '
|
||||
'an administrator. ') %
|
||||
|
@ -181,13 +201,13 @@ class LoginCallback(View):
|
|||
'name': provider.name,
|
||||
'request_id': request.request_id,
|
||||
})
|
||||
return self.continue_to_next_url()
|
||||
return self.continue_to_next_url(request)
|
||||
# token_type is case insensitive, https://tools.ietf.org/html/rfc6749#section-4.2.2
|
||||
if ('access_token' not in result
|
||||
or 'token_type' not in result
|
||||
or result['token_type'].lower() != 'bearer'
|
||||
or 'id_token' not in result):
|
||||
logger.warning(u'auth_oidc: invalid token endpoint response from %s: %r' % (
|
||||
logger.warning('auth_oidc: invalid token endpoint response from %s: %r' % (
|
||||
provider.token_endpoint, result))
|
||||
messages.warning(request, _('Provider %(name)s is down, report %(request_id)s to '
|
||||
'an administrator. ') %
|
||||
|
@ -195,22 +215,49 @@ class LoginCallback(View):
|
|||
'name': provider.name,
|
||||
'request_id': request.request_id,
|
||||
})
|
||||
return self.continue_to_next_url()
|
||||
logger.info(u'got token response %s', result)
|
||||
return self.continue_to_next_url(request)
|
||||
logger.debug('auth_oidc: got token response %s', result)
|
||||
access_token = result.get('access_token')
|
||||
user = authenticate(request, access_token=access_token, nonce=nonce, id_token=result['id_token'], provider=provider)
|
||||
user = authenticate(
|
||||
request,
|
||||
access_token=access_token,
|
||||
nonce=nonce,
|
||||
id_token=result['id_token'],
|
||||
provider=provider)
|
||||
if user:
|
||||
# remember last tokens for logout
|
||||
login(request, user, 'oidc', nonce=nonce)
|
||||
tokens = request.session.setdefault('auth_oidc', {}).setdefault('tokens', [])
|
||||
tokens.append({
|
||||
'token_response': result,
|
||||
'provider_pk': provider.pk,
|
||||
})
|
||||
request.session.modified = True
|
||||
login(request, user, 'oidc', nonce=nonce)
|
||||
else:
|
||||
messages.warning(request, _('No user found'))
|
||||
return self.continue_to_next_url()
|
||||
return self.continue_to_next_url(request)
|
||||
|
||||
def handle_error(self, request, provider):
|
||||
error_description = request.GET.get('error_description')
|
||||
error_url = request.GET.get('error_url')
|
||||
msg = 'auth_oidc: error received '
|
||||
if error_description:
|
||||
msg += '%s (%s)' % (error_description, request.GET['error'])
|
||||
else:
|
||||
msg += request.GET['error']
|
||||
if error_url:
|
||||
msg += ' see %s' % error_url
|
||||
logger.warning(msg)
|
||||
if provider:
|
||||
messages.warning(request, _('Login with %(name)s failed, report %(request_id)s '
|
||||
'to an administrator.')
|
||||
% {
|
||||
'name': provider.name,
|
||||
'request_id': request.request_id,
|
||||
})
|
||||
else:
|
||||
messages.warning(request, _('Login with OpenIDConnect failed, report %s to an '
|
||||
'administrator') % request.request_id)
|
||||
return self.continue_to_next_url(request)
|
||||
|
||||
|
||||
login_callback = setting_enabled('ENABLE', settings=app_settings)(LoginCallback.as_view())
|
||||
|
|
|
@ -16,25 +16,27 @@
|
|||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
import pytest
|
||||
import json
|
||||
import time
|
||||
import random
|
||||
import re
|
||||
import time
|
||||
|
||||
from jwcrypto.jwk import JWKSet, JWK
|
||||
from jwcrypto.jwt import JWT
|
||||
from jwcrypto.jws import JWS, InvalidJWSObject
|
||||
from jwcrypto.common import base64url_encode, base64url_decode, json_encode
|
||||
from jwcrypto.jwk import JWKSet, JWK
|
||||
from jwcrypto.jws import JWS, InvalidJWSObject
|
||||
from jwcrypto.jwt import JWT
|
||||
|
||||
from httmock import urlmatch, HTTMock
|
||||
|
||||
from django.urls import reverse
|
||||
from django.utils.timezone import utc
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_text, force_str
|
||||
from django.utils.timezone import now
|
||||
from django.http import QueryDict
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
from django.utils.timezone import now
|
||||
from django.utils.timezone import utc
|
||||
|
||||
from django_rbac.utils import get_ou_model
|
||||
|
||||
|
@ -261,8 +263,7 @@ def oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token
|
|||
if extra_id_token:
|
||||
id_token.update(extra_id_token)
|
||||
|
||||
if oidc_provider.idtoken_algo in (OIDCProvider.ALGO_RSA,
|
||||
OIDCProvider.ALGO_EC):
|
||||
if oidc_provider.idtoken_algo in (OIDCProvider.ALGO_RSA, OIDCProvider.ALGO_EC):
|
||||
alg = {
|
||||
OIDCProvider.ALGO_RSA: 'RS256',
|
||||
OIDCProvider.ALGO_EC: 'ES256',
|
||||
|
@ -270,9 +271,9 @@ def oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token
|
|||
jwk = None
|
||||
for key in oidc_provider_jwkset['keys']:
|
||||
if key.key_type == {
|
||||
OIDCProvider.ALGO_RSA: 'RSA',
|
||||
OIDCProvider.ALGO_EC: 'EC',
|
||||
}.get(oidc_provider.idtoken_algo):
|
||||
OIDCProvider.ALGO_RSA: 'RSA',
|
||||
OIDCProvider.ALGO_EC: 'EC',
|
||||
}.get(oidc_provider.idtoken_algo):
|
||||
jwk = key
|
||||
break
|
||||
if provides_kid_header:
|
||||
|
@ -281,7 +282,7 @@ def oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token
|
|||
header = {'alg': alg, 'kid': jwk.key_id}
|
||||
jwt = JWT(header=header, claims=id_token)
|
||||
jwt.make_signed_token(jwk)
|
||||
else: # hmac
|
||||
else: # hmac
|
||||
jwt = JWT(header={'alg': 'HS256'},
|
||||
claims=id_token)
|
||||
k = base64url_encode(oidc_provider.client_secret.encode('utf-8'))
|
||||
|
@ -346,13 +347,6 @@ def login_callback_url(oidc_provider):
|
|||
return reverse('oidc-login-callback')
|
||||
|
||||
|
||||
def check_simple_qs(qs):
|
||||
for k in qs:
|
||||
assert len(qs[k]) == 1
|
||||
qs[k] = qs[k][0]
|
||||
return qs
|
||||
|
||||
|
||||
def test_providers_on_login_page(oidc_provider, app):
|
||||
response = app.get('/login/')
|
||||
# two frontends should be present on login page
|
||||
|
@ -381,7 +375,7 @@ def test_providers_on_login_page(oidc_provider, app):
|
|||
|
||||
|
||||
def test_login_with_conditional_authenticators(oidc_provider, app, settings, caplog):
|
||||
oidc2_provider = OIDCProvider.objects.create(
|
||||
OIDCProvider.objects.create(
|
||||
id=2,
|
||||
ou=get_default_ou(),
|
||||
name='My IDP',
|
||||
|
@ -482,7 +476,6 @@ def test_login_autorun(oidc_provider, app, settings):
|
|||
assert response['Location'] == '/accounts/oidc/login/%s/' % oidc_provider.pk
|
||||
|
||||
|
||||
|
||||
def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
|
||||
OU = get_ou_model()
|
||||
cassis = OU.objects.create(name='Cassis', slug='cassis')
|
||||
|
@ -495,14 +488,13 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
|
|||
assert location.scheme == endpoint.scheme
|
||||
assert location.netloc == endpoint.netloc
|
||||
assert location.path == endpoint.path
|
||||
query = check_simple_qs(urlparse.parse_qs(location.query))
|
||||
assert query['state'] in app.session['auth_oidc']
|
||||
query = QueryDict(location.query)
|
||||
state = query['state']
|
||||
assert query['response_type'] == 'code'
|
||||
assert query['client_id'] == str(oidc_provider.client_id)
|
||||
assert query['scope'] == 'openid'
|
||||
assert query['redirect_uri'] == 'http://testserver' + reverse('oidc-login-callback')
|
||||
# get the nonce
|
||||
nonce = app.session['auth_oidc'][query['state']]['request']['nonce']
|
||||
nonce = query['nonce']
|
||||
|
||||
if oidc_provider.claims_parameter_supported:
|
||||
claims = json.loads(query['claims'])
|
||||
|
@ -517,34 +509,34 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
|
|||
|
||||
with utils.check_log(caplog, 'failed to contact the token_endpoint'):
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': 'yyyy', 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': 'yyyy', 'state': state})
|
||||
with utils.check_log(caplog, 'invalid id_token'):
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
|
||||
extra_id_token={'iss': None}):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
with utils.check_log(caplog, 'invalid id_token'):
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
|
||||
extra_id_token={'sub': None}):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
with utils.check_log(caplog, 'authentication is too old'):
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
|
||||
extra_id_token={'iat': 1}):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
with utils.check_log(caplog, 'invalid id_token'):
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
|
||||
extra_id_token={'exp': 1}):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
with utils.check_log(caplog, 'invalid id_token audience'):
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
|
||||
extra_id_token={'aud': 'zz'}):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
with utils.check_log(caplog, 'expected nonce'):
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
assert not hooks.auth_oidc_backend_modify_user
|
||||
with utils.check_log(caplog, 'created user'):
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
assert len(hooks.auth_oidc_backend_modify_user) == 1
|
||||
assert set(hooks.auth_oidc_backend_modify_user[0]['kwargs']) >= set(
|
||||
['user', 'provider', 'user_info', 'id_token', 'access_token'])
|
||||
|
@ -564,19 +556,19 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
|
|||
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
|
||||
extra_user_info={'family_name_verified': True}, nonce=nonce):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
assert AttributeValue.objects.filter(content='Doe', verified=False).count() == 0
|
||||
assert AttributeValue.objects.filter(content='Doe', verified=True).count() == 1
|
||||
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
|
||||
extra_user_info={'ou': 'cassis'}, nonce=nonce):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
assert User.objects.count() == 1
|
||||
user = User.objects.get()
|
||||
assert user.ou == cassis
|
||||
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
assert User.objects.count() == 1
|
||||
user = User.objects.get()
|
||||
assert user.ou == get_default_ou()
|
||||
|
@ -585,7 +577,7 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
|
|||
time.sleep(0.1)
|
||||
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
assert User.objects.count() == 1
|
||||
user = User.objects.get()
|
||||
assert user.ou == get_default_ou()
|
||||
|
@ -626,18 +618,19 @@ def test_strategy_find_uuid(app, caplog, code, oidc_provider, oidc_provider_jwks
|
|||
assert oidc_provider.name in response.text
|
||||
response = response.click(oidc_provider.name)
|
||||
location = urlparse.urlparse(response.location)
|
||||
query = check_simple_qs(urlparse.parse_qs(location.query))
|
||||
nonce = app.session['auth_oidc'][query['state']]['request']['nonce']
|
||||
query = QueryDict(location.query)
|
||||
state = query['state']
|
||||
nonce = query['nonce']
|
||||
|
||||
# sub=john.doe, MUST not work
|
||||
with utils.check_log(caplog, 'cannot create user'):
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
|
||||
# sub=simple_user.uuid MUST work
|
||||
with utils.check_log(caplog, 'found user using UUID'):
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, sub=simple_user.uuid, nonce=nonce):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
|
||||
assert urlparse.urlparse(response['Location']).path == '/'
|
||||
assert User.objects.count() == 1
|
||||
|
@ -668,24 +661,25 @@ def test_strategy_create(app, caplog, code, oidc_provider, oidc_provider_jwkset)
|
|||
assert oidc_provider.name in response.text
|
||||
response = response.click(oidc_provider.name)
|
||||
location = urlparse.urlparse(response.location)
|
||||
query = check_simple_qs(urlparse.parse_qs(location.query))
|
||||
nonce = app.session['auth_oidc'][query['state']]['request']['nonce']
|
||||
query = QueryDict(location.query)
|
||||
state = query['state']
|
||||
nonce = query['nonce']
|
||||
|
||||
# sub=john.doe
|
||||
with utils.check_log(caplog, 'auth_oidc: created user'):
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
assert User.objects.count() == 1
|
||||
|
||||
# second time
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
assert User.objects.count() == 1
|
||||
|
||||
# different sub, same user
|
||||
with utils.check_log(caplog, 'auth_oidc: changed user'):
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, sub='other', nonce=nonce):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
assert User.objects.count() == 1
|
||||
|
||||
|
||||
|
@ -742,18 +736,25 @@ def test_invalid_kid(app, caplog, code, oidc_provider_rsa,
|
|||
assert oidc_provider_rsa.name in response.text
|
||||
response = response.click(oidc_provider_rsa.name)
|
||||
location = urlparse.urlparse(response.location)
|
||||
query = check_simple_qs(urlparse.parse_qs(location.query))
|
||||
nonce = app.session['auth_oidc'][query['state']]['request']['nonce']
|
||||
query = QueryDict(location.query)
|
||||
state = query['state']
|
||||
nonce = query['nonce']
|
||||
|
||||
# test invalid kid
|
||||
with utils.check_log(caplog, message='not in key set', levelname='WARNING'):
|
||||
with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code, nonce=nonce, provides_kid_header=True, kid='coin'):
|
||||
response = app.get(login_callback_url(oidc_provider_rsa), params={'code': code, 'state': query['state']})
|
||||
with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code,
|
||||
nonce=nonce, provides_kid_header=True,
|
||||
kid='coin'):
|
||||
response = app.get(login_callback_url(oidc_provider_rsa),
|
||||
params={'code': code, 'state': state})
|
||||
|
||||
# test missing kid
|
||||
with utils.check_log(caplog, message='Key ID None not in key set', levelname='WARNING'):
|
||||
with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code, nonce=nonce, provides_kid_header=True, kid=None):
|
||||
response = app.get(login_callback_url(oidc_provider_rsa), params={'code': code, 'state': query['state']})
|
||||
with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code,
|
||||
nonce=nonce, provides_kid_header=True,
|
||||
kid=None):
|
||||
response = app.get(login_callback_url(oidc_provider_rsa),
|
||||
params={'code': code, 'state': state})
|
||||
|
||||
|
||||
def test_templated_claim_mapping(app, caplog, code, oidc_provider, oidc_provider_jwkset):
|
||||
|
@ -807,11 +808,13 @@ def test_templated_claim_mapping(app, caplog, code, oidc_provider, oidc_provider
|
|||
response = app.get('/').maybe_follow()
|
||||
response = response.click(oidc_provider.name)
|
||||
location = urlparse.urlparse(response.location)
|
||||
query = check_simple_qs(urlparse.parse_qs(location.query))
|
||||
nonce = app.session['auth_oidc'][query['state']]['request']['nonce']
|
||||
query = QueryDict(location.query)
|
||||
state = query['state']
|
||||
nonce = query['nonce']
|
||||
|
||||
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}).maybe_follow()
|
||||
response = app.get(login_callback_url(oidc_provider),
|
||||
params={'code': code, 'state': state}).maybe_follow()
|
||||
|
||||
assert User.objects.count() == 1
|
||||
user = User.objects.first()
|
||||
|
@ -822,3 +825,24 @@ def test_templated_claim_mapping(app, caplog, code, oidc_provider, oidc_provider
|
|||
assert user.last_name == 'DOE'
|
||||
# typo in template string, no rendering
|
||||
assert user.first_name == '{{ given_name'
|
||||
|
||||
|
||||
def test_lost_state(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
|
||||
response = app.get('/login/?next=/whatever/')
|
||||
assert oidc_provider.name in response.text
|
||||
response = response.click(oidc_provider.name)
|
||||
qs = urlparse.parse_qs(urlparse.urlparse(response.location).query)
|
||||
state = qs['state']
|
||||
|
||||
# reset the session to forget the state
|
||||
app.cookiejar.clear()
|
||||
|
||||
caplog.clear()
|
||||
with utils.norequest:
|
||||
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
||||
# not logged
|
||||
assert re.match('^auth-oidc: state.*has been lost', caplog.records[-1].message)
|
||||
# event is recorded
|
||||
assert '_auth_user_id' not in app.session
|
||||
# we are automatically redirected to our destination
|
||||
assert response.location == '/accounts/oidc/login/%s/?next=/whatever/' % oidc_provider.pk
|
||||
|
|
|
@ -20,6 +20,7 @@ import base64
|
|||
import socket
|
||||
from contextlib import contextmanager, closing
|
||||
|
||||
import httmock
|
||||
from lxml import etree
|
||||
|
||||
from django.core.management import call_command as django_call_command
|
||||
|
@ -277,3 +278,9 @@ def assert_event(event_type_name, user=None, session=None, service=None, **data)
|
|||
assert event.data.get(key) == value, (
|
||||
'event.data[%s] != data[%s] (%s != %s)' % (key, key, event.data.get(key), value)
|
||||
)
|
||||
|
||||
|
||||
@httmock.HTTMock
|
||||
@httmock.urlmatch()
|
||||
def norequest(request, url):
|
||||
assert False, 'no request should be done'
|
||||
|
|
Loading…
Reference in New Issue