wip/50571-OIDC-entete-Access-Control-Allow #151
|
@ -180,7 +180,7 @@ class OIDCClient(Service):
|
|||
raise ValueError('redirect_uri length > %s' % app_settings.REDIRECT_URI_MAX_LENGTH)
|
||||
|
||||
parsed_uri = urllib.parse.urlparse(redirect_uri)
|
||||
for valid_redirect_uri in self.redirect_uris.split():
|
||||
for valid_redirect_uri in self.get_redirect_uris():
|
||||
parsed_valid_uri = urllib.parse.urlparse(valid_redirect_uri)
|
||||
if parsed_uri.scheme != parsed_valid_uri.scheme:
|
||||
continue
|
||||
|
@ -215,7 +215,7 @@ class OIDCClient(Service):
|
|||
if self.sector_identifier_uri:
|
||||
sector_identifier = utils.url_domain(self.sector_identifier_uri)
|
||||
else:
|
||||
for redirect_uri in self.redirect_uris.split():
|
||||
for redirect_uri in self.get_redirect_uris():
|
||||
hostname = utils.url_domain(redirect_uri)
|
||||
if sector_identifier is None:
|
||||
sector_identifier = hostname
|
||||
|
@ -230,7 +230,7 @@ class OIDCClient(Service):
|
|||
return sector_identifier
|
||||
|
||||
def get_base_urls(self):
|
||||
return super().get_base_urls() + [url for url in self.redirect_uris.split() if url]
|
||||
return super().get_base_urls() + [url for url in self.get_redirect_uris() if url]
|
||||
|
||||
def __repr__(self):
|
||||
return '<OIDCClient name:%r client_id:%r identifier_policy:%r>' % (
|
||||
|
@ -262,6 +262,9 @@ class OIDCClient(Service):
|
|||
return {}
|
||||
return {'id': utils.make_sub(self, user)}
|
||||
|
||||
def get_redirect_uris(self):
|
||||
return filter(None, self.redirect_uris.split())
|
||||
|
||||
|
||||
class OIDCAuthorization(models.Model):
|
||||
client_ct = models.ForeignKey(
|
||||
|
|
|
@ -40,7 +40,7 @@ from authentic2.a2_rbac.models import OrganizationalUnit
|
|||
from authentic2.custom_user.models import Profile
|
||||
from authentic2.decorators import setting_enabled
|
||||
from authentic2.exponential_retry_timeout import ExponentialRetryTimeout
|
||||
from authentic2.utils import hooks
|
||||
from authentic2.utils import cors, hooks
|
||||
from authentic2.utils.misc import last_authentication_event, login_require, make_url, redirect
|
||||
from authentic2.utils.service import set_service
|
||||
from authentic2.utils.view_decorators import check_view_restriction
|
||||
|
@ -189,6 +189,10 @@ class WrongClientSecret(InvalidClient):
|
|||
super().__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class CORSInvalidOrigin(OIDCException):
|
||||
error_code = 'invalid_origin'
|
||||
|
||||
|
||||
def idtoken_duration(client):
|
||||
return client.idtoken_duration or datetime.timedelta(seconds=app_settings.IDTOKEN_DURATION)
|
||||
|
||||
|
@ -224,12 +228,16 @@ def openid_configuration(request, *args, **kwargs):
|
|||
'frontchannel_logout_supported': True,
|
||||
'frontchannel_logout_session_supported': True,
|
||||
}
|
||||
return JsonResponse(metadata)
|
||||
response = JsonResponse(metadata)
|
||||
cors.set_headers(response, origin='*')
|
||||
return response
|
||||
|
||||
|
||||
@setting_enabled('ENABLE', settings=app_settings)
|
||||
def certs(request, *args, **kwargs):
|
||||
return HttpResponse(utils.get_jwkset().export(private_keys=False), content_type='application/json')
|
||||
response = HttpResponse(utils.get_jwkset().export(private_keys=False), content_type='application/json')
|
||||
cors.set_headers(response, origin='*')
|
||||
return response
|
||||
|
||||
|
||||
@check_view_restriction
|
||||
|
@ -257,9 +265,7 @@ def authorize(request, *args, **kwargs):
|
|||
except ValueError:
|
||||
error_description = _('Redirect URI "%s" is unknown.') % redirect_uri
|
||||
if settings.DEBUG:
|
||||
error_description += _(' Known redirect URIs are: %s') % ', '.join(
|
||||
client.redirect_uris.split()
|
||||
)
|
||||
error_description += _(' Known redirect URIs are: %s') % ', '.join(client.get_redirect_uris())
|
||||
raise InvalidRequest(error_description)
|
||||
use_fragment = client.authorization_flow == client.FLOW_IMPLICIT
|
||||
validated_redirect_uri = redirect_uri
|
||||
|
@ -584,7 +590,7 @@ def is_ro_cred_grant_ratelimited(request, key='ip', increment=True):
|
|||
)
|
||||
|
||||
|
||||
def authenticate_client(request, ratelimit=False, client=None):
|
||||
def authenticate_client(request):
|
||||
'''Authenticate client on the token endpoint'''
|
||||
|
||||
if 'authorization' in request.headers:
|
||||
|
@ -599,12 +605,16 @@ def authenticate_client(request, ratelimit=False, client=None):
|
|||
raise InvalidClient(_('Empty client identifier'))
|
||||
|
||||
if not client_secret:
|
||||
raise InvalidRequest('missing client_secret', client=client)
|
||||
raise InvalidRequest('missing client_secret')
|
||||
|
||||
client = get_client(client_id)
|
||||
if not client:
|
||||
raise InvalidClient(_('Wrong client identifier: %s') % client_id)
|
||||
|
||||
if cors.is_cors_request(request):
|
||||
if not cors.is_good_origin(request, client.get_redirect_uris()):
|
||||
raise CORSInvalidOrigin(_('Your Origin header does not match the configured redirect_uris'))
|
||||
|
||||
return authenticate_client_secret(client, client_secret)
|
||||
|
||||
|
||||
|
@ -614,7 +624,7 @@ def idtoken_from_user_credential(request):
|
|||
raise InvalidRequest('Rate limit exceeded for IP address "%s"' % request.META.get('REMOTE_ADDR', ''))
|
||||
|
||||
try:
|
||||
client = authenticate_client(request, ratelimit=True, client=None)
|
||||
client = authenticate_client(request)
|
||||
except InvalidClient:
|
||||
# increment rate limit by IP
|
||||
if is_ro_cred_grant_ratelimited(request):
|
||||
|
@ -818,6 +828,8 @@ def tokens_from_authz_code(request):
|
|||
@setting_enabled('ENABLE', settings=app_settings)
|
||||
@csrf_exempt
|
||||
def token(request, *args, **kwargs):
|
||||
if cors.is_preflight_request(request):
|
||||
return cors.preflight_response(request, origin=request, methods=('POST',))
|
||||
if request.method != 'POST':
|
||||
return HttpResponseNotAllowed(['POST'])
|
||||
grant_type = request.POST.get('grant_type')
|
||||
|
@ -858,6 +870,9 @@ def authenticate_access_token(request):
|
|||
@setting_enabled('ENABLE', settings=app_settings)
|
||||
@csrf_exempt
|
||||
def user_info(request, *args, **kwargs):
|
||||
if cors.is_preflight_request(request):
|
||||
return cors.preflight_response(request, origin='*', headers=('x-requested-with', 'authorization'))
|
||||
|
||||
try:
|
||||
access_token = authenticate_access_token(request)
|
||||
user_info = utils.create_user_info(
|
||||
|
@ -867,7 +882,7 @@ def user_info(request, *args, **kwargs):
|
|||
access_token.scope_set(),
|
||||
profile=access_token.profile,
|
||||
)
|
||||
return JsonResponse(user_info)
|
||||
response = JsonResponse(user_info)
|
||||
except OIDCException as e:
|
||||
error_response = e.json_response(request, endpoint='user_info')
|
||||
if e.status == 401:
|
||||
|
@ -875,7 +890,9 @@ def user_info(request, *args, **kwargs):
|
|||
e.error_code,
|
||||
e.error_description,
|
||||
)
|
||||
return error_response
|
||||
response = error_response
|
||||
cors.set_headers(response, origin='*', headers=('x-requested-with', 'authorization'))
|
||||
return response
|
||||
|
||||
|
||||
@setting_enabled('ENABLE', settings=app_settings)
|
||||
|
|
|
@ -2009,41 +2009,91 @@ def test_token_endpoint_code_timeout(oidc_client, oidc_settings, simple_user, ap
|
|||
|
||||
def test_authenticate_client_exception_handling(app, oidc_client, simple_user, rf):
|
||||
from authentic2_idp_oidc.views import (
|
||||
CORSInvalidOrigin,
|
||||
InvalidClient,
|
||||
InvalidRequest,
|
||||
WrongClientSecret,
|
||||
authenticate_client,
|
||||
)
|
||||
|
||||
request = rf.get('/')
|
||||
request = rf.post('/')
|
||||
|
||||
# missing client id
|
||||
with pytest.raises(InvalidRequest):
|
||||
authenticate_client(request, client=oidc_client)
|
||||
authenticate_client(request)
|
||||
|
||||
# empty client id
|
||||
request.POST = {'client_id': '', 'client_secret': ''}
|
||||
with pytest.raises(InvalidClient):
|
||||
authenticate_client(request, client=oidc_client)
|
||||
authenticate_client(request)
|
||||
|
||||
# empty client secret
|
||||
request.POST['client_id'] = 'abc'
|
||||
with pytest.raises(InvalidRequest):
|
||||
authenticate_client(request, client=oidc_client)
|
||||
authenticate_client(request)
|
||||
|
||||
# wrong client id
|
||||
request.POST['client_secret'] = 'def'
|
||||
with pytest.raises(InvalidClient):
|
||||
authenticate_client(request, client=oidc_client)
|
||||
authenticate_client(request)
|
||||
|
||||
# wrong client secret
|
||||
request.POST['client_id'] = oidc_client.client_id
|
||||
with pytest.raises(WrongClientSecret):
|
||||
authenticate_client(request, client=oidc_client)
|
||||
authenticate_client(request)
|
||||
|
||||
# wrong client secret
|
||||
request.POST['client_id'] = oidc_client.client_id
|
||||
with pytest.raises(WrongClientSecret):
|
||||
authenticate_client(request)
|
||||
|
||||
# OK
|
||||
request.POST['client_secret'] = oidc_client.client_secret
|
||||
assert authenticate_client(request, client=oidc_client) == oidc_client
|
||||
assert authenticate_client(request) == oidc_client
|
||||
|
||||
# missing origin
|
||||
request = rf.post(
|
||||
'/', data={'client_id': oidc_client.client_id, 'client_secret': 'xxx'}, HTTP_SEC_FETCH_MODE='cors'
|
||||
)
|
||||
with pytest.raises(CORSInvalidOrigin):
|
||||
authenticate_client(request)
|
||||
|
||||
# invalid origin
|
||||
request = rf.post(
|
||||
'/',
|
||||
data={'client_id': oidc_client.client_id, 'client_secret': 'xxx'},
|
||||
HTTP_SEC_FETCH_MODE='cors',
|
||||
HTTP_ORIGIN='https://sp.example.com/',
|
||||
)
|
||||
with pytest.raises(CORSInvalidOrigin):
|
||||
authenticate_client(request)
|
||||
|
||||
|
||||
def test_token_cors_preflight(app):
|
||||
token_url = make_url('oidc-token')
|
||||
|
||||
app.options(token_url, status=405)
|
||||
|
||||
app.options(
|
||||
token_url,
|
||||
headers={
|
||||
'sec-fetch-mode': 'cors',
|
||||
'origin': 'https://coin.org',
|
||||
'access-control-request-method': 'get',
|
||||
},
|
||||
status=405,
|
||||
)
|
||||
|
||||
response = app.options(
|
||||
token_url,
|
||||
headers={
|
||||
'sec-fetch-mode': 'cors',
|
||||
'origin': 'https://coin.org',
|
||||
'access-control-request-method': 'post',
|
||||
},
|
||||
status=200,
|
||||
)
|
||||
assert response.headers['access-control-allow-origin'] == 'https://coin.org'
|
||||
|
||||
|
||||
def test_login_from_client_accounts_appearance(oidc_client, app, simple_user, settings):
|
||||
|
@ -2106,3 +2156,25 @@ def test_login_from_client_accounts_appearance(oidc_client, app, simple_user, se
|
|||
assert ('class', 'a2-service-information--logo') in response.pyquery('img')[0].items()
|
||||
assert ('src', '/media/services/logos/201x201.jpg') in response.pyquery('img')[0].items()
|
||||
assert ('alt', 'One specific client') in response.pyquery('img')[0].items()
|
||||
|
||||
|
||||
def test_user_info_cors(app, oidc_client, simple_user):
|
||||
response = app.options(
|
||||
'/idp/oidc/user_info/',
|
||||
headers={'Sec-Fetch-Mode': 'cors', 'Access-Control-Request-Method': 'GET'},
|
||||
status=200,
|
||||
)
|
||||
assert response.headers['Access-Control-Allow-Origin'] == '*'
|
||||
assert response.headers['Access-Control-Max-Age']
|
||||
assert response.headers['Access-Control-Allow-Methods'] == 'GET'
|
||||
assert response.headers['Access-Control-Allow-Headers'] == 'x-requested-with,authorization'
|
||||
assert response.content == b''
|
||||
|
||||
token = OIDCAccessToken.objects.create(client=oidc_client, user=simple_user)
|
||||
|
||||
response = app.get(
|
||||
'/idp/oidc/user_info/',
|
||||
headers={'Authorization': f'Bearer {token.uuid}'},
|
||||
status=200,
|
||||
)
|
||||
assert response.headers['Access-Control-Allow-Origin'] == '*'
|
||||
|
|
|
@ -88,3 +88,29 @@ def test_user_info(app, client, freezer, simple_user):
|
|||
response['WWW-Authenticate']
|
||||
== 'Bearer error="invalid_token", error_description="Token expired or user disconnected"'
|
||||
)
|
||||
|
||||
|
||||
def test_openid_configuration(app):
|
||||
response = app.get('/.well-known/openid-configuration')
|
||||
assert response.json == {
|
||||
'authorization_endpoint': 'https://testserver/idp/oidc/authorize',
|
||||
'end_session_endpoint': 'https://testserver/idp/oidc/logout',
|
||||
'frontchannel_logout_session_supported': True,
|
||||
'frontchannel_logout_supported': True,
|
||||
'id_token_signing_alg_values_supported': ['RS256', 'HS256', 'ES256'],
|
||||
'issuer': 'https://testserver/',
|
||||
'jwks_uri': 'https://testserver/idp/oidc/certs',
|
||||
'response_types_supported': ['code', 'token', 'token id_token'],
|
||||
'subject_types_supported': ['public', 'pairwise'],
|
||||
'token_endpoint': 'https://testserver/idp/oidc/token',
|
||||
'token_endpoint_auth_methods_supported': ['client_secret_post', 'client_secret_basic'],
|
||||
'userinfo_endpoint': 'https://testserver/idp/oidc/user_info',
|
||||
}
|
||||
|
||||
assert response.headers['Access-Control-Allow-Origin'] == '*'
|
||||
|
||||
|
||||
def test_certs(app, oidc_settings):
|
||||
response = app.get('/idp/oidc/certs/')
|
||||
assert response.json['keys']
|
||||
assert response.headers['Access-Control-Allow-Origin'] == '*'
|
||||
|
|
Loading…
Reference in New Issue