misc: add support for SOAP SLO (#41949)
This commit is contained in:
parent
65cbdcefc3
commit
482aa09f92
|
@ -26,6 +26,9 @@
|
|||
<SingleLogoutService
|
||||
Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
|
||||
Location="{{ logout_url }}" />
|
||||
<SingleLogoutService
|
||||
Binding="urn:oasis:names:tc:SAML:2.0:bindings:SOAP"
|
||||
Location="{{ logout_url }}" />
|
||||
{% for name_id_format in name_id_formats %}
|
||||
<NameIDFormat>{{ name_id_format }}</NameIDFormat>
|
||||
{% endfor %}
|
||||
|
|
|
@ -35,7 +35,7 @@ from django.shortcuts import render, resolve_url
|
|||
from django.urls import reverse
|
||||
from django.utils.http import urlencode
|
||||
from django.utils import six
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.encoding import force_text, force_str
|
||||
from django.contrib.auth import REDIRECT_FIELD_NAME
|
||||
from django.db import transaction
|
||||
from django.utils.translation import ugettext as _
|
||||
|
@ -523,20 +523,24 @@ login = transaction.non_atomic_requests(csrf_exempt(LoginView.as_view()))
|
|||
class LogoutView(ProfileMixin, LogMixin, View):
|
||||
def get(self, request, *args, **kwargs):
|
||||
if 'SAMLRequest' in request.GET:
|
||||
return self.idp_logout(request, request.META['QUERY_STRING'])
|
||||
return self.idp_logout(request, request.META['QUERY_STRING'], 'redirect')
|
||||
elif 'SAMLResponse' in request.GET:
|
||||
return self.sp_logout_response(request)
|
||||
else:
|
||||
return self.sp_logout_request(request)
|
||||
|
||||
def logout(self, request, issuer, saml_user, session_indexes, indexes):
|
||||
def post(self, request, *args, **kwargs):
|
||||
return self.idp_logout(request, force_str(request.body), 'soap')
|
||||
|
||||
def logout(self, request, issuer, saml_user, session_indexes, indexes, mode):
|
||||
session_keys = set(indexes.values_list('session_key', flat=True))
|
||||
indexes.delete()
|
||||
|
||||
synchronous_logout = request.user == saml_user
|
||||
asynchronous_logout = (
|
||||
mode == 'soap'
|
||||
# the current session is not the only killed
|
||||
len(session_keys) != 1
|
||||
or len(session_keys) != 1
|
||||
or (
|
||||
# there is not current session
|
||||
not request.user.is_authenticated
|
||||
|
@ -567,7 +571,7 @@ class LogoutView(ProfileMixin, LogMixin, View):
|
|||
auth.logout(request)
|
||||
self.log.info('synchronous logout of %s', user)
|
||||
|
||||
def idp_logout(self, request, msg):
|
||||
def idp_logout(self, request, msg, mode):
|
||||
'''Handle logout request emitted by the IdP'''
|
||||
self.profile = logout = utils.create_logout(request)
|
||||
try:
|
||||
|
@ -610,7 +614,8 @@ class LogoutView(ProfileMixin, LogMixin, View):
|
|||
issuer=issuer,
|
||||
saml_user=name_id_user,
|
||||
session_indexes=session_indexes,
|
||||
indexes=indexes)
|
||||
indexes=indexes,
|
||||
mode=mode)
|
||||
|
||||
try:
|
||||
logout.buildResponseMsg()
|
||||
|
|
|
@ -280,6 +280,48 @@ def test_sso_idp_slo(db, app, idp, caplog, sp_settings):
|
|||
idp.check_slo_return(response.location)
|
||||
|
||||
|
||||
def test_sso_idp_slo_soap(db, app, idp, caplog, sp_settings):
|
||||
assert Session.objects.count() == 0
|
||||
assert User.objects.count() == 0
|
||||
|
||||
# first session
|
||||
response = app.get(reverse('mellon_login') + '?next=/whatever/')
|
||||
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
||||
assert relay_state
|
||||
assert 'eo:next_url' not in str(idp.request)
|
||||
assert url.endswith(reverse('mellon_login'))
|
||||
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
||||
assert 'created new user' in caplog.text
|
||||
assert 'logged in using SAML' in caplog.text
|
||||
assert urlparse.urlparse(response['Location']).path == '/whatever/'
|
||||
|
||||
# start a new Lasso session
|
||||
idp.reset_session_dump()
|
||||
|
||||
# second session
|
||||
app.cookiejar.clear()
|
||||
response = app.get(reverse('mellon_login') + '?next=/whatever/')
|
||||
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
||||
assert relay_state
|
||||
assert 'eo:next_url' not in str(idp.request)
|
||||
assert url.endswith(reverse('mellon_login'))
|
||||
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
||||
assert 'created new user' in caplog.text
|
||||
assert 'logged in using SAML' in caplog.text
|
||||
assert urlparse.urlparse(response['Location']).path == '/whatever/'
|
||||
|
||||
assert Session.objects.count() == 2
|
||||
assert User.objects.count() == 1
|
||||
|
||||
# idp logout
|
||||
app.cookiejar.clear()
|
||||
|
||||
url, body, relay_state = idp.init_slo(method=lasso.HTTP_METHOD_SOAP)
|
||||
response = app.post(url, params=body, headers={'Content-Type': force_str('text/xml')})
|
||||
assert Session.objects.count() == 1
|
||||
idp.check_slo_return(body=response.content)
|
||||
|
||||
|
||||
def test_sso_idp_slo_full(db, app, idp, caplog, sp_settings):
|
||||
assert Session.objects.count() == 0
|
||||
assert User.objects.count() == 0
|
||||
|
@ -318,6 +360,44 @@ def test_sso_idp_slo_full(db, app, idp, caplog, sp_settings):
|
|||
idp.check_slo_return(url=response.location)
|
||||
|
||||
|
||||
def test_sso_idp_slo_full_soap(db, app, idp, caplog, sp_settings):
|
||||
assert Session.objects.count() == 0
|
||||
assert User.objects.count() == 0
|
||||
|
||||
# first session
|
||||
response = app.get(reverse('mellon_login') + '?next=/whatever/')
|
||||
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
||||
assert relay_state
|
||||
assert 'eo:next_url' not in str(idp.request)
|
||||
assert url.endswith(reverse('mellon_login'))
|
||||
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
||||
assert 'created new user' in caplog.text
|
||||
assert 'logged in using SAML' in caplog.text
|
||||
assert urlparse.urlparse(response['Location']).path == '/whatever/'
|
||||
|
||||
# second session
|
||||
app.cookiejar.clear()
|
||||
response = app.get(reverse('mellon_login') + '?next=/whatever/')
|
||||
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
||||
assert relay_state
|
||||
assert 'eo:next_url' not in str(idp.request)
|
||||
assert url.endswith(reverse('mellon_login'))
|
||||
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
||||
assert 'created new user' in caplog.text
|
||||
assert 'logged in using SAML' in caplog.text
|
||||
assert urlparse.urlparse(response['Location']).path == '/whatever/'
|
||||
|
||||
assert Session.objects.count() == 2
|
||||
assert User.objects.count() == 1
|
||||
|
||||
# idp logout
|
||||
app.cookiejar.clear()
|
||||
url, body, relay_state = idp.init_slo(method=lasso.HTTP_METHOD_SOAP, full=True)
|
||||
response = app.post(url, params=body, headers={'Content-Type': force_str('text/xml')})
|
||||
assert Session.objects.count() == 0
|
||||
idp.check_slo_return(body=response.content)
|
||||
|
||||
|
||||
def test_sso(db, app, idp, caplog, sp_settings):
|
||||
response = app.get(reverse('mellon_login'))
|
||||
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
||||
|
|
|
@ -42,9 +42,9 @@ def test_create_metadata(rf, private_settings, caplog):
|
|||
('/sm:EntityDescriptor[@entityID="http://testserver/metadata/"]', 1,
|
||||
('/*', 1),
|
||||
('/sm:SPSSODescriptor', 1,
|
||||
('/*', 6),
|
||||
('/*', 7),
|
||||
('/sm:NameIDFormat', 1),
|
||||
('/sm:SingleLogoutService', 1),
|
||||
('/sm:SingleLogoutService', 2),
|
||||
('/sm:AssertionConsumerService[@isDefault=\'true\'][@Binding=\'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact\']', 1),
|
||||
('/sm:AssertionConsumerService[@isDefault=\'true\'][@Binding=\'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST\']',
|
||||
0),
|
||||
|
@ -64,11 +64,11 @@ def test_create_metadata(rf, private_settings, caplog):
|
|||
('/sm:EntityDescriptor[@entityID="http://testserver/metadata/"]', 1,
|
||||
('/*', 1),
|
||||
('/sm:SPSSODescriptor', 1,
|
||||
('/*', 7),
|
||||
('/*', 8),
|
||||
('/sm:Extensions', 1,
|
||||
('/idpdisc:DiscoveryResponse', 1)),
|
||||
('/sm:NameIDFormat', 1),
|
||||
('/sm:SingleLogoutService', 1),
|
||||
('/sm:SingleLogoutService', 2),
|
||||
('/sm:AssertionConsumerService[@isDefault=\'true\'][@Binding=\'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact\']', 1),
|
||||
('/sm:AssertionConsumerService[@isDefault=\'true\'][@Binding=\'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST\']',
|
||||
0),
|
||||
|
|
|
@ -20,11 +20,9 @@ import mock
|
|||
import lasso
|
||||
from django.utils.six.moves.urllib.parse import parse_qs, urlparse
|
||||
import base64
|
||||
import random
|
||||
import hashlib
|
||||
from httmock import HTTMock
|
||||
|
||||
import django
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_text
|
||||
from django.utils.http import urlencode
|
||||
|
@ -109,9 +107,9 @@ def test_metadata(private_settings, client):
|
|||
('/sm:EntityDescriptor[@entityID="http://testserver/metadata/"]', 1,
|
||||
('/*', 4),
|
||||
('/sm:SPSSODescriptor', 1,
|
||||
('/*', 6),
|
||||
('/*', 7),
|
||||
('/sm:NameIDFormat', 1),
|
||||
('/sm:SingleLogoutService', 1),
|
||||
('/sm:SingleLogoutService', 2),
|
||||
('/sm:AssertionConsumerService', None,
|
||||
('[@isDefault="true"]', None,
|
||||
('[@Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact"]', 1),
|
||||
|
|
Loading…
Reference in New Issue