django-mellon/tests/test_sso_slo.py

319 lines
13 KiB
Python
Raw Normal View History

import re
import base64
import zlib
import xml.etree.ElementTree as ET
import lasso
from pytest import fixture
from django.core.urlresolvers import reverse
2018-03-27 10:20:44 +02:00
from django.utils import six
from django.utils.six.moves.urllib import parse as urlparse
from mellon.utils import create_metadata
from httmock import all_requests, HTTMock, response as mock_response
from utils import reset_caplog
@fixture
def idp_metadata():
return open('tests/metadata.xml').read()
@fixture
def idp_private_key():
return open('tests/idp-private-key.pem').read()
@fixture
def sp_private_key():
return open('tests/sp-private-key.pem').read()
@fixture
def public_key():
return open('tests/public-key.pem').read()
@fixture
def sp_settings(private_settings, idp_metadata, sp_private_key, public_key):
private_settings.MELLON_IDENTITY_PROVIDERS = [{
'METADATA': idp_metadata,
}]
private_settings.MELLON_PUBLIC_KEYS = [public_key]
private_settings.MELLON_PRIVATE_KEYS = [sp_private_key]
private_settings.MELLON_NAME_ID_POLICY_FORMAT = lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
private_settings.LOGIN_REDIRECT_URL = '/'
return private_settings
@fixture
def sp_metadata(sp_settings, rf):
request = rf.get('/')
return create_metadata(request)
class MockIdp(object):
def __init__(self, idp_metadata, private_key, sp_metadata):
self.server = server = lasso.Server.newFromBuffers(idp_metadata, private_key)
server.addProviderFromBuffer(lasso.PROVIDER_ROLE_SP, sp_metadata)
def process_authn_request_redirect(self, url, auth_result=True, consent=True, msg=None):
login = lasso.Login(self.server)
login.processAuthnRequestMsg(url.split('?', 1)[1])
# See
# https://docs.python.org/2/library/zlib.html#zlib.decompress
# for the -15 magic value.
#
# * -8 to -15: Uses the absolute value of wbits as the window size
# logarithm. The input must be a raw stream with no header or trailer.
#
# it means Deflate instead of GZIP (same stream no header, no trailer)
self.request = zlib.decompress(
base64.b64decode(
urlparse.parse_qs(
urlparse.urlparse(url).query)['SAMLRequest'][0]), -15)
try:
login.validateRequestMsg(auth_result, consent)
except lasso.LoginRequestDeniedError:
pass
else:
login.buildAssertion(lasso.SAML_AUTHENTICATION_METHOD_PASSWORD,
"FIXME",
"FIXME",
"FIXME",
"FIXME")
if not auth_result and msg:
login.response.status.statusMessage = msg
if login.protocolProfile == lasso.LOGIN_PROTOCOL_PROFILE_BRWS_ART:
login.buildArtifactMsg(lasso.HTTP_METHOD_ARTIFACT_GET)
self.artifact = login.artifact
self.artifact_message = login.artifactMessage
elif login.protocolProfile == lasso.LOGIN_PROTOCOL_PROFILE_BRWS_POST:
login.buildAuthnResponseMsg()
else:
raise NotImplementedError
return login.msgUrl, login.msgBody, login.msgRelayState
def resolve_artifact(self, soap_message):
login = lasso.Login(self.server)
login.processRequestMsg(soap_message)
if hasattr(self, 'artifact') and self.artifact == login.artifact:
# artifact is known, go on !
login.artifactMessage = self.artifact_message
# forget the artifact
del self.artifact
del self.artifact_message
login.buildResponseMsg()
return login.msgBody
def mock_artifact_resolver(self):
@all_requests
def f(url, request):
content = self.resolve_artifact(request.body)
return mock_response(200, content=content,
headers={'Content-Type': 'application/soap+xml'})
return f
@fixture
def idp(sp_settings, idp_metadata, idp_private_key, sp_metadata):
return MockIdp(idp_metadata, idp_private_key, sp_metadata)
def test_sso_slo(db, app, idp, caplog, sp_settings):
response = app.get(reverse('mellon_login') + '?next=/whatever/')
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
assert relay_state
assert 'eo:next_url' not in str(idp.request)
assert url.endswith(reverse('mellon_login'))
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
assert 'created new user' in caplog.text
assert 'logged in using SAML' in caplog.text
assert urlparse.urlparse(response['Location']).path == '/whatever/'
def test_sso(db, app, idp, caplog, sp_settings):
response = app.get(reverse('mellon_login'))
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
assert not relay_state
assert url.endswith(reverse('mellon_login'))
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
assert 'created new user' in caplog.text
assert 'logged in using SAML' in caplog.text
assert urlparse.urlparse(response['Location']).path == sp_settings.LOGIN_REDIRECT_URL
def test_sso_request_denied(db, app, idp, caplog, sp_settings):
response = app.get(reverse('mellon_login'))
url, body, relay_state = idp.process_authn_request_redirect(
response['Location'],
auth_result=False,
msg=u'User is not allowed to login')
assert not relay_state
assert url.endswith(reverse('mellon_login'))
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
2018-03-27 10:20:44 +02:00
if six.PY3:
assert "status is not success codes: ['urn:oasis:names:tc:SAML:2.0:status:Responder',\
'urn:oasis:names:tc:SAML:2.0:status:RequestDenied']" in caplog.text
else:
assert "status is not success codes: [u'urn:oasis:names:tc:SAML:2.0:status:Responder',\
u'urn:oasis:names:tc:SAML:2.0:status:RequestDenied']" in caplog.text
def test_sso_request_denied_artifact(db, app, caplog, sp_settings, idp_metadata, idp_private_key, rf):
sp_settings.MELLON_DEFAULT_ASSERTION_CONSUMER_BINDING = 'artifact'
request = rf.get('/')
sp_metadata = create_metadata(request)
idp = MockIdp(idp_metadata, idp_private_key, sp_metadata)
response = app.get(reverse('mellon_login'))
url, body, relay_state = idp.process_authn_request_redirect(
response['Location'],
auth_result=False,
msg=u'User is not allowed to login')
assert not relay_state
assert body is None
assert reverse('mellon_login') in url
assert 'SAMLart' in url
acs_artifact_url = url.split('testserver', 1)[1]
with HTTMock(idp.mock_artifact_resolver()):
response = app.get(acs_artifact_url, params={'RelayState': relay_state})
assert "status is not success codes: ['urn:oasis:names:tc:SAML:2.0:status:Responder',\
'urn:oasis:names:tc:SAML:2.0:status:RequestDenied']" in caplog.text
assert 'User is not allowed to login' in response
def test_sso_artifact(db, app, caplog, sp_settings, idp_metadata, idp_private_key, rf):
sp_settings.MELLON_DEFAULT_ASSERTION_CONSUMER_BINDING = 'artifact'
request = rf.get('/')
sp_metadata = create_metadata(request)
idp = MockIdp(idp_metadata, idp_private_key, sp_metadata)
response = app.get(reverse('mellon_login') + '?next=/whatever/')
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
assert relay_state
assert body is None
assert reverse('mellon_login') in url
assert 'SAMLart' in url
acs_artifact_url = url.split('testserver', 1)[1]
with HTTMock(idp.mock_artifact_resolver()):
response = app.get(acs_artifact_url, params={'RelayState': relay_state})
assert 'created new user' in caplog.text
assert 'logged in using SAML' in caplog.text
assert urlparse.urlparse(response['Location']).path == '/whatever/'
# force delog, but keep session information for relaystate handling
assert app.session
del app.session['_auth_user_id']
assert 'dead artifact' not in caplog.text
with HTTMock(idp.mock_artifact_resolver()):
response = app.get(acs_artifact_url, params={'RelayState': relay_state})
# verify retry login was asked
assert 'dead artifact' in caplog.text
assert urlparse.urlparse(response['Location']).path == reverse('mellon_login')
response = response.follow()
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
assert relay_state
reset_caplog(caplog)
# verify caplog has been cleaned
assert 'created new user' not in caplog.text
assert body is None
assert reverse('mellon_login') in url
assert 'SAMLart' in url
acs_artifact_url = url.split('testserver', 1)[1]
with HTTMock(idp.mock_artifact_resolver()):
response = app.get(acs_artifact_url, params={'RelayState': relay_state})
assert 'created new user' in caplog.text
assert 'logged in using SAML' in caplog.text
assert urlparse.urlparse(response['Location']).path == '/whatever/'
def test_sso_slo_pass_next_url(db, app, idp, caplog, sp_settings):
sp_settings.MELLON_ADD_AUTHNREQUEST_NEXT_URL_EXTENSION = True
response = app.get(reverse('mellon_login') + '?next=/whatever/')
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
assert 'eo:next_url' in str(idp.request)
assert url.endswith(reverse('mellon_login'))
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
assert 'created new user' in caplog.text
assert 'logged in using SAML' in caplog.text
assert response['Location'].endswith('/whatever/')
def test_sso_artifact_no_loop(db, app, caplog, sp_settings, idp_metadata, idp_private_key, rf):
sp_settings.MELLON_DEFAULT_ASSERTION_CONSUMER_BINDING = 'artifact'
request = rf.get('/')
sp_metadata = create_metadata(request)
idp = MockIdp(idp_metadata, idp_private_key, sp_metadata)
response = app.get(reverse('mellon_login'))
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
assert body is None
assert reverse('mellon_login') in url
assert 'SAMLart' in url
acs_artifact_url = url.split('testserver', 1)[1]
# forget the artifact
idp.artifact = ''
with HTTMock(idp.mock_artifact_resolver()):
response = app.get(acs_artifact_url)
assert 'MELLON_RETRY_LOGIN=1;' in response['Set-Cookie']
# first error, we retry
assert urlparse.urlparse(response['Location']).path == reverse('mellon_login')
# check we are not logged
assert not app.session
# redo
response = app.get(reverse('mellon_login'))
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
assert body is None
assert reverse('mellon_login') in url
assert 'SAMLart' in url
acs_artifact_url = url.split('testserver', 1)[1]
# forget the artifact
idp.artifact = ''
with HTTMock(idp.mock_artifact_resolver()):
response = app.get(acs_artifact_url)
# check cookie is deleted after failed retry
# Py3-Dj111 variation
assert re.match(r'.*MELLON_RETRY_LOGIN=("")?;', response['Set-Cookie'])
assert 'Location' not in response
# check we are still not logged
assert not app.session
# check return url is in page
assert '"%s"' % sp_settings.LOGIN_REDIRECT_URL in response.text
def test_sso_slo_pass_login_hints_always_backoffice(db, app, idp, caplog, sp_settings):
sp_settings.MELLON_LOGIN_HINTS = ['always_backoffice']
response = app.get(reverse('mellon_login') + '?next=/whatever/')
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
root = ET.fromstring(idp.request)
login_hints = root.findall('.//{https://www.entrouvert.com/}login-hint')
assert len(login_hints) == 1, 'missing login hint'
assert login_hints[0].text == 'backoffice', 'login hint is not backoffice'
def test_sso_slo_pass_login_hints_backoffice(db, app, idp, caplog, sp_settings):
sp_settings.MELLON_LOGIN_HINTS = ['backoffice']
response = app.get(reverse('mellon_login') + '?next=/whatever/')
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
root = ET.fromstring(idp.request)
login_hints = root.findall('.//{https://www.entrouvert.com/}login-hint')
assert len(login_hints) == 0
for next_url in ['/manage/', '/admin/', '/manager/']:
response = app.get(reverse('mellon_login') + '?next=%s' % next_url)
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
root = ET.fromstring(idp.request)
login_hints = root.findall('.//{https://www.entrouvert.com/}login-hint')
assert len(login_hints) == 1, 'missing login hint'
assert login_hints[0].text == 'backoffice', 'login hint is not backoffice'