idp_saml2: add CORS headers to SSO endpoint (#82266)
gitea/authentic/pipeline/head This commit looks good
Details
gitea/authentic/pipeline/head This commit looks good
Details
This commit is contained in:
parent
2cc154f011
commit
d9f9b59927
|
@ -107,7 +107,7 @@ from authentic2.saml.models import (
|
|||
saml2_urn_to_nidformat,
|
||||
save_key_values,
|
||||
)
|
||||
from authentic2.utils import hooks
|
||||
from authentic2.utils import cors, hooks
|
||||
from authentic2.utils import misc as utils_misc
|
||||
from authentic2.utils.misc import datetime_to_xs_datetime, find_authentication_event
|
||||
from authentic2.utils.misc import get_backends as get_idp_backends
|
||||
|
@ -513,6 +513,11 @@ def sso(request):
|
|||
For SOAP a session must be established previously through the login
|
||||
page. No authentication through the SOAP request is supported.
|
||||
"""
|
||||
# if CORS preflight request (mandatory if x-requested-with is present)
|
||||
# returns a wildcard autorisation
|
||||
if cors.is_preflight_request(request):
|
||||
return cors.preflight_response(request, origin=request, with_credentials=True)
|
||||
|
||||
if request.method == 'GET':
|
||||
logger.debug('called by GET')
|
||||
consent_answer = request.GET.get('consent_answer', '')
|
||||
|
@ -965,7 +970,12 @@ def return_login_response(request, login):
|
|||
else:
|
||||
raise NotImplementedError()
|
||||
provider = LibertyProvider.objects.get(entity_id=login.remoteProviderId)
|
||||
return return_saml2_response(request, login, title=_('You are being redirected to "%s"') % provider.name)
|
||||
response = return_saml2_response(
|
||||
request, login, title=_('You are being redirected to "%s"') % provider.name
|
||||
)
|
||||
if cors.is_cors_request(request) and cors.is_good_origin(request, login.remoteProviderId):
|
||||
cors.set_headers(response, origin=request, with_credentials=True)
|
||||
return response
|
||||
|
||||
|
||||
def finish_sso(request, login, user=None, return_profile=False):
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
# Authentic2 © Entr'ouvert
|
||||
|
||||
from django.http import HttpResponse, HttpResponseNotAllowed
|
||||
|
||||
from .misc import same_origin
|
||||
|
||||
DEFAULT_METHODS = ('GET',)
|
||||
|
||||
|
||||
def is_cors_request(request):
|
||||
return request.headers.get('sec-fetch-mode') == 'cors'
|
||||
|
||||
|
||||
def is_preflight_request(request):
|
||||
return request.method == 'OPTIONS' and is_cors_request(request)
|
||||
|
||||
|
||||
def is_good_origin(request, reference_url):
|
||||
origin = request.headers.get('Origin', 'null')
|
||||
if origin == 'null':
|
||||
return False
|
||||
if isinstance(reference_url, str):
|
||||
return same_origin(origin, reference_url)
|
||||
else:
|
||||
for url in reference_url:
|
||||
if same_origin(origin, url):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def preflight_response(request, *, methods=DEFAULT_METHODS, **kwargs):
|
||||
method = request.headers.get('Access-Control-Request-Method', '').upper()
|
||||
if method not in methods:
|
||||
return HttpResponseNotAllowed(methods)
|
||||
return set_headers(HttpResponse(''), methods=methods, **kwargs)
|
||||
|
||||
|
||||
def set_headers(
|
||||
response,
|
||||
*,
|
||||
origin='null',
|
||||
with_credentials=False,
|
||||
methods=DEFAULT_METHODS,
|
||||
headers=('x-requested-with',),
|
||||
max_age=86400,
|
||||
):
|
||||
# origin is an HttpRequest, take origin from it
|
||||
if hasattr(origin, 'headers'):
|
||||
origin = origin.headers['Origin']
|
||||
|
||||
response['Access-Control-Allow-Origin'] = origin
|
||||
response['Access-Control-Max-Age'] = str(max_age)
|
||||
response['Access-Control-Allow-Methods'] = ','.join(methods)
|
||||
if headers:
|
||||
response['Access-Control-Allow-Headers'] = ','.join(headers)
|
||||
if with_credentials:
|
||||
response['Access-Control-Allow-Credentials'] = 'true'
|
||||
return response
|
|
@ -316,7 +316,7 @@ class SamlSP:
|
|||
|
||||
class Scenario:
|
||||
check_federation = False
|
||||
authn_request_success = True
|
||||
authn_request_login_needed = True
|
||||
|
||||
def __init__(self, app, sp_kwargs=None, make_authn_request_kwargs=None, **kwargs):
|
||||
self.app = app
|
||||
|
@ -325,18 +325,20 @@ class Scenario:
|
|||
self.make_authn_request_kwargs = make_authn_request_kwargs or {}
|
||||
self.__dict__.update(kwargs)
|
||||
|
||||
def launch_authn_request(self):
|
||||
def launch_authn_request(self, requests_params=None):
|
||||
requests_params = requests_params or {}
|
||||
|
||||
# Launch an AuthnRequest
|
||||
url, body, relay_state, request_id = self.sp.make_authn_request(**self.make_authn_request_kwargs)
|
||||
if body is None:
|
||||
response = self.app.get(url)
|
||||
response = self.app.get(url, **requests_params)
|
||||
else: # post case
|
||||
params = {'SAMLRequest': body}
|
||||
if relay_state is not None:
|
||||
params['RelayState'] = relay_state
|
||||
response = self.app.post(url, params=params)
|
||||
response = self.app.post(url, params=params, **requests_params)
|
||||
|
||||
if self.authn_request_success:
|
||||
if self.authn_request_login_needed:
|
||||
utils.assert_redirects_complex(
|
||||
response,
|
||||
reverse('auth_login'),
|
||||
|
@ -1133,7 +1135,7 @@ def test_sso_is_passive_and_view_restriction(app, idp, user, cgu_attribute, capl
|
|||
scenario = Scenario(
|
||||
app,
|
||||
make_authn_request_kwargs={'is_passive': True},
|
||||
authn_request_success=False,
|
||||
authn_request_login_needed=False,
|
||||
)
|
||||
scenario.launch_authn_request()
|
||||
|
||||
|
@ -1163,7 +1165,7 @@ def test_sso_is_passive(app, idp, user, cgu_attribute, caplog):
|
|||
scenario = Scenario(
|
||||
app,
|
||||
make_authn_request_kwargs={'is_passive': True},
|
||||
authn_request_success=False,
|
||||
authn_request_login_needed=False,
|
||||
)
|
||||
scenario.launch_authn_request()
|
||||
|
||||
|
@ -1183,7 +1185,7 @@ def test_sso_with_authenticator_passive_sso_canceled(app, idp):
|
|||
scenario = Scenario(
|
||||
app,
|
||||
make_authn_request_kwargs={'is_passive': True},
|
||||
authn_request_success=False,
|
||||
authn_request_login_needed=False,
|
||||
)
|
||||
|
||||
authenticator = mock.Mock()
|
||||
|
@ -1221,7 +1223,7 @@ def test_sso_with_authenticator_passive_sso_authenticated(app, idp, user, monkey
|
|||
scenario = Scenario(
|
||||
app,
|
||||
make_authn_request_kwargs={'is_passive': True},
|
||||
authn_request_success=False,
|
||||
authn_request_login_needed=False,
|
||||
)
|
||||
|
||||
authenticator = mock.Mock()
|
||||
|
@ -1253,3 +1255,60 @@ def test_sso_with_authenticator_passive_sso_authenticated(app, idp, user, monkey
|
|||
scenario.idp_response = response
|
||||
scenario.handle_post_response()
|
||||
assert scenario.sp.login.response.status.statusCode.value == 'urn:oasis:names:tc:SAML:2.0:status:Success'
|
||||
|
||||
|
||||
def test_sso_cors_nok(app, idp, user):
|
||||
response = app.get('/login/')
|
||||
response.form.set('username', user.username)
|
||||
response.form.set('password', user.username)
|
||||
response.form.submit(name='login-password-submit')
|
||||
|
||||
scenario = Scenario(app, sp_kwargs=dict(binding='post'))
|
||||
scenario.authn_request_login_needed = False
|
||||
|
||||
# test preflight requests
|
||||
app.options(
|
||||
'/idp/saml2/sso',
|
||||
headers={
|
||||
'Origin': 'https://whatever.example.com',
|
||||
'Sec-Fetch-Mode': 'cors',
|
||||
'Access-Control-Request-Method': 'PUT',
|
||||
},
|
||||
status=405,
|
||||
)
|
||||
response = app.options(
|
||||
'/idp/saml2/sso',
|
||||
headers={
|
||||
'Origin': 'https://whatever.example.com',
|
||||
'Sec-Fetch-Mode': 'cors',
|
||||
'Access-Control-Request-Method': 'GET',
|
||||
},
|
||||
status=200,
|
||||
)
|
||||
assert response.headers['Access-Control-Allow-Origin'] == 'https://whatever.example.com'
|
||||
assert response.headers['Access-Control-Allow-Credentials'] == 'true'
|
||||
|
||||
scenario.launch_authn_request(
|
||||
requests_params={
|
||||
'headers': {
|
||||
'Origin': 'https://whatever.example.com',
|
||||
'Sec-Fetch-Mode': 'cors',
|
||||
'Access-Control-Request-Method': 'GET',
|
||||
}
|
||||
}
|
||||
)
|
||||
assert scenario.idp_response.headers.get('Access-Control-Allow-Origin') is None
|
||||
|
||||
|
||||
def test_sso_cors_ok(app, idp, user):
|
||||
response = app.get('/login/')
|
||||
response.form.set('username', user.username)
|
||||
response.form.set('password', user.username)
|
||||
response.form.submit(name='login-password-submit')
|
||||
|
||||
scenario = Scenario(app, sp_kwargs=dict(binding='post'))
|
||||
scenario.authn_request_login_needed = False
|
||||
scenario.launch_authn_request(
|
||||
requests_params={'headers': {'Origin': 'https://sp.example.com', 'Sec-Fetch-Mode': 'cors'}}
|
||||
)
|
||||
assert scenario.idp_response.headers.get('Access-Control-Allow-Origin') == 'https://sp.example.com'
|
||||
|
|
Loading…
Reference in New Issue