auth_oidc: support signed authz requests through jwt bearer grants (#36966)
gitea/authentic/pipeline/head Build started... Details

This commit is contained in:
Paul Marillonnet 2020-07-06 14:38:32 +02:00
parent b31754c4ae
commit 4ae0881df1
4 changed files with 245 additions and 63 deletions

View File

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.18 on 2020-07-22 12:52
from __future__ import unicode_literals
import authentic2_auth_oidc.models
import django.contrib.postgres.fields.jsonb
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('authentic2_auth_oidc', '0007_auto_20200317_1732'),
]
operations = [
migrations.AddField(
model_name='oidcprovider',
name='request_signature_jwkset_json',
field=django.contrib.postgres.fields.jsonb.JSONField(blank=True, null=True, validators=[authentic2_auth_oidc.models.validate_jwkset], verbose_name='JSON WebKeyset used for outgoing signatures'),
),
migrations.AddField(
model_name='oidcprovider',
name='request_signature_supported',
field=models.BooleanField(default=False, verbose_name='Request signature supported'),
),
]

View File

@ -119,6 +119,14 @@ class OIDCProvider(models.Model):
claims_parameter_supported = models.BooleanField(
verbose_name=_('Claims parameter supported'),
default=False)
request_signature_supported = models.BooleanField(
verbose_name=_('Request signature supported'),
default=False)
request_signature_jwkset_json = JSONField(
verbose_name=_('JSON WebKeyset used for outgoing signatures'),
null=True,
blank=True,
validators=[validate_jwkset])
# ou where new users should be created
strategy = models.CharField(
@ -158,6 +166,14 @@ class OIDCProvider(models.Model):
return JWKSet.from_json(json.dumps(self.jwkset_json))
return None
@property
def request_signature_jwkset(self):
if self.request_signature_jwkset_json:
return JWKSet.from_json(json.dumps(
self.request_signature_jwkset_json))
return None
def __str__(self):
return self.name

View File

@ -14,12 +14,16 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import uuid
import logging
import json
import requests
from jwcrypto.common import JWException
from jwcrypto.jwt import JWT
from django.urls import reverse
from django.utils.translation import get_language, ugettext as _
from django.contrib import messages
@ -142,17 +146,72 @@ class LoginCallback(View):
messages.warning(request, _('Login with OpenIDConnect failed, report %s to an '
'administrator') % request.request_id)
return self.continue_to_next_url()
if not code:
messages.warning(request, _('Missing code, report %s to an administrator') %
request.request_id)
logger.warning('auth_oidc: missing code, %r', request.GET)
return self.continue_to_next_url()
try:
if not provider.request_signature_supported:
if not code:
messages.warning(request, _('Missing code, report %s to an administrator') %
request.request_id)
logger.warning('auth_oidc: missing code, %r', request.GET)
return self.continue_to_next_url()
token_endpoint_request = {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': request.build_absolute_uri(request.path),
}
elif code:
logger.warning('authz code provided but grant type is signed '
'JWT. authz code \'%s\' will be ignored' % code)
messages.warning(request, _('Provider %s configured for signed JWT grant but an '
'authorization code was provided') % provider.issuer)
return self.continue_to_next_url()
else:
# JWT Bearer authz through OAuth assertion framework - see RFC 7523
sign_key = None
for key in provider.request_signature_jwkset:
if key.key_type in ['EC', 'RSA', 'HMAC']:
sign_key = key
break
if not sign_key:
messages.warning(request, _('Provider %s configured for signed JWT grant but no '
'signature key could be retrieved.') % provider.issuer)
logger.warning(
'auth_oidc: provider %s has no jwt grant signature key' % provider.issuer)
return self.continue_to_next_url()
header = {
# FIXME do not hard-code key length
'alg': {'EC': 'ES256', 'RSA': 'RS256', 'HMAC': 'HS256'}.get(sign_key.key_type),
'typ': 'authz JWT',
'cty': 'JWT',
'kid': sign_key.key_id,
}
now = datetime.datetime.now()
exp = now + datetime.timedelta(hours=1)
claims = {
'iss': 'client %s' % provider.client_id,
'sub': '', # resource owner is not know yet
'aud': 'provider %s' % provider.issuer,
'iat': int(now.timestamp()),
'exp': int(exp.timestamp()),
}
jwt = JWT(header=header, claims=claims)
try:
jwt.make_signed_token(key=sign_key)
jwt = jwt.serialize()
except JWException as e:
logger.error('error during jwt grant serialization: %s' % e)
messages.warning(
request,
_('Error during grant request issuance, report %s to an administrator') %
request.request_id)
return self.continue_to_next_url()
token_endpoint_request = {
'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
'assertion': jwt,
'scope': provider.scopes,
}
try:
logger.debug('auth_oidc: sent request to token endpoint %r', token_endpoint_request)
response = requests.post(provider.token_endpoint, data=token_endpoint_request,
auth=(provider.client_id, provider.client_secret), timeout=10)

View File

@ -117,6 +117,17 @@ def oidc_provider_jwkset():
jwkset.add(key_ec)
return jwkset
@pytest.fixture
def request_signature_jwkset_json():
key_rsa = JWK.generate(kty='RSA', size=512, kid=KID_RSA)
key_ec = JWK.generate(kty='EC', size=256, kid=KID_EC)
jwkset = JWKSet()
jwkset.add(key_rsa)
jwkset.add(key_ec)
return json.loads(jwkset.export(private_keys=True))
OIDC_PROVIDER_PARAMS = [
{},
{
@ -246,63 +257,18 @@ def oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token
@urlmatch(netloc=token_endpoint.netloc, path=token_endpoint.path)
def token_endpoint_mock(url, request):
if urlparse.parse_qs(request.body).get('code') == [code]:
exp = now() + datetime.timedelta(seconds=10)
id_token = {
'iss': oidc_provider.issuer,
'sub': sub,
'iat': int(now().timestamp()),
'aud': str(oidc_provider.client_id),
'exp': int(exp.timestamp()),
'name': 'doe',
}
if nonce:
id_token['nonce'] = nonce
if extra_id_token:
id_token.update(extra_id_token)
if oidc_provider.request_signature_supported:
parsed = urlparse.parse_qs(request.body)
assert len(parsed.get('assertion')) == 1
assert parsed.get('grant_type') == ['urn:ietf:params:oauth:grant-type:jwt-bearer']
assertion = parsed.get('assertion')[0]
assert len(assertion.split('.')) == 3 # header, payload, signature
if oidc_provider.idtoken_algo in (OIDCProvider.ALGO_RSA,
OIDCProvider.ALGO_EC):
alg = {
OIDCProvider.ALGO_RSA: 'RS256',
OIDCProvider.ALGO_EC: 'ES256',
}.get(oidc_provider.idtoken_algo)
jwk = None
for key in oidc_provider_jwkset['keys']:
if key.key_type == {
OIDCProvider.ALGO_RSA: 'RSA',
OIDCProvider.ALGO_EC: 'EC',
}.get(oidc_provider.idtoken_algo):
jwk = key
break
if provides_kid_header:
header = {'alg': alg, 'kid': kid}
else:
header = {'alg': alg, 'kid': jwk.key_id}
jwt = JWT(header=header, claims=id_token)
jwt.make_signed_token(jwk)
else: # hmac
jwt = JWT(header={'alg': 'HS256'},
claims=id_token)
k = base64url_encode(oidc_provider.client_secret.encode('utf-8'))
jwt.make_signed_token(
JWK(kty='oct',
k=force_text(k)))
content = {
'access_token': '1234',
# check token_type is case insensitive
'token_type': random.choice(['B', 'b']) + 'earer',
'id_token': jwt.serialize(),
}
return {
'content': json.dumps(content),
'headers': {
'content-type': 'application/json',
},
'status_code': 200,
}
else:
# JWT deserialization:
jwt = JWT()
jwt.deserialize(jwt=assertion, key=oidc_provider.request_signature_jwkset)
# todo check no claim are missing claim
elif urlparse.parse_qs(request.body).get('code') != [code]:
return {
'content': json.dumps({'error': 'invalid request'}),
'headers': {
@ -310,6 +276,61 @@ def oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token
},
'status_code': 400,
}
exp = now() + datetime.timedelta(seconds=10)
id_token = {
'iss': oidc_provider.issuer,
'sub': sub,
'iat': int(now().timestamp()),
'aud': str(oidc_provider.client_id),
'exp': int(exp.timestamp()),
'name': 'doe',
}
if nonce:
id_token['nonce'] = nonce
if extra_id_token:
id_token.update(extra_id_token)
if oidc_provider.idtoken_algo in (OIDCProvider.ALGO_RSA,
OIDCProvider.ALGO_EC):
alg = {
OIDCProvider.ALGO_RSA: 'RS256',
OIDCProvider.ALGO_EC: 'ES256',
}.get(oidc_provider.idtoken_algo)
jwk = None
for key in oidc_provider_jwkset['keys']:
if key.key_type == {
OIDCProvider.ALGO_RSA: 'RSA',
OIDCProvider.ALGO_EC: 'EC',
}.get(oidc_provider.idtoken_algo):
jwk = key
break
if provides_kid_header:
header = {'alg': alg, 'kid': kid}
else:
header = {'alg': alg, 'kid': jwk.key_id}
jwt = JWT(header=header, claims=id_token)
jwt.make_signed_token(jwk)
else: # hmac
jwt = JWT(header={'alg': 'HS256'},
claims=id_token)
k = base64url_encode(oidc_provider.client_secret.encode('utf-8'))
jwt.make_signed_token(
JWK(kty='oct',
k=force_text(k)))
content = {
'access_token': '1234',
# check token_type is case insensitive
'token_type': random.choice(['B', 'b']) + 'earer',
'id_token': jwt.serialize(),
}
return {
'content': json.dumps(content),
'headers': {
'content-type': 'application/json',
},
'status_code': 200,
}
@urlmatch(netloc=userinfo_endpoint.netloc, path=userinfo_endpoint.path)
def user_info_endpoint_mock(url, request):
@ -482,7 +503,6 @@ def test_login_autorun(oidc_provider, app, settings):
assert response['Location'] == '/accounts/oidc/login/%s/' % oidc_provider.pk
def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
OU = get_ou_model()
cassis = OU.objects.create(name='Cassis', slug='cassis')
@ -598,6 +618,66 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
assert response.location.startswith('https://server.example.com/logout?')
def test_jwt_bearer_authz_grant(app, caplog, code, oidc_provider, oidc_provider_jwkset, request_signature_jwkset_json, hooks):
OU = get_ou_model()
oidc_provider.request_signature_supported = True
oidc_provider.request_signature_jwkset_json = request_signature_jwkset_json
oidc_provider.save()
cassis = OU.objects.create(name='Cassis', slug='cassis')
response = app.get('/admin/').maybe_follow()
assert oidc_provider.name in response.text
response = response.click(oidc_provider.name)
location = urlparse.urlparse(response.location)
endpoint = urlparse.urlparse(oidc_provider.authorization_endpoint)
assert location.scheme == endpoint.scheme
assert location.netloc == endpoint.netloc
assert location.path == endpoint.path
User = get_user_model()
assert User.objects.count() == 0
query = check_simple_qs(urlparse.parse_qs(location.query))
assert query['state'] in app.session['auth_oidc']
assert query['response_type'] == 'code'
assert query['client_id'] == str(oidc_provider.client_id)
assert query['scope'] == 'openid'
assert query['redirect_uri'] == 'http://testserver' + reverse('oidc-login-callback')
nonce = app.session['auth_oidc'][query['state']]['request']['nonce']
if oidc_provider.claims_parameter_supported:
claims = json.loads(query['claims'])
assert claims['id_token']['sub'] is None
assert claims['userinfo']['email']['essential']
assert claims['userinfo']['given_name']['essential']
assert claims['userinfo']['family_name']['essential']
assert claims['userinfo']['ou'] is None
with utils.check_log(caplog, 'authz code provided but grant type is signed JWT'):
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
assert len(hooks.auth_oidc_backend_modify_user) == 0
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, '', nonce=nonce):
response = app.get(login_callback_url(oidc_provider), {'state': query['state']})
assert len(hooks.auth_oidc_backend_modify_user) == 1
assert set(hooks.auth_oidc_backend_modify_user[0]['kwargs']) >= set(
['user', 'provider', 'user_info', 'id_token', 'access_token'])
assert urlparse.urlparse(response['Location']).path == '/admin/'
assert User.objects.count() == 1
user = User.objects.get()
assert user.ou == get_default_ou()
assert user.username == 'john.doe'
assert user.first_name == 'John'
assert user.last_name == 'Doe'
assert user.email == 'john.doe@example.com'
assert user.attributes.first_name == 'John'
assert user.attributes.last_name == 'Doe'
assert AttributeValue.objects.filter(content='John', verified=True).count() == 1
assert AttributeValue.objects.filter(content='Doe', verified=False).count() == 1
assert last_authentication_event(session=app.session)['nonce'] == nonce
def test_show_on_login_page(app, oidc_provider):
response = app.get('/login/')
assert 'oidc-a-oididp' in response.text