426 lines
18 KiB
Python
426 lines
18 KiB
Python
import datetime
|
|
import os
|
|
import sys
|
|
import shutil
|
|
import urlparse
|
|
|
|
try:
|
|
import lasso
|
|
except ImportError:
|
|
lasso = None
|
|
|
|
import pytest
|
|
|
|
from quixote import cleanup
|
|
from quixote import get_session, get_session_manager
|
|
from wcs.qommon.http_request import HTTPRequest
|
|
from wcs.qommon.misc import get_lasso_server
|
|
from wcs.qommon.saml2 import Saml2Directory
|
|
from wcs.qommon.ident.idp import MethodAdminDirectory, AdminIDPDir
|
|
from wcs.qommon import sessions, x509utils
|
|
from wcs.roles import Role
|
|
|
|
from utilities import get_app, create_temporary_pub, clean_temporary_pub
|
|
|
|
from test_hobo_notify import PROFILE
|
|
|
|
# Skip every test of this module when the optional lasso binding could not be
# imported (the import guard at the top of the file sets ``lasso`` to None).
pytestmark = pytest.mark.skipif(lasso is None, reason='lasso is not available')
|
|
|
|
# SAML 2.0 metadata of the mock identity provider (entity
# http://sso.example.net/saml2/metadata): artifact resolution, single logout
# and single sign-on endpoints.
IDP_METADATA = """<?xml version="1.0"?>
<ns0:EntityDescriptor xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:ns1="http://www.w3.org/2000/09/xmldsig#" entityID="http://sso.example.net/saml2/metadata">
<ns0:IDPSSODescriptor protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
<ns0:ArtifactResolutionService Binding="urn:oasis:names:tc:SAML:2.0:bindings:SOAP" Location="http://sso.example.net/saml2/artifact" index="0"/>
<ns0:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://sso.example.net/saml2/slo" ResponseLocation="http://sso.example.net/saml2/slo_return"/>
<ns0:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://sso.example.net/saml2/slo" ResponseLocation="http://sso.example.net/saml2/slo_return"/>
<ns0:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:SOAP" Location="http://sso.example.net/saml2/slo/soap"/>
<ns0:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://sso.example.net/saml2/sso"/>
<ns0:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://sso.example.net/saml2/sso"/>
</ns0:IDPSSODescriptor>
</ns0:EntityDescriptor>"""
|
|
|
|
|
|
def pytest_generate_tests(metafunc):
    """Parametrize the ``pub`` fixture over the two storage modes.

    Any test requesting ``pub`` is generated twice, with ``'pickle'`` and
    ``'sql'`` passed indirectly to the fixture as ``request.param``.  Tests
    that do not request ``pub`` are left alone.
    """
    if 'pub' not in metafunc.fixturenames:
        return
    storage_modes = ['pickle', 'sql']
    metafunc.parametrize('pub', storage_modes, indirect=True)
|
|
|
|
@pytest.fixture
def pub(request):
    """Provide a temporary publisher configured as a SAML service provider.

    ``request.param`` selects the storage mode (SQL when it equals ``'sql'``).
    The fixture sets the ``sp`` configuration section, generates the service
    provider key pair, declares one mock identity provider and leaves exactly
    one freshly stored user in the user class.
    """
    use_sql = (request.param == 'sql')
    publisher = create_temporary_pub(sql_mode=use_sql)

    if not publisher.cfg:
        publisher.cfg = {}
    sp_settings = {
        'saml2_metadata': 'saml2-metadata.xml',
        'saml2_base_url': 'http://example.net/saml',
        'saml2_providerid': 'http://example.net/saml/metadata',
    }
    publisher.cfg['sp'] = sp_settings
    MethodAdminDirectory().generate_rsa_keypair()
    setup_idps(publisher)

    # start from a single, empty user
    user_class = publisher.user_class
    user_class.wipe()
    publisher.user_class().store()
    return publisher
|
|
|
|
|
|
def setup_idps(pub, idp_number=1):
    """Declare mock identity providers in the publisher configuration.

    ``pub.cfg['idp']`` is reset, then for each provider an RSA key pair is
    generated and three files are written under ``pub.app_dir``: the
    metadata, the public key and the private key (same file name as the
    public key with ``public`` replaced by ``private``).  The first provider
    uses ``IDP_METADATA`` unchanged; provider number ``i`` (``i >= 1``) has
    the ``sso.example.net`` host replaced by ``sso<i>.example.net``.  The
    configuration is finally saved with ``pub.write_cfg()``.

    :param pub: publisher object exposing ``cfg``, ``app_dir`` and
        ``write_cfg()``.
    :param idp_number: number of identity providers to declare.
    """
    pub.cfg['idp'] = {}
    for i in range(idp_number):
        # generate a pair of keys for the mocking idp server
        idp_publickey, idp_privatekey = x509utils.generate_rsa_keypair()
        if i == 0:
            base_id = 'http-sso.example.net-saml2-metadata'
            metadata = IDP_METADATA
        else:
            base_id = 'http-sso%s.example.net-saml2-metadata' % i
            metadata = IDP_METADATA.replace('sso.example.net',
                                            'sso%d.example.net' % i)

        idp_cfg = {
            'metadata': 'idp-%s-metadata.xml' % base_id,
            'publickey': 'idp-%s-publickey.pem' % base_id,
            'role': lasso.PROVIDER_ROLE_IDP,
        }
        pub.cfg['idp'][base_id] = idp_cfg

        files_to_write = (
            (idp_cfg['metadata'], metadata),
            (idp_cfg['publickey'], idp_publickey),
            (idp_cfg['publickey'].replace('public', 'private'), idp_privatekey),
        )
        for filename, content in files_to_write:
            # the context manager closes the file even if the write fails
            with open(os.path.join(pub.app_dir, filename), 'w') as fd:
                fd.write(content)

    pub.write_cfg()
|
|
|
|
|
|
def teardown_module(module):
    """Module-level teardown hook: remove the temporary publisher.

    ``module`` is supplied by pytest and is not used.
    """
    clean_temporary_pub()
|
|
|
|
def test_login(pub):
    """perform_login() answers with a 302 redirect to the IdP SSO endpoint."""
    environ = {
        'SERVER_NAME': 'example.net',
        'SCRIPT_NAME': '',
    }
    req = HTTPRequest(None, environ)
    pub._set_request(req)

    Saml2Directory().perform_login()

    assert req.response.status_code == 302
    target = req.response.headers['location']
    assert target.startswith('http://sso.example.net/saml2/sso?SAMLRequest')
|
|
|
|
def get_authn_response_msg(pub, ni_format=None):
    """Build the body of an authentication response issued by the mock IdP.

    A lasso IdP server is created from the metadata and private key files
    written by ``setup_idps()``, an IdP-initiated login is started for the
    service provider and an assertion is built carrying:

    * ``first_name``, ``last_name``, ``email`` and ``birthdate`` attributes;
    * three ``verified_attributes`` values (first_name, last_name, email);
    * a ``role-slug`` attribute with the two values ``foo`` and ``bar``.

    :param pub: publisher object exposing ``app_dir`` and ``cfg``.
    :param ni_format: name identifier format requested in the NameIDPolicy;
        ``None`` (the default) means
        ``lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT``.  With the
        unspecified format the name identifier content is forced to
        ``'1234'``.
    :returns: ``login.msgBody`` after ``buildAuthnResponseMsg()``.
    """
    # The default is resolved here rather than in the signature: a default
    # expression is evaluated at import time and would raise AttributeError
    # when lasso is missing, before the module-level skip marker can apply.
    if ni_format is None:
        ni_format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT

    idp_metadata_filepath = os.path.join(
        pub.app_dir, 'idp-http-sso.example.net-saml2-metadata-metadata.xml')
    idp_key_filepath = os.path.join(
        pub.app_dir, 'idp-http-sso.example.net-saml2-metadata-privatekey.pem')
    idp = lasso.Server(idp_metadata_filepath, idp_key_filepath, None, None)
    idp.addProvider(lasso.PROVIDER_ROLE_SP,
                    os.path.join(pub.app_dir, 'saml2-metadata.xml'),
                    os.path.join(pub.app_dir, 'public-key.pem'))

    login = lasso.Login(idp)
    login.initIdpInitiatedAuthnRequest(pub.cfg['sp']['saml2_providerid'])
    login.request.nameIDPolicy.format = ni_format
    login.request.nameIDPolicy.allowCreate = True
    login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_POST
    login.processAuthnRequestMsg(None)
    login.validateRequestMsg(True, True)

    now = datetime.datetime.now()
    # The first positional argument of timedelta is a number of days: the
    # assertion is valid from 3600 days ago until 3600 days from now.
    # NOTE(review): one hour may have been intended; the wide window is kept
    # on purpose as it is insensitive to clock or timezone differences.
    validity_margin = datetime.timedelta(days=3600)
    login.buildAssertion(lasso.SAML2_AUTHN_CONTEXT_PASSWORD,
                         now.isoformat(),
                         'unused',
                         (now - validity_margin).isoformat(),
                         (now + validity_margin).isoformat())
    if ni_format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED:
        login.assertion.subject.nameID.content = '1234'

    # (attribute name, text value) pairs, added in this order
    attribute_values = [
        ('first_name', 'John'),
        ('last_name', 'Doe'),
        ('email', 'john.doe@example.com'),
        ('birthdate', '2000-01-01'),
    ]
    for a_name in ('first_name', 'last_name', 'email'):
        attribute_values.append(('verified_attributes', a_name))
    for attribute_name, text in attribute_values:
        value = lasso.MiscTextNode.newWithString(text)
        value.textChild = True
        login.assertion.addAttributeWithNode(
            attribute_name, lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC, value)

    if not login.assertion.attributeStatement:
        login.assertion.attributeStatement = [lasso.Saml2AttributeStatement()]

    # add two roles in role-slug attribute
    role_slug_attribute = lasso.Saml2Attribute()
    role_slug_attribute.name = 'role-slug'
    role_slug_attribute.nameFormat = lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC
    role_uuids = []
    for role_uuid in ('foo', 'bar'):
        text_node = lasso.MiscTextNode.newWithString(role_uuid)
        text_node.textChild = True
        atv = lasso.Saml2AttributeValue()
        atv.any = [text_node]
        role_uuids.append(atv)
    role_slug_attribute.attributeValue = role_uuids

    # the attribute sequence is copied, extended and assigned back as a whole
    attributes = list(login.assertion.attributeStatement[0].attribute)
    attributes.append(role_slug_attribute)
    login.assertion.attributeStatement[0].attribute = attributes

    login.buildAuthnResponseMsg()
    return login.msgBody
|
|
|
|
def get_assertion_consumer_request(pub, ni_format=None):
    """Prepare a request to the assertion consumer carrying a SAML response.

    The request targets ``/saml/assertionConsumerPost`` on ``example.net``
    and is installed as the publisher's current request.  All stored
    sessions are wiped and a new anonymous session (id 1) is attached to the
    request; the ``SAMLResponse`` form field is filled with the message
    built by ``get_authn_response_msg()``.

    :param pub: publisher object.
    :param ni_format: name identifier format passed on to
        ``get_authn_response_msg()``; ``None`` (the default) means
        ``lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT``.
    :returns: the prepared ``HTTPRequest``.
    """
    # Resolved here rather than in the signature: a default expression is
    # evaluated at import time and would raise AttributeError when lasso is
    # missing, before the module-level skip marker can apply.
    if ni_format is None:
        ni_format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT

    req = HTTPRequest(None, {
        'SERVER_NAME': 'example.net',
        'SCRIPT_NAME': '',
        'PATH_INFO': '/saml/assertionConsumerPost',
    })
    pub._set_request(req)
    pub.session_class.wipe()
    req.session = pub.session_class(id=1)
    assert req.session.user is None
    req.form['SAMLResponse'] = get_authn_response_msg(pub, ni_format=ni_format)
    return req
|
|
|
|
def test_saml_metadata(pub):
    """metadata() returns an EntityDescriptor document served as text/xml."""
    environ = {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''}
    req = HTTPRequest(None, environ)
    pub._set_request(req)

    metadata_xml = Saml2Directory().metadata()
    assert '<EntityDescriptor' in metadata_xml
    assert req.response.content_type == 'text/xml'
|
|
|
|
def test_saml_public_key(pub):
    """public_key() returns a PEM public key served as an octet stream."""
    environ = {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''}
    req = HTTPRequest(None, environ)
    pub._set_request(req)

    pem = Saml2Directory().public_key()
    assert pem.startswith('-----BEGIN PUBLIC KEY-----')
    assert req.response.content_type == 'application/octet-stream'
|
|
|
|
def test_assertion_consumer(pub):
    """Posting a valid response logs the user in and redirects to the site root."""
    req = get_assertion_consumer_request(pub)
    saml2 = Saml2Directory()
    # the returned body is not inspected, only the response and the session
    saml2.assertionConsumerPost()

    assert req.response.status_code == 303
    assert req.response.headers['location'] == 'http://example.net'
    assert req.session.user is not None
|
|
|
|
def test_assertion_consumer_unspecified(pub):
    """Same as test_assertion_consumer, with the unspecified name id format."""
    req = get_assertion_consumer_request(
        pub, ni_format=lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED)
    saml2 = Saml2Directory()
    # the returned body is not inspected, only the response and the session
    saml2.assertionConsumerPost()

    assert req.response.status_code == 303
    assert req.response.headers['location'] == 'http://example.net'
    assert req.session.user is not None
|
|
|
|
def test_assertion_consumer_existing_federation(pub, caplog):
    """Log in twice with the same response, from two different sessions.

    The first pass must create one user with three verified fields, the
    birthdate and the ``foo`` role (``bar`` is unknown and ignored), and log
    both facts.  Replaying the very same response from another session must
    first fail with an assertion replay error; once the stored assertions
    are removed, it must log in the existing user and clear the pending
    session message.
    """
    # setup an hobo profile
    from wcs.ctl.check_hobos import CmdCheckHobos
    CmdCheckHobos().update_profile(PROFILE, pub)

    pub.cfg['debug'] = {'logger': True}
    pub.write_cfg()
    pub.set_config()

    Role.wipe()
    role = Role('Foo')
    role.uuid = 'foo'
    role.store()

    # 1st pass to generate a user
    pub.user_class.wipe()
    assert pub.user_class.count() == 0
    req = get_assertion_consumer_request(pub)
    saml2 = Saml2Directory()
    # kept to be replayed in the second part of the test
    saml_response_body = req.form['SAMLResponse']
    saml2.assertionConsumerPost()
    assert pub.user_class.count() == 1
    user = pub.user_class.select()[0]
    assert user.verified_fields
    assert len(user.verified_fields) == 3
    assert user.form_data['_birthdate'].tm_year == 2000
    assert user.roles == [role.id]  # bar uuid is ignored as unknown

    logged_messages = [x.message for x in caplog.records]
    assert ('enrolling user %s in Foo' % user.id) in logged_messages
    assert 'role uuid bar is unknown' in logged_messages

    req = HTTPRequest(None, {
        'SERVER_NAME': 'example.net',
        'SCRIPT_NAME': '',
        'PATH_INFO': '/saml/assertionConsumerPost',
    })
    pub._set_request(req)
    req.session = pub.session_class(id=2)  # another session
    req.session.message = ('error', 'blah')
    req.form['SAMLResponse'] = saml_response_body
    assert req.session.user is None

    # replay the response, this will give an assertion replay error
    saml2 = Saml2Directory()
    body = saml2.assertionConsumerPost()
    assert 'Assertion replay' in str(body)

    # wipe knowledge of past assertions
    shutil.rmtree(os.path.join(pub.app_dir, 'assertions'))

    saml2 = Saml2Directory()
    assert req.session.user is None
    assert req.session.message == ('error', 'blah')
    saml2.assertionConsumerPost()
    assert req.session.user == user.id
    assert req.session.saml_authn_context == lasso.SAML2_AUTHN_CONTEXT_PASSWORD
    assert req.session.message is None
|
|
|
|
def test_assertion_consumer_redirect_after_url(pub):
    """A relative RelayState is used as redirect target on the local site."""
    req = get_assertion_consumer_request(pub)
    req.form['RelayState'] = '/foobar/?test=ok'
    saml2 = Saml2Directory()
    # only the redirect response is checked, the returned body is ignored
    saml2.assertionConsumerPost()
    assert req.response.status_code == 303
    assert req.response.headers['location'] == 'http://example.net/foobar/?test=ok'
|
|
|
|
def test_assertion_consumer_full_url_redirect_after_url(pub):
    """An absolute RelayState URL is used unchanged as redirect target."""
    req = get_assertion_consumer_request(pub)
    req.form['RelayState'] = 'http://example.org/foobar/?test=ok'
    saml2 = Saml2Directory()
    # only the redirect response is checked, the returned body is ignored
    saml2.assertionConsumerPost()
    assert req.response.status_code == 303
    assert req.response.headers['location'] == 'http://example.org/foobar/?test=ok'
|
|
|
|
def test_saml_login_page(pub):
    """/login/ redirects to the IdP with an authn request not forcing authentication."""
    resp = get_app(pub).get('/login/')
    assert resp.status_int == 302
    assert resp.location.startswith('http://sso.example.net/saml2/sso?SAMLRequest=')

    # decode the request carried by the redirect URL
    query_string = urlparse.urlparse(resp.location).query
    authn_request = lasso.Samlp2AuthnRequest()
    authn_request.initFromQuery(query_string)
    assert authn_request.forceAuthn is False
|
|
|
|
def test_saml_login_page_force_authn(pub):
    """/login/?forceAuthn=true produces an authn request with forceAuthn set."""
    resp = get_app(pub).get('/login/?forceAuthn=true')
    assert resp.status_int == 302
    assert resp.location.startswith('http://sso.example.net/saml2/sso?SAMLRequest=')

    # decode the request carried by the redirect URL
    query_string = urlparse.urlparse(resp.location).query
    authn_request = lasso.Samlp2AuthnRequest()
    authn_request.initFromQuery(query_string)
    assert authn_request.forceAuthn is True
|
|
|
|
def test_saml_login_page_several_idp(pub):
    """With four identity providers /login/ still starts SSO with a single one."""
    setup_idps(pub, idp_number=4)
    # even if there are multiple IdP, /login/ will initiate SSO with the first
    # one.
    # idp are stored in a dict, so the first idp is indeterminate; the test
    # expects the one whose configuration key sorts first.
    first_idp_key = min(pub.cfg['idp'].keys())
    # keys look like http-<domain>-saml2-metadata
    first_idp_domain = first_idp_key.split('-')[1]
    expected_prefix = 'http://%s/saml2/sso?SAMLRequest=' % first_idp_domain

    resp = get_app(pub).get('/login/')
    assert resp.status_int == 302
    assert resp.location.startswith(expected_prefix)
|
|
|
|
def test_saml_backoffice_redirect(pub):
    """Anonymous access to /backoffice/ goes through /login/ then to the IdP.

    The backoffice URL must be carried both as RelayState and as a
    ``next_url`` element inside the authn request.
    """
    resp = get_app(pub).get('/backoffice/')
    assert resp.status_int == 302
    assert resp.location.startswith('http://example.net/login/?next=')

    resp = resp.follow()
    assert resp.location.startswith('http://sso.example.net/saml2/sso')
    query_string = urlparse.urlparse(resp.location).query
    params = urlparse.parse_qs(query_string)
    assert params['SAMLRequest']
    assert params['RelayState'] == ['http://example.net/backoffice/']

    authn_request = lasso.Samlp2AuthnRequest()
    authn_request.initFromQuery(query_string)
    assert ':next_url>http://example.net/backoffice/<' in authn_request.getOriginalXmlnode()
|
|
|
|
def test_saml_register(pub):
    """/register/ is a 404 until self registration is enabled, then redirects.

    Three configurations are checked in turn: no registration URL (SSO is
    started), a fixed registration URL, and a registration URL containing
    the ``[next_url]`` variable.
    """
    get_app(pub).get('/register/', status=404)

    # if there's no specific registration URL, this initiates a SSO and there
    # should be a registration link on the identity provider
    pub.cfg['saml_identities'] = {'identity-creation': 'self'}
    pub.write_cfg()
    resp = get_app(pub).get('/register/')
    assert resp.location == 'http://example.net/login/?next=http%3A%2F%2Fexample.net%2Fregister%2F'
    resp = resp.follow()
    assert resp.location.startswith('http://sso.example.net/saml2/sso?SAMLRequest=')

    # check redirection to known registration page
    pub.cfg['saml_identities'] = {
        'identity-creation': 'self',
        'registration-url': 'http://sso.example.net/registration',
    }
    pub.write_cfg()
    resp = get_app(pub).get('/register/')
    assert resp.location == 'http://sso.example.net/registration'

    # check redirection to known registration page, with a variable
    pub.cfg['saml_identities'] = {
        'identity-creation': 'self',
        'registration-url': 'http://sso.example.net/registration?next_url=[next_url]',
    }
    pub.write_cfg()
    resp = get_app(pub).get('/register/')
    assert resp.location == 'http://sso.example.net/registration?next_url=http%3A%2F%2Fexample.net%2Fregister%2F'
|
|
|
|
def test_saml_logout(pub):
    """SP-initiated logout redirects to the IdP SLO endpoint and logs the user out."""
    req = get_assertion_consumer_request(pub)
    saml2 = Saml2Directory()
    saml2.assertionConsumerPost()
    assert req.session.user is not None

    # only the redirect and the session are checked, the body is ignored
    saml2.slo_sp()
    assert req.response.headers['location'].startswith('http://sso.example.net/saml2/slo?SAMLRequest=')
    assert req.session.user is None
|
|
|
|
def test_saml_idp_logout(pub):
    """IdP-initiated logout removes the session and redirects to slo_return.

    After a regular login, an IdP side session is rebuilt around the same
    assertion id and name identifier, a redirect logout request is produced
    from it and handed to ``slo_idp()``.
    """
    req = get_assertion_consumer_request(pub)
    saml2 = Saml2Directory()
    saml2.assertionConsumerPost()
    assert req.session.user is not None
    get_session_manager().maintain_session(req.session)

    # get id from existing assertion
    sp_server = get_lasso_server()
    sp_login = lasso.Login(sp_server)
    sp_login.setSessionFromDump(req.session.lasso_session_dump)
    sp_assertions = sp_login.session.assertions
    assertion_id = sp_assertions['http://sso.example.net/saml2/metadata'].id
    name_id = req.session.name_identifier

    # and recreate an idp session
    idp_metadata_filepath = os.path.join(
        pub.app_dir, 'idp-http-sso.example.net-saml2-metadata-metadata.xml')
    idp_key_filepath = os.path.join(
        pub.app_dir, 'idp-http-sso.example.net-saml2-metadata-privatekey.pem')
    idp = lasso.Server(idp_metadata_filepath, idp_key_filepath, None, None)
    sp_metadata_filepath = os.path.join(pub.app_dir, 'saml2-metadata.xml')
    sp_key_filepath = os.path.join(pub.app_dir, 'public-key.pem')
    idp.addProvider(lasso.PROVIDER_ROLE_SP, sp_metadata_filepath, sp_key_filepath)

    idp_login = lasso.Login(idp)
    idp_login.initIdpInitiatedAuthnRequest(pub.cfg['sp']['saml2_providerid'])
    idp_login.request.nameIDPolicy.format = lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
    idp_login.request.nameIDPolicy.allowCreate = True
    idp_login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_POST
    idp_login.processAuthnRequestMsg(None)
    idp_login.validateRequestMsg(True, True)
    # timedelta's first positional argument is a number of days
    idp_login.buildAssertion(
        lasso.SAML2_AUTHN_CONTEXT_PASSWORD,
        datetime.datetime.now().isoformat(),
        'unused',
        (datetime.datetime.now() - datetime.timedelta(days=3600)).isoformat(),
        (datetime.datetime.now() + datetime.timedelta(days=3600)).isoformat())
    # make the IdP assertion match the one already known by the SP
    idp_login.assertion.subject.nameID.content = name_id
    idp_login.assertion.id = assertion_id
    idp_login.assertion.authnStatement[0].sessionIndex = assertion_id
    idp_login.buildAuthnResponseMsg()
    session_dump = idp_login.session.dump()

    logout = lasso.Logout(idp)
    logout.setSessionFromDump(session_dump)
    logout.initRequest(pub.cfg['sp']['saml2_providerid'], lasso.HTTP_METHOD_REDIRECT)
    logout.buildRequestMsg()

    # process logout message
    logout_query = urlparse.urlparse(logout.msgUrl).query
    saml2.slo_idp(logout_query)
    assert req.response.headers['location'].startswith('http://sso.example.net/saml2/slo_return?SAMLResponse=')
    assert req.session is None
|