use good API from lasso to set Extensions node content (#23003)
- use extensions.any tuple to set the content of the Extensions node - add tests for the presence of the eo:next_url node when ADD_AUTHNREQUEST_NEXT_URL_EXTENSION is used - add tests for next_url propagation through the RelayState value
This commit is contained in:
parent
edb09ed8fd
commit
d4d0b85944
|
@ -373,13 +373,19 @@ class LoginView(ProfileMixin, LogMixin, View):
|
|||
|
||||
if utils.get_setting(idp, 'ADD_AUTHNREQUEST_NEXT_URL_EXTENSION'):
|
||||
authn_request.extensions = lasso.Samlp2Extensions()
|
||||
authn_request.extensions.setOriginalXmlnode(
|
||||
eo_next_url = escape(request.build_absolute_uri(next_url or '/'))
|
||||
# lasso>2.5.1 introduced a better API
|
||||
if hasattr(authn_request.extensions, 'any'):
|
||||
authn_request.extensions.any = (
|
||||
'<eo:next_url xmlns:eo="https://www.entrouvert.com/">%s</eo:next_url>' % eo_next_url,)
|
||||
else:
|
||||
authn_request.extensions.setOriginalXmlnode(
|
||||
'''<samlp:Extensions
|
||||
xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol"
|
||||
xmlns:eo="https://www.entrouvert.com/">
|
||||
<eo:next_url>%s</eo:next_url>
|
||||
</samlp:Extensions>''' %
|
||||
escape(request.build_absolute_uri(next_url or '/')))
|
||||
xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol"
|
||||
xmlns:eo="https://www.entrouvert.com/">
|
||||
<eo:next_url>%s</eo:next_url>
|
||||
</samlp:Extensions>''' % eo_next_url
|
||||
)
|
||||
self.set_next_url(next_url)
|
||||
login.buildAuthnRequestMsg()
|
||||
except lasso.Error as e:
|
||||
|
|
|
@ -1,9 +1,13 @@
|
|||
import base64
|
||||
import zlib
|
||||
|
||||
import lasso
|
||||
|
||||
from pytest import fixture
|
||||
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.utils import six
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from mellon.utils import create_metadata
|
||||
|
||||
|
@ -58,6 +62,18 @@ class MockIdp(object):
|
|||
def process_authn_request_redirect(self, url, auth_result=True, consent=True):
|
||||
login = lasso.Login(self.server)
|
||||
login.processAuthnRequestMsg(url.split('?', 1)[1])
|
||||
# See
|
||||
# https://docs.python.org/2/library/zlib.html#zlib.decompress
|
||||
# for the -15 magic value.
|
||||
#
|
||||
# * -8 to -15: Uses the absolute value of wbits as the window size
|
||||
# logarithm. The input must be a raw stream with no header or trailer.
|
||||
#
|
||||
# it means Deflate instead of GZIP (same stream no header, no trailer)
|
||||
self.request = zlib.decompress(
|
||||
base64.b64decode(
|
||||
urlparse.parse_qs(
|
||||
urlparse.urlparse(url).query)['SAMLRequest'][0]), -15)
|
||||
try:
|
||||
login.validateRequestMsg(auth_result, consent)
|
||||
except lasso.LoginRequestDeniedError:
|
||||
|
@ -76,7 +92,7 @@ class MockIdp(object):
|
|||
login.buildAuthnResponseMsg()
|
||||
else:
|
||||
raise NotImplementedError
|
||||
return login.msgUrl, login.msgBody
|
||||
return login.msgUrl, login.msgBody, login.msgRelayState
|
||||
|
||||
def resolve_artifact(self, soap_message):
|
||||
login = lasso.Login(self.server)
|
||||
|
@ -105,20 +121,23 @@ def idp(sp_settings, idp_metadata, idp_private_key, sp_metadata):
|
|||
|
||||
|
||||
def test_sso_slo(db, app, idp, caplog, sp_settings):
|
||||
response = app.get(reverse('mellon_login'))
|
||||
url, body = idp.process_authn_request_redirect(response['Location'])
|
||||
response = app.get(reverse('mellon_login') + '?next=/whatever/')
|
||||
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
||||
assert relay_state
|
||||
assert 'eo:next_url' not in str(idp.request)
|
||||
assert url.endswith(reverse('mellon_login'))
|
||||
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body})
|
||||
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
||||
assert 'created new user' in caplog.text
|
||||
assert 'logged in using SAML' in caplog.text
|
||||
assert response['Location'].endswith(sp_settings.LOGIN_REDIRECT_URL)
|
||||
assert response['Location'].endswith('/whatever/')
|
||||
|
||||
|
||||
def test_sso(db, app, idp, caplog, sp_settings):
|
||||
response = app.get(reverse('mellon_login'))
|
||||
url, body = idp.process_authn_request_redirect(response['Location'])
|
||||
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
||||
assert not relay_state
|
||||
assert url.endswith(reverse('mellon_login'))
|
||||
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body})
|
||||
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
||||
assert 'created new user' in caplog.text
|
||||
assert 'logged in using SAML' in caplog.text
|
||||
assert response['Location'].endswith(sp_settings.LOGIN_REDIRECT_URL)
|
||||
|
@ -126,9 +145,10 @@ def test_sso(db, app, idp, caplog, sp_settings):
|
|||
|
||||
def test_sso_request_denied(db, app, idp, caplog, sp_settings):
|
||||
response = app.get(reverse('mellon_login'))
|
||||
url, body = idp.process_authn_request_redirect(response['Location'], auth_result=False)
|
||||
url, body, relay_state = idp.process_authn_request_redirect(response['Location'], auth_result=False)
|
||||
assert not relay_state
|
||||
assert url.endswith(reverse('mellon_login'))
|
||||
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body})
|
||||
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
||||
if six.PY3:
|
||||
assert "status is not success codes: ['urn:oasis:names:tc:SAML:2.0:status:Responder',\
|
||||
'urn:oasis:names:tc:SAML:2.0:status:RequestDenied']" in caplog.text
|
||||
|
@ -142,28 +162,30 @@ def test_sso_artifact(db, app, caplog, sp_settings, idp_metadata, idp_private_ke
|
|||
request = rf.get('/')
|
||||
sp_metadata = create_metadata(request)
|
||||
idp = MockIdp(idp_metadata, idp_private_key, sp_metadata)
|
||||
response = app.get(reverse('mellon_login'))
|
||||
url, body = idp.process_authn_request_redirect(response['Location'])
|
||||
response = app.get(reverse('mellon_login') + '?next=/whatever/')
|
||||
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
||||
assert relay_state
|
||||
assert body is None
|
||||
assert reverse('mellon_login') in url
|
||||
assert 'SAMLart' in url
|
||||
acs_artifact_url = url.split('testserver', 1)[1]
|
||||
with HTTMock(idp.mock_artifact_resolver()):
|
||||
response = app.get(acs_artifact_url)
|
||||
response = app.get(acs_artifact_url, params={'RelayState': relay_state})
|
||||
assert 'created new user' in caplog.text
|
||||
assert 'logged in using SAML' in caplog.text
|
||||
assert response['Location'].endswith(sp_settings.LOGIN_REDIRECT_URL)
|
||||
assert response['Location'].endswith('/whatever/')
|
||||
# force delog
|
||||
app.session.flush()
|
||||
del app.session['_auth_user_id']
|
||||
assert 'dead artifact' not in caplog.text
|
||||
with HTTMock(idp.mock_artifact_resolver()):
|
||||
response = app.get(acs_artifact_url)
|
||||
response = app.get(acs_artifact_url, params={'RelayState': relay_state})
|
||||
# verify retry login was asked
|
||||
assert 'dead artifact' in caplog.text
|
||||
assert response.status_code == 302
|
||||
assert reverse('mellon_login') in url
|
||||
response = response.follow()
|
||||
url, body = idp.process_authn_request_redirect(response['Location'])
|
||||
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
||||
assert relay_state
|
||||
reset_caplog(caplog)
|
||||
# verify caplog has been cleaned
|
||||
assert 'created new user' not in caplog.text
|
||||
|
@ -172,7 +194,19 @@ def test_sso_artifact(db, app, caplog, sp_settings, idp_metadata, idp_private_ke
|
|||
assert 'SAMLart' in url
|
||||
acs_artifact_url = url.split('testserver', 1)[1]
|
||||
with HTTMock(idp.mock_artifact_resolver()):
|
||||
response = app.get(acs_artifact_url)
|
||||
response = app.get(acs_artifact_url, params={'RelayState': relay_state})
|
||||
assert 'created new user' in caplog.text
|
||||
assert 'logged in using SAML' in caplog.text
|
||||
assert response['Location'].endswith(sp_settings.LOGIN_REDIRECT_URL)
|
||||
assert response['Location'].endswith('/whatever/')
|
||||
|
||||
|
||||
def test_sso_slo_pass_next_url(db, app, idp, caplog, sp_settings):
|
||||
sp_settings.MELLON_ADD_AUTHNREQUEST_NEXT_URL_EXTENSION = True
|
||||
response = app.get(reverse('mellon_login') + '?next=/whatever/')
|
||||
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
||||
assert 'eo:next_url' in str(idp.request)
|
||||
assert url.endswith(reverse('mellon_login'))
|
||||
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
||||
assert 'created new user' in caplog.text
|
||||
assert 'logged in using SAML' in caplog.text
|
||||
assert response['Location'].endswith('/whatever/')
|
||||
|
|
Loading…
Reference in New Issue