wcs/tests/test_saml_auth.py

260 lines
9.9 KiB
Python

import datetime
import os
import sys
import shutil
try:
import lasso
except ImportError:
lasso = None
import pytest
from quixote import cleanup
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.saml2 import Saml2Directory
from wcs.qommon.ident.idp import MethodAdminDirectory, AdminIDPDir
from wcs.qommon import sessions, x509utils
from utilities import get_app, create_temporary_pub
pytestmark = pytest.mark.skipif('lasso is None')
IDP_METADATA = """<?xml version="1.0"?>
<ns0:EntityDescriptor xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:ns1="http://www.w3.org/2000/09/xmldsig#" entityID="http://sso.example.net/saml2/metadata">
<ns0:IDPSSODescriptor protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
<ns0:ArtifactResolutionService Binding="urn:oasis:names:tc:SAML:2.0:bindings:SOAP" Location="http://sso.example.net/saml2/artifact" index="0"/>
<ns0:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://sso.example.net/saml2/slo" ResponseLocation="http://sso.example.net/saml2/slo_return"/>
<ns0:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://sso.example.net/saml2/slo" ResponseLocation="http://sso.example.net/saml2/slo_return"/>
<ns0:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:SOAP" Location="http://sso.example.net/saml2/slo/soap"/>
<ns0:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://sso.example.net/saml2/sso"/>
<ns0:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://sso.example.net/saml2/sso"/>
</ns0:IDPSSODescriptor>
</ns0:EntityDescriptor>"""
def setup_module(module):
cleanup()
global pub
pub = create_temporary_pub()
def setup_environment(pub, idp_number=1):
pub.cfg = {}
pub.cfg['sp'] = {
'saml2_metadata': 'saml2-metadata.xml',
'saml2_base_url': 'http://example.net/saml',
'saml2_providerid': 'http://example.net/saml/metadata'
}
MethodAdminDirectory().generate_rsa_keypair()
pub.cfg['idp'] = {}
for i in range(idp_number):
# generate a pair of keys for the mocking idp server
idp_publickey, idp_privatekey = x509utils.generate_rsa_keypair()
metadata = IDP_METADATA
if i == 0:
base_id = 'http-sso.example.net-saml2-metadata'
else:
base_id = 'http-sso%s.example.net-saml2-metadata' % i
metadata = IDP_METADATA.replace('sso.example.net',
'sso%d.example.net' % i)
pub.cfg['idp'][base_id] = {
'metadata': 'idp-%s-metadata.xml' % base_id,
'publickey': 'idp-%s-publickey.pem' % base_id,
'role': lasso.PROVIDER_ROLE_IDP,
}
filename = pub.cfg['idp'][base_id]['metadata']
fd = file(os.path.join(pub.app_dir, filename), 'w')
fd.write(metadata)
fd.close()
filename = pub.cfg['idp'][base_id]['publickey']
fd = file(os.path.join(pub.app_dir, filename), 'w')
fd.write(idp_publickey)
fd.close()
filename = pub.cfg['idp'][base_id]['publickey'].replace('public', 'private')
fd = file(os.path.join(pub.app_dir, filename), 'w')
fd.write(idp_privatekey)
fd.close()
pub.write_cfg()
def teardown_module(module):
shutil.rmtree(pub.APP_DIR)
def test_login():
setup_environment(pub)
req = HTTPRequest(None, {
'SERVER_NAME': 'example.net',
'SCRIPT_NAME': '',
})
pub._set_request(req)
saml2 = Saml2Directory()
saml2.perform_login()
assert req.response.status_code == 302
assert req.response.headers['location'].startswith('http://sso.example.net/saml2/sso?SAMLRequest')
def get_authn_response_msg():
idp_metadata_filepath = os.path.join(pub.app_dir,
'idp-http-sso.example.net-saml2-metadata-metadata.xml')
idp_key_filepath = os.path.join(pub.app_dir,
'idp-http-sso.example.net-saml2-metadata-privatekey.pem')
idp = lasso.Server(idp_metadata_filepath, idp_key_filepath, None, None)
idp.addProvider(lasso.PROVIDER_ROLE_SP,
os.path.join(pub.app_dir, 'saml2-metadata.xml'),
os.path.join(pub.app_dir, 'public-key.pem'))
login = lasso.Login(idp)
login.initIdpInitiatedAuthnRequest(pub.cfg['sp']['saml2_providerid'])
login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
login.request.nameIDPolicy.allowCreate = True
login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_POST
login.processAuthnRequestMsg(None)
login.validateRequestMsg(True, True)
login.buildAssertion(lasso.SAML2_AUTHN_CONTEXT_PASSWORD,
datetime.datetime.now().isoformat(),
'unused',
(datetime.datetime.now() - datetime.timedelta(3600)).isoformat(),
(datetime.datetime.now() + datetime.timedelta(3600)).isoformat())
login.buildAuthnResponseMsg()
return login.msgBody
def get_assertion_consumer_request():
req = HTTPRequest(None, {
'SERVER_NAME': 'example.net',
'SCRIPT_NAME': '',
'PATH_INFO': '/saml/assertionConsumerPost',
})
pub._set_request(req)
sessions.Session.wipe()
req.session = sessions.Session(id=1)
assert req.session.user is None
req.form['SAMLResponse'] = get_authn_response_msg()
return req
def test_saml_metadata():
setup_environment(pub)
req = HTTPRequest(None, {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': '', })
pub._set_request(req)
saml2 = Saml2Directory()
body = saml2.metadata()
assert '<EntityDescriptor' in body
assert req.response.content_type == 'text/xml'
def test_saml_public_key():
setup_environment(pub)
req = HTTPRequest(None, {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': '', })
pub._set_request(req)
saml2 = Saml2Directory()
body = saml2.public_key()
assert body.startswith('-----BEGIN PUBLIC KEY-----')
assert req.response.content_type == 'application/octet-stream'
def test_assertion_consumer():
setup_environment(pub)
req = get_assertion_consumer_request()
saml2 = Saml2Directory()
body = saml2.assertionConsumerPost()
assert req.response.status_code == 303
assert req.response.headers['location'] == 'http://example.net/'
assert req.session.user is not None
def test_assertion_consumer_existing_federation():
# 1st pass to generate a name id
setup_environment(pub)
req = get_assertion_consumer_request()
saml2 = Saml2Directory()
saml_response_body = req.form['SAMLResponse']
body = saml2.assertionConsumerPost()
# create user matching the name identifier
user = pub.user_class(name='foobar')
user.id = 17
user.name_identifiers = [req.session.name_identifier]
user.store()
req = HTTPRequest(None, {
'SERVER_NAME': 'example.net',
'SCRIPT_NAME': '',
'PATH_INFO': '/saml/assertionConsumerPost',
})
pub._set_request(req)
req.session = sessions.Session(id=2) # another session
req.form['SAMLResponse'] = saml_response_body
assert req.session.user is None
# replay the response, this will give an assertion replay error
saml2 = Saml2Directory()
body = saml2.assertionConsumerPost()
assert 'Assertion replay' in str(body)
# wipe knowledge of past assertions
shutil.rmtree(os.path.join(pub.app_dir, 'assertions'))
saml2 = Saml2Directory()
assert req.session.user is None
body = saml2.assertionConsumerPost()
assert req.session.user == 17
def test_assertion_consumer_redirect_after_url():
setup_environment(pub)
req = get_assertion_consumer_request()
req.session.after_url = '/foobar'
saml2 = Saml2Directory()
saml_response_body = req.form['SAMLResponse']
body = saml2.assertionConsumerPost()
assert req.response.headers['location'] == 'http://example.net/foobar'
def test_saml_login_page():
setup_environment(pub)
resp = get_app(pub).get('/login/')
assert resp.status_int == 302
assert resp.location.startswith('http://sso.example.net/saml2/sso?SAMLRequest=')
def test_saml_login_page_several_idp():
setup_environment(pub, idp_number=4)
# even if there are multiple IdP, /login/ will initiate SSO with the first
# one.
resp = get_app(pub).get('/login/')
assert resp.status_int == 302
assert resp.location.startswith('http://sso.example.net/saml2/sso?SAMLRequest=')
def test_saml_register():
setup_environment(pub)
get_app(pub).get('/register/', status=404)
pub.cfg['saml_identities'] = {'identity-creation': 'self'}
pub.write_cfg()
# if there's no specific registration URL, this initiates a SSO and there
# should be a registration link on the identity provider
resp = get_app(pub).get('/register/')
assert resp.location == 'http://example.net/login/'
resp = resp.follow()
assert resp.location.startswith('http://sso.example.net/saml2/sso?SAMLRequest=')
# check redirection to known registration page
pub.cfg['saml_identities'] = {'identity-creation': 'self',
'registration-url': 'http://sso.example.net/registration'}
pub.write_cfg()
resp = get_app(pub).get('/register/')
assert resp.location == 'http://sso.example.net/registration'
# check redirection to known registration page, with a variable
pub.cfg['saml_identities'] = {'identity-creation': 'self',
'registration-url': 'http://sso.example.net/registration?next_url=[next_url]'}
pub.write_cfg()
resp = get_app(pub).get('/register/')
assert resp.location == 'http://sso.example.net/registration?next_url=http%3A%2F%2Fexample.net%2Fregister%2F'
def test_saml_logout():
setup_environment(pub)
req = get_assertion_consumer_request()
saml2 = Saml2Directory()
saml2.assertionConsumerPost()
assert req.session.user is not None
body = saml2.slo_sp()
assert req.response.headers['location'].startswith('http://sso.example.net/saml2/slo?SAMLRequest=')
assert req.session.user is None