use good API from lasso to set Extensions node content (#23003)

- use extensions.any tuple to set the content of the Extensions node
- add tests for the presence of the eo:next_url node when
  ADD_AUTHNREQUEST_NEXT_URL_EXTENSION is used
- add tests for next_url propagation through the RelayState value
This commit is contained in:
Benjamin Dauvergne 2018-04-05 19:34:59 +02:00 committed by Frédéric Péters
parent edb09ed8fd
commit d4d0b85944
2 changed files with 64 additions and 24 deletions

View File

@ -373,13 +373,19 @@ class LoginView(ProfileMixin, LogMixin, View):
if utils.get_setting(idp, 'ADD_AUTHNREQUEST_NEXT_URL_EXTENSION'):
authn_request.extensions = lasso.Samlp2Extensions()
authn_request.extensions.setOriginalXmlnode(
eo_next_url = escape(request.build_absolute_uri(next_url or '/'))
# lasso>2.5.1 introduced a better API
if hasattr(authn_request.extensions, 'any'):
authn_request.extensions.any = (
'<eo:next_url xmlns:eo="https://www.entrouvert.com/">%s</eo:next_url>' % eo_next_url,)
else:
authn_request.extensions.setOriginalXmlnode(
'''<samlp:Extensions
xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol"
xmlns:eo="https://www.entrouvert.com/">
<eo:next_url>%s</eo:next_url>
</samlp:Extensions>''' %
escape(request.build_absolute_uri(next_url or '/')))
xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol"
xmlns:eo="https://www.entrouvert.com/">
<eo:next_url>%s</eo:next_url>
</samlp:Extensions>''' % eo_next_url
)
self.set_next_url(next_url)
login.buildAuthnRequestMsg()
except lasso.Error as e:

View File

@ -1,9 +1,13 @@
import base64
import zlib
import lasso
from pytest import fixture
from django.core.urlresolvers import reverse
from django.utils import six
from django.utils.six.moves.urllib import parse as urlparse
from mellon.utils import create_metadata
@ -58,6 +62,18 @@ class MockIdp(object):
def process_authn_request_redirect(self, url, auth_result=True, consent=True):
login = lasso.Login(self.server)
login.processAuthnRequestMsg(url.split('?', 1)[1])
# See
# https://docs.python.org/2/library/zlib.html#zlib.decompress
# for the -15 magic value.
#
# * -8 to -15: Uses the absolute value of wbits as the window size
# logarithm. The input must be a raw stream with no header or trailer.
#
# it means Deflate instead of GZIP (same stream no header, no trailer)
self.request = zlib.decompress(
base64.b64decode(
urlparse.parse_qs(
urlparse.urlparse(url).query)['SAMLRequest'][0]), -15)
try:
login.validateRequestMsg(auth_result, consent)
except lasso.LoginRequestDeniedError:
@ -76,7 +92,7 @@ class MockIdp(object):
login.buildAuthnResponseMsg()
else:
raise NotImplementedError
return login.msgUrl, login.msgBody
return login.msgUrl, login.msgBody, login.msgRelayState
def resolve_artifact(self, soap_message):
login = lasso.Login(self.server)
@ -105,20 +121,23 @@ def idp(sp_settings, idp_metadata, idp_private_key, sp_metadata):
def test_sso_slo(db, app, idp, caplog, sp_settings):
response = app.get(reverse('mellon_login'))
url, body = idp.process_authn_request_redirect(response['Location'])
response = app.get(reverse('mellon_login') + '?next=/whatever/')
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
assert relay_state
assert 'eo:next_url' not in str(idp.request)
assert url.endswith(reverse('mellon_login'))
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body})
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
assert 'created new user' in caplog.text
assert 'logged in using SAML' in caplog.text
assert response['Location'].endswith(sp_settings.LOGIN_REDIRECT_URL)
assert response['Location'].endswith('/whatever/')
def test_sso(db, app, idp, caplog, sp_settings):
response = app.get(reverse('mellon_login'))
url, body = idp.process_authn_request_redirect(response['Location'])
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
assert not relay_state
assert url.endswith(reverse('mellon_login'))
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body})
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
assert 'created new user' in caplog.text
assert 'logged in using SAML' in caplog.text
assert response['Location'].endswith(sp_settings.LOGIN_REDIRECT_URL)
@ -126,9 +145,10 @@ def test_sso(db, app, idp, caplog, sp_settings):
def test_sso_request_denied(db, app, idp, caplog, sp_settings):
response = app.get(reverse('mellon_login'))
url, body = idp.process_authn_request_redirect(response['Location'], auth_result=False)
url, body, relay_state = idp.process_authn_request_redirect(response['Location'], auth_result=False)
assert not relay_state
assert url.endswith(reverse('mellon_login'))
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body})
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
if six.PY3:
assert "status is not success codes: ['urn:oasis:names:tc:SAML:2.0:status:Responder',\
'urn:oasis:names:tc:SAML:2.0:status:RequestDenied']" in caplog.text
@ -142,28 +162,30 @@ def test_sso_artifact(db, app, caplog, sp_settings, idp_metadata, idp_private_ke
request = rf.get('/')
sp_metadata = create_metadata(request)
idp = MockIdp(idp_metadata, idp_private_key, sp_metadata)
response = app.get(reverse('mellon_login'))
url, body = idp.process_authn_request_redirect(response['Location'])
response = app.get(reverse('mellon_login') + '?next=/whatever/')
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
assert relay_state
assert body is None
assert reverse('mellon_login') in url
assert 'SAMLart' in url
acs_artifact_url = url.split('testserver', 1)[1]
with HTTMock(idp.mock_artifact_resolver()):
response = app.get(acs_artifact_url)
response = app.get(acs_artifact_url, params={'RelayState': relay_state})
assert 'created new user' in caplog.text
assert 'logged in using SAML' in caplog.text
assert response['Location'].endswith(sp_settings.LOGIN_REDIRECT_URL)
assert response['Location'].endswith('/whatever/')
# force delog
app.session.flush()
del app.session['_auth_user_id']
assert 'dead artifact' not in caplog.text
with HTTMock(idp.mock_artifact_resolver()):
response = app.get(acs_artifact_url)
response = app.get(acs_artifact_url, params={'RelayState': relay_state})
# verify retry login was asked
assert 'dead artifact' in caplog.text
assert response.status_code == 302
assert reverse('mellon_login') in url
response = response.follow()
url, body = idp.process_authn_request_redirect(response['Location'])
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
assert relay_state
reset_caplog(caplog)
# verify caplog has been cleaned
assert 'created new user' not in caplog.text
@ -172,7 +194,19 @@ def test_sso_artifact(db, app, caplog, sp_settings, idp_metadata, idp_private_ke
assert 'SAMLart' in url
acs_artifact_url = url.split('testserver', 1)[1]
with HTTMock(idp.mock_artifact_resolver()):
response = app.get(acs_artifact_url)
response = app.get(acs_artifact_url, params={'RelayState': relay_state})
assert 'created new user' in caplog.text
assert 'logged in using SAML' in caplog.text
assert response['Location'].endswith(sp_settings.LOGIN_REDIRECT_URL)
assert response['Location'].endswith('/whatever/')
def test_sso_slo_pass_next_url(db, app, idp, caplog, sp_settings):
sp_settings.MELLON_ADD_AUTHNREQUEST_NEXT_URL_EXTENSION = True
response = app.get(reverse('mellon_login') + '?next=/whatever/')
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
assert 'eo:next_url' in str(idp.request)
assert url.endswith(reverse('mellon_login'))
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
assert 'created new user' in caplog.text
assert 'logged in using SAML' in caplog.text
assert response['Location'].endswith('/whatever/')