wip/50571-OIDC-entete-Access-Control-Allow #151

Merged
bdauvergne merged 3 commits from wip/50571-OIDC-entete-Access-Control-Allow into main 2023-10-12 17:22:53 +02:00
4 changed files with 139 additions and 21 deletions

View File

@ -180,7 +180,7 @@ class OIDCClient(Service):
raise ValueError('redirect_uri length > %s' % app_settings.REDIRECT_URI_MAX_LENGTH)
parsed_uri = urllib.parse.urlparse(redirect_uri)
for valid_redirect_uri in self.redirect_uris.split():
for valid_redirect_uri in self.get_redirect_uris():
parsed_valid_uri = urllib.parse.urlparse(valid_redirect_uri)
if parsed_uri.scheme != parsed_valid_uri.scheme:
continue
@ -215,7 +215,7 @@ class OIDCClient(Service):
if self.sector_identifier_uri:
sector_identifier = utils.url_domain(self.sector_identifier_uri)
else:
for redirect_uri in self.redirect_uris.split():
for redirect_uri in self.get_redirect_uris():
hostname = utils.url_domain(redirect_uri)
if sector_identifier is None:
sector_identifier = hostname
@ -230,7 +230,7 @@ class OIDCClient(Service):
return sector_identifier
def get_base_urls(self):
return super().get_base_urls() + [url for url in self.redirect_uris.split() if url]
return super().get_base_urls() + [url for url in self.get_redirect_uris() if url]
def __repr__(self):
return '<OIDCClient name:%r client_id:%r identifier_policy:%r>' % (
@ -262,6 +262,9 @@ class OIDCClient(Service):
return {}
return {'id': utils.make_sub(self, user)}
def get_redirect_uris(self):
return filter(None, self.redirect_uris.split())
class OIDCAuthorization(models.Model):
client_ct = models.ForeignKey(

View File

@ -40,7 +40,7 @@ from authentic2.a2_rbac.models import OrganizationalUnit
from authentic2.custom_user.models import Profile
from authentic2.decorators import setting_enabled
from authentic2.exponential_retry_timeout import ExponentialRetryTimeout
from authentic2.utils import hooks
from authentic2.utils import cors, hooks
from authentic2.utils.misc import last_authentication_event, login_require, make_url, redirect
from authentic2.utils.service import set_service
from authentic2.utils.view_decorators import check_view_restriction
@ -189,6 +189,10 @@ class WrongClientSecret(InvalidClient):
super().__init__(*args, **kwargs)
class CORSInvalidOrigin(OIDCException):
error_code = 'invalid_origin'
def idtoken_duration(client):
return client.idtoken_duration or datetime.timedelta(seconds=app_settings.IDTOKEN_DURATION)
@ -224,12 +228,16 @@ def openid_configuration(request, *args, **kwargs):
'frontchannel_logout_supported': True,
'frontchannel_logout_session_supported': True,
}
return JsonResponse(metadata)
response = JsonResponse(metadata)
cors.set_headers(response, origin='*')
return response
@setting_enabled('ENABLE', settings=app_settings)
def certs(request, *args, **kwargs):
return HttpResponse(utils.get_jwkset().export(private_keys=False), content_type='application/json')
response = HttpResponse(utils.get_jwkset().export(private_keys=False), content_type='application/json')
cors.set_headers(response, origin='*')
return response
@check_view_restriction
@ -257,9 +265,7 @@ def authorize(request, *args, **kwargs):
except ValueError:
error_description = _('Redirect URI "%s" is unknown.') % redirect_uri
if settings.DEBUG:
error_description += _(' Known redirect URIs are: %s') % ', '.join(
client.redirect_uris.split()
)
error_description += _(' Known redirect URIs are: %s') % ', '.join(client.get_redirect_uris())
raise InvalidRequest(error_description)
use_fragment = client.authorization_flow == client.FLOW_IMPLICIT
validated_redirect_uri = redirect_uri
@ -584,7 +590,7 @@ def is_ro_cred_grant_ratelimited(request, key='ip', increment=True):
)
def authenticate_client(request, ratelimit=False, client=None):
def authenticate_client(request):
'''Authenticate client on the token endpoint'''
if 'authorization' in request.headers:
@ -599,12 +605,16 @@ def authenticate_client(request, ratelimit=False, client=None):
raise InvalidClient(_('Empty client identifier'))
if not client_secret:
raise InvalidRequest('missing client_secret', client=client)
raise InvalidRequest('missing client_secret')
client = get_client(client_id)
if not client:
raise InvalidClient(_('Wrong client identifier: %s') % client_id)
if cors.is_cors_request(request):
if not cors.is_good_origin(request, client.get_redirect_uris()):
raise CORSInvalidOrigin(_('Your Origin header does not match the configured redirect_uris'))
return authenticate_client_secret(client, client_secret)
@ -614,7 +624,7 @@ def idtoken_from_user_credential(request):
raise InvalidRequest('Rate limit exceeded for IP address "%s"' % request.META.get('REMOTE_ADDR', ''))
try:
client = authenticate_client(request, ratelimit=True, client=None)
client = authenticate_client(request)
except InvalidClient:
# increment rate limit by IP
if is_ro_cred_grant_ratelimited(request):
@ -818,6 +828,8 @@ def tokens_from_authz_code(request):
@setting_enabled('ENABLE', settings=app_settings)
@csrf_exempt
def token(request, *args, **kwargs):
if cors.is_preflight_request(request):
return cors.preflight_response(request, origin=request, methods=('POST',))
if request.method != 'POST':
return HttpResponseNotAllowed(['POST'])
grant_type = request.POST.get('grant_type')
@ -858,6 +870,9 @@ def authenticate_access_token(request):
@setting_enabled('ENABLE', settings=app_settings)
@csrf_exempt
def user_info(request, *args, **kwargs):
if cors.is_preflight_request(request):
return cors.preflight_response(request, origin='*', headers=('x-requested-with', 'authorization'))
try:
access_token = authenticate_access_token(request)
user_info = utils.create_user_info(
@ -867,7 +882,7 @@ def user_info(request, *args, **kwargs):
access_token.scope_set(),
profile=access_token.profile,
)
return JsonResponse(user_info)
response = JsonResponse(user_info)
except OIDCException as e:
error_response = e.json_response(request, endpoint='user_info')
if e.status == 401:
@ -875,7 +890,9 @@ def user_info(request, *args, **kwargs):
e.error_code,
e.error_description,
)
return error_response
response = error_response
cors.set_headers(response, origin='*', headers=('x-requested-with', 'authorization'))
return response
@setting_enabled('ENABLE', settings=app_settings)

View File

@ -2009,41 +2009,91 @@ def test_token_endpoint_code_timeout(oidc_client, oidc_settings, simple_user, ap
def test_authenticate_client_exception_handling(app, oidc_client, simple_user, rf):
from authentic2_idp_oidc.views import (
CORSInvalidOrigin,
InvalidClient,
InvalidRequest,
WrongClientSecret,
authenticate_client,
)
request = rf.get('/')
request = rf.post('/')
# missing client id
with pytest.raises(InvalidRequest):
authenticate_client(request, client=oidc_client)
authenticate_client(request)
# empty client id
request.POST = {'client_id': '', 'client_secret': ''}
with pytest.raises(InvalidClient):
authenticate_client(request, client=oidc_client)
authenticate_client(request)
# empty client secret
request.POST['client_id'] = 'abc'
with pytest.raises(InvalidRequest):
authenticate_client(request, client=oidc_client)
authenticate_client(request)
# wrong client id
request.POST['client_secret'] = 'def'
with pytest.raises(InvalidClient):
authenticate_client(request, client=oidc_client)
authenticate_client(request)
# wrong client secret
request.POST['client_id'] = oidc_client.client_id
with pytest.raises(WrongClientSecret):
authenticate_client(request, client=oidc_client)
authenticate_client(request)
# wrong client secret
request.POST['client_id'] = oidc_client.client_id
with pytest.raises(WrongClientSecret):
authenticate_client(request)
# OK
request.POST['client_secret'] = oidc_client.client_secret
assert authenticate_client(request, client=oidc_client) == oidc_client
assert authenticate_client(request) == oidc_client
# missing origin
request = rf.post(
'/', data={'client_id': oidc_client.client_id, 'client_secret': 'xxx'}, HTTP_SEC_FETCH_MODE='cors'
)
with pytest.raises(CORSInvalidOrigin):
authenticate_client(request)
# invalid origin
request = rf.post(
'/',
data={'client_id': oidc_client.client_id, 'client_secret': 'xxx'},
HTTP_SEC_FETCH_MODE='cors',
HTTP_ORIGIN='https://sp.example.com/',
)
with pytest.raises(CORSInvalidOrigin):
authenticate_client(request)
def test_token_cors_preflight(app):
token_url = make_url('oidc-token')
app.options(token_url, status=405)
app.options(
token_url,
headers={
'sec-fetch-mode': 'cors',
'origin': 'https://coin.org',
'access-control-request-method': 'get',
},
status=405,
)
response = app.options(
token_url,
headers={
'sec-fetch-mode': 'cors',
'origin': 'https://coin.org',
'access-control-request-method': 'post',
},
status=200,
)
assert response.headers['access-control-allow-origin'] == 'https://coin.org'
def test_login_from_client_accounts_appearance(oidc_client, app, simple_user, settings):
@ -2106,3 +2156,25 @@ def test_login_from_client_accounts_appearance(oidc_client, app, simple_user, se
assert ('class', 'a2-service-information--logo') in response.pyquery('img')[0].items()
assert ('src', '/media/services/logos/201x201.jpg') in response.pyquery('img')[0].items()
assert ('alt', 'One specific client') in response.pyquery('img')[0].items()
def test_user_info_cors(app, oidc_client, simple_user):
response = app.options(
'/idp/oidc/user_info/',
headers={'Sec-Fetch-Mode': 'cors', 'Access-Control-Request-Method': 'GET'},
status=200,
)
assert response.headers['Access-Control-Allow-Origin'] == '*'
assert response.headers['Access-Control-Max-Age']
assert response.headers['Access-Control-Allow-Methods'] == 'GET'
assert response.headers['Access-Control-Allow-Headers'] == 'x-requested-with,authorization'
assert response.content == b''
token = OIDCAccessToken.objects.create(client=oidc_client, user=simple_user)
response = app.get(
'/idp/oidc/user_info/',
headers={'Authorization': f'Bearer {token.uuid}'},
status=200,
)
assert response.headers['Access-Control-Allow-Origin'] == '*'

View File

@ -88,3 +88,29 @@ def test_user_info(app, client, freezer, simple_user):
response['WWW-Authenticate']
== 'Bearer error="invalid_token", error_description="Token expired or user disconnected"'
)
def test_openid_configuration(app):
response = app.get('/.well-known/openid-configuration')
assert response.json == {
'authorization_endpoint': 'https://testserver/idp/oidc/authorize',
'end_session_endpoint': 'https://testserver/idp/oidc/logout',
'frontchannel_logout_session_supported': True,
'frontchannel_logout_supported': True,
'id_token_signing_alg_values_supported': ['RS256', 'HS256', 'ES256'],
'issuer': 'https://testserver/',
'jwks_uri': 'https://testserver/idp/oidc/certs',
'response_types_supported': ['code', 'token', 'token id_token'],
'subject_types_supported': ['public', 'pairwise'],
'token_endpoint': 'https://testserver/idp/oidc/token',
'token_endpoint_auth_methods_supported': ['client_secret_post', 'client_secret_basic'],
'userinfo_endpoint': 'https://testserver/idp/oidc/user_info',
}
assert response.headers['Access-Control-Allow-Origin'] == '*'
def test_certs(app, oidc_settings):
response = app.get('/idp/oidc/certs/')
assert response.json['keys']
assert response.headers['Access-Control-Allow-Origin'] == '*'