auth_oidc: check provider.jwkset before using it (#75786) #180

Merged
bdauvergne merged 1 commits from wip/75786-auth-oidc-InvalidJWSSignature-qu into main 2023-11-27 10:36:47 +01:00
3 changed files with 51 additions and 6 deletions

View File

@ -56,6 +56,14 @@ class OIDCBackend(ModelBackend):
id_token = utils.IDToken(id_token)
id_token.deserialize(provider)
except utils.IDTokenError as e:
messages.warning(
request, _('OpenIDConnect provider {provider} is currently down.').format(provider=provider)
)
if settings.DEBUG:
messages.warning(
request,
_('Unable to validate the idtoken: {error}').format(id_token=original_id_token, error=e),
)
logger.warning('auth_oidc: invalid id_token %s: %s', original_id_token, e)
return None

View File

@ -64,7 +64,7 @@ def parse_id_token(encoded, provider):
try:
if alg in ('RS256', 'RS384', 'RS512', 'ES256', 'ES384', 'ES512'):
kid = header.get('kid', None)
key = provider.jwkset.get_key(kid=kid)
key = provider.jwkset and provider.jwkset.get_key(kid=kid)
if not key:
raise IDTokenError(_('Key ID %r not in key set') % kid)
elif alg in ('HS256', 'HS384', 'HS512'):

View File

@ -256,10 +256,12 @@ def oidc_provider_mock(
nonce=None,
provides_kid_header=False,
kid=None,
idtoken_algo=None,
):
token_endpoint = urllib.parse.urlparse(oidc_provider.token_endpoint)
userinfo_endpoint = urllib.parse.urlparse(oidc_provider.userinfo_endpoint)
token_revocation_endpoint = urllib.parse.urlparse(oidc_provider.token_revocation_endpoint)
idtoken_algo = idtoken_algo or oidc_provider.idtoken_algo
@urlmatch(netloc=token_endpoint.netloc, path=token_endpoint.path)
def token_endpoint_mock(url, request):
@ -278,17 +280,17 @@ def oidc_provider_mock(
if extra_id_token:
id_token.update(extra_id_token)
if oidc_provider.idtoken_algo in (OIDCProvider.ALGO_RSA, OIDCProvider.ALGO_EC):
if idtoken_algo in (OIDCProvider.ALGO_RSA, OIDCProvider.ALGO_EC):
alg = {
OIDCProvider.ALGO_RSA: 'RS256',
OIDCProvider.ALGO_EC: 'ES256',
}.get(oidc_provider.idtoken_algo)
}.get(idtoken_algo)
jwk = None
for key in oidc_provider_jwkset['keys']:
if key.key_type == {
OIDCProvider.ALGO_RSA: 'RSA',
OIDCProvider.ALGO_EC: 'EC',
}.get(oidc_provider.idtoken_algo):
}.get(idtoken_algo):
jwk = key
break
if provides_kid_header:
@ -574,7 +576,7 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
assert not hooks.auth_oidc_backend_modify_user
assert len(utils.decode_cookie(app.cookies['messages'])) == 1
assert len(utils.decode_cookie(app.cookies['messages'])) == 5
alt_state_content = crypto.loads(state)
alt_state_content['prompt'] = ['none']
with utils.check_log(caplog, 'consent_required'):
@ -584,7 +586,7 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
params={'error': 'consent_required', 'state': crypto.dumps(alt_state_content)},
)
# prompt=none, no message displayed to end user, no additional set cookie
assert len(utils.decode_cookie(app.cookies['messages'])) == 1
assert len(utils.decode_cookie(app.cookies['messages'])) == 5
alt_state_content = crypto.loads(state)
alt_state_content['prompt'] = ['whatever'] # any value other than none
with utils.check_log(caplog, 'some_other_error'):
@ -1640,3 +1642,38 @@ def test_passive_login_main_view_deactivated(get_provider, rf):
response = passive_login(req, next_url='/manage/')
assert response is None
def test_missing_jwkset(app, caplog, code, simple_user, oidc_provider_jwkset, settings):
provider1 = make_oidc_provider(idtoken_algo=OIDCProvider.ALGO_HMAC, name='provider1')
response = app.get('/').maybe_follow()
response = response.click('provider1')
location = urllib.parse.urlparse(response.location)
query = QueryDict(location.query)
state = query['state']
nonce = query['nonce']
# sub=john.doe
with oidc_provider_mock(
provider1,
oidc_provider_jwkset,
code,
nonce=nonce,
idtoken_algo=OIDCProvider.ALGO_RSA,
extra_id_token={'email': simple_user.email},
extra_user_info={'email': simple_user.email},
):
response = app.get(login_callback_url(provider1), params={'code': code, 'state': state})
response = response.maybe_follow()
assert [elt.text() for elt in response.pyquery('.messages .warning').items()] == [
'OpenIDConnect provider OpenID Connect - provider1 is currently down.',
]
settings.DEBUG = True
response = app.get(login_callback_url(provider1), params={'code': code, 'state': state})
response = response.maybe_follow()
assert [elt.text() for elt in response.pyquery('.messages .warning').items()] == [
'OpenIDConnect provider OpenID Connect - provider1 is currently down.',
'Unable to validate the idtoken: Key ID \'1e9gdk7\' not in key set',
]