# django-mellon/tests/test_sso_slo.py
#
# 319 lines
# 13 KiB
# Python

import re
import base64
import zlib
import xml.etree.ElementTree as ET
import lasso
from pytest import fixture
from django.core.urlresolvers import reverse
from django.utils import six
from django.utils.six.moves.urllib import parse as urlparse
from mellon.utils import create_metadata
from httmock import all_requests, HTTMock, response as mock_response
from utils import reset_caplog
@fixture
def idp_metadata():
    """Return the text content of ``tests/metadata.xml``."""
    # A context manager closes the file handle deterministically instead of
    # leaving it to the garbage collector.
    with open('tests/metadata.xml') as fd:
        return fd.read()
@fixture
def idp_private_key():
    """Return the text content of ``tests/idp-private-key.pem``."""
    # A context manager closes the file handle deterministically instead of
    # leaving it to the garbage collector.
    with open('tests/idp-private-key.pem') as fd:
        return fd.read()
@fixture
def sp_private_key():
    """Return the text content of ``tests/sp-private-key.pem``."""
    # A context manager closes the file handle deterministically instead of
    # leaving it to the garbage collector.
    with open('tests/sp-private-key.pem') as fd:
        return fd.read()
@fixture
def public_key():
    """Return the text content of ``tests/public-key.pem``."""
    # A context manager closes the file handle deterministically instead of
    # leaving it to the garbage collector.
    with open('tests/public-key.pem') as fd:
        return fd.read()
@fixture
def sp_settings(private_settings, idp_metadata, sp_private_key, public_key):
    """Set the mellon-related settings on ``private_settings`` and return it.

    One identity provider is declared from ``idp_metadata``; the public and
    private keys, the persistent NameID policy format and the login redirect
    URL are set as well.
    """
    identity_provider = {'METADATA': idp_metadata}
    overrides = (
        ('MELLON_IDENTITY_PROVIDERS', [identity_provider]),
        ('MELLON_PUBLIC_KEYS', [public_key]),
        ('MELLON_PRIVATE_KEYS', [sp_private_key]),
        ('MELLON_NAME_ID_POLICY_FORMAT', lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT),
        ('LOGIN_REDIRECT_URL', '/'),
    )
    for setting_name, setting_value in overrides:
        setattr(private_settings, setting_name, setting_value)
    return private_settings
@fixture
def sp_metadata(sp_settings, rf):
    """Return the metadata built by ``create_metadata`` for a GET request on ``/``.

    ``sp_settings`` is requested only so the settings are in place first.
    """
    return create_metadata(rf.get('/'))
class MockIdp(object):
    """In-process identity provider built on lasso, used by the tests below.

    It processes the AuthnRequest found in a redirect URL, builds the answer
    (POST or artifact profile) and can resolve a previously issued artifact
    through an httmock handler.
    """

    def __init__(self, idp_metadata, private_key, sp_metadata):
        """Create the lasso server and register ``sp_metadata`` with the SP role."""
        self.server = lasso.Server.newFromBuffers(idp_metadata, private_key)
        self.server.addProviderFromBuffer(lasso.PROVIDER_ROLE_SP, sp_metadata)

    def process_authn_request_redirect(self, url, auth_result=True, consent=True, msg=None):
        """Process the AuthnRequest carried by ``url`` and build the answer.

        The decoded request is stored in ``self.request``.  ``auth_result``
        and ``consent`` are handed to ``validateRequestMsg``; when
        ``auth_result`` is false and ``msg`` is set, ``msg`` becomes the
        status message of the response.  For the artifact profile the
        artifact and its message are remembered on the instance.

        Returns the tuple ``(msgUrl, msgBody, msgRelayState)`` of the lasso
        login object.  Raises ``NotImplementedError`` for any protocol
        profile other than artifact or POST.
        """
        login = lasso.Login(self.server)
        login.processAuthnRequestMsg(url.split('?', 1)[1])

        # Keep the decoded request so tests can inspect it.
        #
        # About the -15 passed to zlib.decompress, see
        # https://docs.python.org/2/library/zlib.html#zlib.decompress
        #
        #  * -8 to -15: uses the absolute value of wbits as the window size
        #    logarithm. The input must be a raw stream with no header or
        #    trailer.
        #
        # i.e. raw Deflate instead of GZIP (same stream, no header, no trailer).
        query = urlparse.urlparse(url).query
        encoded_request = urlparse.parse_qs(query)['SAMLRequest'][0]
        self.request = zlib.decompress(base64.b64decode(encoded_request), -15)

        try:
            login.validateRequestMsg(auth_result, consent)
        except lasso.LoginRequestDeniedError:
            # Denied request: no assertion is built, the response is still sent.
            pass
        else:
            login.buildAssertion(
                lasso.SAML_AUTHENTICATION_METHOD_PASSWORD,
                "FIXME",
                "FIXME",
                "FIXME",
                "FIXME")

        if not auth_result and msg:
            login.response.status.statusMessage = msg

        profile = login.protocolProfile
        if profile == lasso.LOGIN_PROTOCOL_PROFILE_BRWS_ART:
            login.buildArtifactMsg(lasso.HTTP_METHOD_ARTIFACT_GET)
            # Remember the artifact so resolve_artifact() can answer for it.
            self.artifact = login.artifact
            self.artifact_message = login.artifactMessage
        elif profile == lasso.LOGIN_PROTOCOL_PROFILE_BRWS_POST:
            login.buildAuthnResponseMsg()
        else:
            raise NotImplementedError

        return login.msgUrl, login.msgBody, login.msgRelayState

    def resolve_artifact(self, soap_message):
        """Answer an artifact resolution request and return the response body.

        When the requested artifact equals the one remembered on the
        instance, its message is attached to the response and the artifact
        is forgotten; otherwise the response is built without it.
        """
        login = lasso.Login(self.server)
        login.processRequestMsg(soap_message)

        artifact_is_known = hasattr(self, 'artifact') and self.artifact == login.artifact
        if artifact_is_known:
            login.artifactMessage = self.artifact_message
            # An artifact is served only once: drop it after use.
            del self.artifact
            del self.artifact_message

        login.buildResponseMsg()
        return login.msgBody

    def mock_artifact_resolver(self):
        """Return an httmock handler answering every request via ``resolve_artifact``."""
        @all_requests
        def resolver(url, request):
            soap_answer = self.resolve_artifact(request.body)
            return mock_response(
                200,
                content=soap_answer,
                headers={'Content-Type': 'application/soap+xml'})

        return resolver
@fixture
def idp(sp_settings, idp_metadata, idp_private_key, sp_metadata):
    """Return a ``MockIdp`` built from the IdP metadata/key and the SP metadata."""
    return MockIdp(
        idp_metadata=idp_metadata,
        private_key=idp_private_key,
        sp_metadata=sp_metadata)
def test_sso_slo(db, app, idp, caplog, sp_settings):
    """Login with ``?next=/whatever/`` over POST ends with a redirect to that path.

    Also checks that a relay state is produced and that the request does not
    contain ``eo:next_url``.
    """
    login_path = reverse('mellon_login')

    redirect = app.get(login_path + '?next=/whatever/')
    answer_url, saml_response, relay = idp.process_authn_request_redirect(redirect['Location'])
    assert relay
    assert 'eo:next_url' not in str(idp.request)
    assert answer_url.endswith(login_path)

    form = {'SAMLResponse': saml_response, 'RelayState': relay}
    final = app.post(login_path, params=form)
    assert 'created new user' in caplog.text
    assert 'logged in using SAML' in caplog.text
    target = urlparse.urlparse(final['Location'])
    assert target.path == '/whatever/'
def test_sso(db, app, idp, caplog, sp_settings):
    """Login without ``next`` over POST redirects to ``LOGIN_REDIRECT_URL``.

    No relay state is expected in that case.
    """
    login_path = reverse('mellon_login')

    redirect = app.get(login_path)
    answer_url, saml_response, relay = idp.process_authn_request_redirect(redirect['Location'])
    assert not relay
    assert answer_url.endswith(login_path)

    form = {'SAMLResponse': saml_response, 'RelayState': relay}
    final = app.post(login_path, params=form)
    assert 'created new user' in caplog.text
    assert 'logged in using SAML' in caplog.text
    target = urlparse.urlparse(final['Location'])
    assert target.path == sp_settings.LOGIN_REDIRECT_URL
def test_sso_request_denied(db, app, idp, caplog, sp_settings):
    """A login denied by the IdP over POST is logged with its status codes."""
    response = app.get(reverse('mellon_login'))
    url, body, relay_state = idp.process_authn_request_redirect(
        response['Location'],
        auth_result=False,
        msg=u'User is not allowed to login')
    assert not relay_state
    assert url.endswith(reverse('mellon_login'))
    response = app.post(
        reverse('mellon_login'),
        params={'SAMLResponse': body, 'RelayState': relay_state})

    # The expected text embeds the repr of the list of status codes.  Building
    # it with %r yields the u'' prefixes on Python 2 and none on Python 3, and
    # avoids a backslash-continued literal whose content would depend on the
    # indentation of its continuation line.
    status_codes = [
        u'urn:oasis:names:tc:SAML:2.0:status:Responder',
        u'urn:oasis:names:tc:SAML:2.0:status:RequestDenied',
    ]
    expected = 'status is not success codes: %r' % (status_codes,)
    assert expected in caplog.text
def test_sso_request_denied_artifact(db, app, caplog, sp_settings, idp_metadata, idp_private_key, rf):
    """A login denied by the IdP over the artifact binding is logged and shown.

    The status codes must appear in the log and the IdP status message in
    the response.
    """
    # The binding must be set before the SP metadata is generated.
    sp_settings.MELLON_DEFAULT_ASSERTION_CONSUMER_BINDING = 'artifact'
    request = rf.get('/')
    sp_metadata = create_metadata(request)
    idp = MockIdp(idp_metadata, idp_private_key, sp_metadata)
    response = app.get(reverse('mellon_login'))
    url, body, relay_state = idp.process_authn_request_redirect(
        response['Location'],
        auth_result=False,
        msg=u'User is not allowed to login')
    assert not relay_state
    assert body is None
    assert reverse('mellon_login') in url
    assert 'SAMLart' in url
    acs_artifact_url = url.split('testserver', 1)[1]
    with HTTMock(idp.mock_artifact_resolver()):
        response = app.get(acs_artifact_url, params={'RelayState': relay_state})

    # The expected text embeds the repr of the list of status codes.  Building
    # it with %r yields the u'' prefixes on Python 2 and none on Python 3
    # (consistent with test_sso_request_denied's per-version expectations), and
    # avoids a backslash-continued literal whose content would depend on the
    # indentation of its continuation line.
    status_codes = [
        u'urn:oasis:names:tc:SAML:2.0:status:Responder',
        u'urn:oasis:names:tc:SAML:2.0:status:RequestDenied',
    ]
    expected = 'status is not success codes: %r' % (status_codes,)
    assert expected in caplog.text
    assert 'User is not allowed to login' in response
def test_sso_artifact(db, app, caplog, sp_settings, idp_metadata, idp_private_key, rf):
    """Artifact-binding login, replay of a consumed artifact, then a new login.

    The replay must log 'dead artifact' and redirect to the login view; the
    login that follows must succeed and redirect to ``/whatever/`` again.
    """
    # The binding must be set before the SP metadata is generated.
    sp_settings.MELLON_DEFAULT_ASSERTION_CONSUMER_BINDING = 'artifact'
    idp = MockIdp(idp_metadata, idp_private_key, create_metadata(rf.get('/')))
    login_path = reverse('mellon_login')

    def consume(acs_url, relay):
        # Call the assertion consumer with the artifact resolver mocked.
        with HTTMock(idp.mock_artifact_resolver()):
            return app.get(acs_url, params={'RelayState': relay})

    # First login.
    redirect = app.get(login_path + '?next=/whatever/')
    answer_url, answer_body, relay = idp.process_authn_request_redirect(redirect['Location'])
    assert relay
    assert answer_body is None
    assert login_path in answer_url
    assert 'SAMLart' in answer_url
    acs_url = answer_url.split('testserver', 1)[1]
    result = consume(acs_url, relay)
    assert 'created new user' in caplog.text
    assert 'logged in using SAML' in caplog.text
    assert urlparse.urlparse(result['Location']).path == '/whatever/'

    # force delog, but keep session information for relaystate handling
    assert app.session
    del app.session['_auth_user_id']
    assert 'dead artifact' not in caplog.text
    result = consume(acs_url, relay)
    # verify retry login was asked
    assert 'dead artifact' in caplog.text
    assert urlparse.urlparse(result['Location']).path == login_path

    # Follow the retry and log in again.
    redirect = result.follow()
    answer_url, answer_body, relay = idp.process_authn_request_redirect(redirect['Location'])
    assert relay
    reset_caplog(caplog)
    # verify caplog has been cleaned
    assert 'created new user' not in caplog.text
    assert answer_body is None
    assert login_path in answer_url
    assert 'SAMLart' in answer_url
    acs_url = answer_url.split('testserver', 1)[1]
    result = consume(acs_url, relay)
    assert 'created new user' in caplog.text
    assert 'logged in using SAML' in caplog.text
    assert urlparse.urlparse(result['Location']).path == '/whatever/'
def test_sso_slo_pass_next_url(db, app, idp, caplog, sp_settings):
    """With the next-URL extension enabled, the request contains ``eo:next_url``.

    The login itself must still succeed and redirect to ``/whatever/``.
    """
    sp_settings.MELLON_ADD_AUTHNREQUEST_NEXT_URL_EXTENSION = True
    login_path = reverse('mellon_login')

    redirect = app.get(login_path + '?next=/whatever/')
    answer_url, saml_response, relay = idp.process_authn_request_redirect(redirect['Location'])
    assert 'eo:next_url' in str(idp.request)
    assert answer_url.endswith(login_path)

    form = {'SAMLResponse': saml_response, 'RelayState': relay}
    final = app.post(login_path, params=form)
    assert 'created new user' in caplog.text
    assert 'logged in using SAML' in caplog.text
    assert final['Location'].endswith('/whatever/')
def test_sso_artifact_no_loop(db, app, caplog, sp_settings, idp_metadata, idp_private_key, rf):
    """Two failed artifact resolutions in a row must not loop on the login view.

    The first failure sets the ``MELLON_RETRY_LOGIN`` cookie and redirects to
    the login view; the second clears the cookie and returns a page (no
    redirect) that contains the return URL.
    """
    # The binding must be set before the SP metadata is generated.
    sp_settings.MELLON_DEFAULT_ASSERTION_CONSUMER_BINDING = 'artifact'
    idp = MockIdp(idp_metadata, idp_private_key, create_metadata(rf.get('/')))
    login_path = reverse('mellon_login')

    def failed_login():
        # Run a login whose artifact the IdP no longer recognises.
        redirect = app.get(login_path)
        answer_url, answer_body, _relay = idp.process_authn_request_redirect(
            redirect['Location'])
        assert answer_body is None
        assert login_path in answer_url
        assert 'SAMLart' in answer_url
        acs_url = answer_url.split('testserver', 1)[1]
        # forget the artifact
        idp.artifact = ''
        with HTTMock(idp.mock_artifact_resolver()):
            return app.get(acs_url)

    first = failed_login()
    assert 'MELLON_RETRY_LOGIN=1;' in first['Set-Cookie']
    # first error, we retry
    assert urlparse.urlparse(first['Location']).path == login_path
    # check we are not logged
    assert not app.session

    # redo
    second = failed_login()
    # check cookie is deleted after failed retry
    # Py3-Dj111 variation
    assert re.match(r'.*MELLON_RETRY_LOGIN=("")?;', second['Set-Cookie'])
    assert 'Location' not in second
    # check we are still not logged
    assert not app.session
    # check return url is in page
    assert '"%s"' % sp_settings.LOGIN_REDIRECT_URL in second.text
def test_sso_slo_pass_login_hints_always_backoffice(db, app, idp, caplog, sp_settings):
    """With the ``always_backoffice`` hint, the request carries one ``backoffice`` login hint."""
    sp_settings.MELLON_LOGIN_HINTS = ['always_backoffice']
    hint_path = './/{https://www.entrouvert.com/}login-hint'

    redirect = app.get(reverse('mellon_login') + '?next=/whatever/')
    idp.process_authn_request_redirect(redirect['Location'])

    hints = ET.fromstring(idp.request).findall(hint_path)
    assert len(hints) == 1, 'missing login hint'
    assert hints[0].text == 'backoffice', 'login hint is not backoffice'
def test_sso_slo_pass_login_hints_backoffice(db, app, idp, caplog, sp_settings):
    """With the ``backoffice`` hint, only some next URLs produce a login hint.

    ``/whatever/`` yields no hint; ``/manage/``, ``/admin/`` and ``/manager/``
    each yield exactly one ``backoffice`` hint.
    """
    sp_settings.MELLON_LOGIN_HINTS = ['backoffice']
    login_path = reverse('mellon_login')
    hint_path = './/{https://www.entrouvert.com/}login-hint'

    def hints_for(login_url):
        # Start a login on login_url and return the hints found in the request.
        redirect = app.get(login_url)
        idp.process_authn_request_redirect(redirect['Location'])
        return ET.fromstring(idp.request).findall(hint_path)

    assert len(hints_for(login_path + '?next=/whatever/')) == 0

    for next_url in ('/manage/', '/admin/', '/manager/'):
        hints = hints_for(login_path + '?next=%s' % next_url)
        assert len(hints) == 1, 'missing login hint'
        assert hints[0].text == 'backoffice', 'login hint is not backoffice'