458 lines
22 KiB
Python
458 lines
22 KiB
Python
# authentic2 - versatile identity manager
|
|
# Copyright (C) 2010-2019 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import re
|
|
import datetime
|
|
import base64
|
|
import unittest
|
|
from lxml.html import parse
|
|
|
|
from django.test import Client
|
|
from django.test.utils import override_settings
|
|
from django.contrib.auth import get_user_model, REDIRECT_FIELD_NAME
|
|
from django.core.urlresolvers import reverse
|
|
from django.core.files import File
|
|
from django.utils import six
|
|
from django.utils.translation import gettext as _
|
|
from django.utils.six.moves.urllib import parse as urlparse
|
|
|
|
from authentic2.compat import Base64Error
|
|
from authentic2.saml import models as saml_models
|
|
from authentic2.a2_rbac.models import Role, OrganizationalUnit
|
|
from authentic2.utils import make_url
|
|
from authentic2.constants import NONCE_FIELD_NAME, SERVICE_FIELD_NAME
|
|
from authentic2.models import Attribute
|
|
|
|
from utils import Authentic2TestCase
|
|
|
|
# lasso is an optional dependency: when it cannot be imported the name is
# bound to None so that later code can test ``lasso is not None``.
try:
    import lasso
except ImportError:
    lasso = None
|
|
|
|
|
|
@unittest.skipUnless(lasso is not None, 'lasso is not installed')
@override_settings(A2_IDP_SAML2_ENABLE=True)
class SamlBaseTestCase(Authentic2TestCase):
    """Shared fixtures and lasso helpers for the SAML 2.0 IdP tests.

    The class plays the role of a service provider: it builds SP metadata
    from ``METADATA_TPL``, fetches the IdP metadata through the Django test
    client and uses lasso to emit AuthnRequests and to parse responses.
    """

    # SP metadata template; ``{base_url}`` is substituted by get_sp_metadata().
    METADATA_TPL = '''<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<EntityDescriptor
 entityID="{base_url}/"
 xmlns="urn:oasis:names:tc:SAML:2.0:metadata">
 <SPSSODescriptor
   AuthnRequestsSigned="true"
   WantAssertionsSigned="true"
   protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
   <SingleLogoutService
     Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
     Location="https://files.entrouvert.org/mellon/logout" />
   <AssertionConsumerService
     index="0"
     isDefault="true"
     Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST"
     Location="{base_url}/sso/POST" />
   <AssertionConsumerService
     index="1"
     Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact"
     Location="{base_url}/mellon/artifactResponse" />
 </SPSSODescriptor>
</EntityDescriptor>'''

    def get_sp_metadata(self, base_url='https://sp.example.com'):
        """Return the SP metadata document rendered for ``base_url``."""
        return self.METADATA_TPL.format(base_url=base_url)

    def get_idp_metadata(self):
        """Fetch the IdP metadata through the test client and return its body.

        The raw ``response.content`` is returned unchanged after two sanity
        checks (content type and presence of an IDPSSODescriptor).
        """
        client = Client()
        response = client.get(reverse('a2-idp-saml-metadata'))
        # FIXME: add better test of well formedness for metadata
        self.assertEqual(response['Content-type'], 'text/xml',
                         msg='metadata endpoint did not return an XML '
                         'document')
        # response.content is a byte string; compare against a bytes literal
        # so that the membership test is valid on Python 2 and Python 3.
        self.assertIn(b'IDPSSODescriptor', response.content,
                      msg='metadata endpoint does not contain an '
                      'IDPSSODescriptor element')
        return response.content

    def get_server(self, base_url='https://sp.example.com'):
        """Build a lasso.Server for the SP with the IdP registered as provider."""
        sp_meta = self.get_sp_metadata(base_url=base_url)
        idp_meta = self.get_idp_metadata()
        server = lasso.Server.newFromBuffers(sp_meta)
        server.signatureMethod = lasso.SIGNATURE_METHOD_RSA_SHA256
        server.addProviderFromBuffer(lasso.PROVIDER_ROLE_IDP, idp_meta)
        return server

    def setup(self, default_name_id_format='persistent'):
        """Create the user, service provider, policy, role and SP attributes.

        This is not unittest's ``setUp``: callers invoke it explicitly so
        they can choose ``default_name_id_format`` for the default
        SPOptionsIdPPolicy.
        """
        self.base_url = 'https://sp.example.com'
        self.name = 'Test SP'
        self.slug = 'test-sp'
        self.email = 'john.doe@example.com'
        self.username = 'john.doe'
        self.first_name = 'John'
        self.last_name = 'Doe'
        self.password = 'T0toto'

        # User profile attributes.
        self.code_attribute = Attribute.objects.create(
            kind='string', name='code', label='Code')
        self.mobile_attribute = Attribute.objects.create(
            kind='string', name='mobile', label='Mobile')
        self.avatar_attribute = Attribute.objects.create(
            kind='profile_image', name='avatar', label='Avatar')

        self.user = get_user_model().objects.create(
            email=self.email,
            username=self.username,
            first_name=self.first_name,
            last_name=self.last_name)
        self.code_attribute.set_value(self.user, '1234', verified=True)
        self.mobile_attribute.set_value(self.user, '5678', verified=True)
        # The image is binary data: open it in 'rb' mode and make sure the
        # descriptor is closed once the value has been stored.
        with open('tests/200x200.jpg', 'rb') as avatar_fd:
            self.avatar_attribute.set_value(self.user, File(avatar_fd))
        self.user.set_password(self.password)
        self.user.save()

        # Service provider declaration.
        self.default_ou = OrganizationalUnit.objects.get()
        sp_meta = self.get_sp_metadata()
        self.liberty_provider = saml_models.LibertyProvider(
            name=self.name,
            slug=self.slug,
            ou=self.default_ou,
            metadata=sp_meta)
        self.liberty_provider.clean()
        self.liberty_provider.save()
        self.liberty_service_provider = \
            saml_models.LibertyServiceProvider.objects.create(
                liberty_provider=self.liberty_provider,
                enabled=True)
        self.default_sp_options_idp_policy = \
            saml_models.SPOptionsIdPPolicy.objects.create(
                name='Default',
                enabled=True,
                authn_request_signed=False,
                default_name_id_format=default_name_id_format,
                accepted_name_id_format=['email', 'persistent', 'transient',
                                         'username'])

        # Role carrying a 'superuser' attribute, granted to the test user.
        self.admin_role = Role.objects.create(
            name='Administrator',
            slug='administrator',
            service=self.liberty_provider)
        self.admin_role.attributes.create(
            name='superuser',
            kind='string',
            value='true')
        self.admin_role.members.add(self.user)

        # Attributes released to the SP.  Each one is kept under its own
        # name so that self.superuser_attribute really is the 'superuser'
        # declaration.
        self.first_name_attribute = self.liberty_provider.attributes.create(
            name_format='basic',
            name='first-name',
            friendly_name='First name',
            attribute_name='django_user_first_name')
        self.last_name_attribute = self.liberty_provider.attributes.create(
            name_format='basic',
            name='last-name',
            friendly_name='Last name',
            attribute_name='django_user_last_name')
        self.superuser_attribute = self.liberty_provider.attributes.create(
            name_format='basic',
            name='superuser',
            friendly_name='Superuser status',
            attribute_name='superuser')
        self.code_sp_attribute = self.liberty_provider.attributes.create(
            name_format='basic',
            name='code_code',
            friendly_name='code',
            attribute_name='django_user_code')
        self.mobile_sp_attribute = self.liberty_provider.attributes.create(
            name_format='basic',
            name='mobile',
            friendly_name='mobile',
            attribute_name='django_user_mobile')
        self.verified_attributes_sp_attribute = \
            self.liberty_provider.attributes.create(
                name_format='basic',
                name='verified_attributes',
                friendly_name='Verified attributes',
                attribute_name='@verified_attributes@')
        self.liberty_provider.attributes.create(
            name_format='basic',
            name='avatar',
            friendly_name='Avatar',
            attribute_name='django_user_avatar')

        self.role_authorized = Role.objects.create(name='PC Delta', slug='pc-delta')
        self.liberty_provider.unauthorized_url = 'https://whatever.com/loser/'
        self.liberty_provider.save()

    def make_authn_request(
            self, idp=None,
            method=None,
            allow_create=None,
            format=None,
            relay_state=None,
            force_authn=None,
            is_passive=None,
            sp_name_qualifier=None,
            sign=False,
            name_id_policy=True):
        """Build an AuthnRequest with lasso and return how to send it.

        ``method`` defaults to ``lasso.HTTP_METHOD_REDIRECT``; the default is
        resolved at call time so that merely defining this class does not
        touch ``lasso`` when the module is unavailable.  Optional arguments
        left to ``None`` are not applied to the request.  When
        ``name_id_policy`` is false the NameIDPolicy element is removed.

        Returns a ``(url, body, request_id)`` tuple.
        """
        if method is None:
            method = lasso.HTTP_METHOD_REDIRECT
        server = self.get_server()
        login = lasso.Login(server)
        if not sign:
            login.setSignatureHint(lasso.PROFILE_SIGNATURE_HINT_FORBID)
        login.initAuthnRequest(idp, method)
        request = login.request
        policy = request.nameIdPolicy
        if force_authn is not None:
            request.forceAuthn = force_authn
        if is_passive is not None:
            request.isPassive = is_passive
        if allow_create is not None:
            policy.allowCreate = allow_create
        if format is not None:
            policy.format = format
        if sp_name_qualifier is not None:
            policy.spNameQualifier = sp_name_qualifier
        if relay_state is not None:
            login.msgRelayState = relay_state
        if not name_id_policy:
            request.nameIdPolicy = None
        login.buildAuthnRequestMsg()
        # The body expectation depends on the binding; it must not be
        # asserted unconditionally or POST requests could never pass.
        if method == lasso.HTTP_METHOD_REDIRECT:
            self.assertIsNone(login.msgBody, 'body should be None with '
                              'method Redirect')
        elif method == lasso.HTTP_METHOD_POST:
            self.assertIsNotNone(login.msgBody)
        url_parsed = urlparse.urlparse(login.msgUrl)
        self.assertEqual(url_parsed.path, reverse('a2-idp-saml-sso'),
                         'msgUrl should target the sso endpoint')
        if sign:
            self.assertIn('rsa-sha256', login.msgUrl)
        return login.msgUrl, login.msgBody, request.id

    def parse_authn_response(self, saml_response):
        """Process ``saml_response`` with a fresh lasso.Login and accept the SSO.

        Returns the lasso.Login object; errors raised by lasso while
        processing the message are not caught here.
        """
        server = self.get_server()
        login = lasso.Login(server)
        login.processAuthnResponseMsg(saml_response)
        login.acceptSso()
        return login
|
|
|
|
|
|
class SamlSSOTestCase(SamlBaseTestCase):
    """SSO scenarios run against the IdP through the Django test client."""

    # (SP attribute name, expected FriendlyName, expected value) checked on
    # every successful assertion, in this order.
    EXPECTED_ATTRIBUTES = (
        ('first-name', 'First name', 'John'),
        ('last-name', 'Last name', 'Doe'),
        ('superuser', 'Superuser status', 'true'),
        ('code_code', 'code', '1234'),
        ('mobile', 'mobile', '5678'),
        ('avatar', 'Avatar',
         re.compile('^http://testserver/media/profile-image/.*$')),
        ('verified_attributes', 'Verified attributes',
         set(['code_code', 'mobile'])),
    )

    def test_sso_login_redirect(self):
        self.do_test_sso(dict(allow_create=True,
                              format=lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT))

    def test_sso_cancel_redirect(self):
        self.do_test_sso(dict(allow_create=True), cancel=True)

    def test_sso_no_name_id_policy_redirect(self):
        self.do_test_sso(dict(allow_create=True, name_id_policy=False),
                         check_federation=False, default_name_id_format='email')

    def test_sso_name_id_policy_username(self):
        self.do_test_sso(dict(allow_create=True, name_id_policy=False),
                         check_federation=True, default_name_id_format='username')

    def test_sso_name_id_policy_uuid(self):
        self.do_test_sso(dict(allow_create=True, name_id_policy=False),
                         check_federation=True, default_name_id_format='uuid')

    def test_sso_unauthorized_role(self):
        self.do_test_sso(dict(allow_create=True), authorized_service=False)

    def _post_credentials(self, client, url):
        """Submit the password login form at ``url`` with the test user."""
        return client.post(url, {
            'username': self.email,
            'password': self.password,
            'login-password-submit': 1,
        })

    def _extract_saml_response(self, response):
        """Check the auto-submitted POST form and return its SAMLResponse value."""
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response['Content-type'].split(';')[0], 'text/html')
        doc = parse(six.BytesIO(response.content)).getroot()
        self.assertEqual(len(doc.forms), 1, msg='the number of forms is not 1')
        form = doc.forms[0]
        self.assertEqual(form.get('action'), '%s/sso/POST' % self.base_url)
        self.assertIn('SAMLResponse', form.fields)
        saml_response = form.fields['SAMLResponse']
        try:
            decoded_saml_response = base64.b64decode(saml_response)
        except Base64Error:
            self.fail('SAMLResponse is not base64 encoded: %s' % saml_response)
        self.assertIn(b'rsa-sha256', decoded_saml_response)
        return saml_response

    def _attribute_constraints(self):
        """Return the XPath constraints for every entry of EXPECTED_ATTRIBUTES."""
        constraints = ()
        for name, friendly_name, value in self.EXPECTED_ATTRIBUTES:
            base = ("/saml:Assertion/saml:AttributeStatement/"
                    "saml:Attribute[@Name='%s']/" % name)
            constraints += (
                (base + "@NameFormat", lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC),
                (base + "@FriendlyName", friendly_name),
                (base + "saml:AttributeValue", value),
            )
        return constraints

    def _email_name_id_constraints(self):
        """Return the XPath constraints for an email-format NameID."""
        return (
            ('/saml:Assertion/saml:Subject/saml:NameID',
             self.email),
            ('/saml:Assertion/saml:Subject/saml:NameID/@Format',
             lasso.SAML2_NAME_IDENTIFIER_FORMAT_EMAIL),
        )

    def do_test_sso(self, make_authn_request_kwargs=None, check_federation=True,
                    cancel=False, default_name_id_format='persistent',
                    authorized_service=True):
        """Run one SSO round trip and check the outcome.

        ``make_authn_request_kwargs`` is forwarded to make_authn_request().
        Exactly one of three scenarios is played after the login page has
        been displayed:

        * ``cancel``: the user cancels and the SP must see the request denied;
        * ``authorized_service`` false: access is refused until the user
          holds the authorized role;
        * otherwise: the user logs in and the assertion content is checked,
          the NameID being verified only when ``check_federation`` is true.
        """
        if make_authn_request_kwargs is None:
            make_authn_request_kwargs = {}
        self.setup(default_name_id_format=default_name_id_format)
        client = Client()
        # Launch an AuthnRequest
        url, body, request_id = self.make_authn_request(
            **make_authn_request_kwargs)
        response = client.get(url)
        self.assertRedirectsComplex(response, reverse('auth_login'), **{
            'nonce': '*',
            SERVICE_FIELD_NAME: self.slug,
            REDIRECT_FIELD_NAME: make_url(
                'a2-idp-saml-continue',
                params={
                    NONCE_FIELD_NAME: request_id,
                }),
        })
        url = response['Location']
        nonce = urlparse.parse_qs(urlparse.urlparse(url).query)['nonce'][0]
        response = client.get(url)
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response['Content-Type'].split(';')[0], 'text/html')
        # response.content is bytes; assertInHTML works on text.
        self.assertInHTML(
            u'<button class="cancel-button" name="cancel" formnovalidate>%s</button>'
            % _('Cancel'),
            response.content.decode('utf-8'),
            count=1)

        if cancel:
            response = client.post(url, {
                'cancel': 1,
            })
            self.assertRedirectsComplex(response,
                                        reverse('a2-idp-saml-continue'),
                                        cancel='*', nonce=nonce)
            response = client.get(response['Location'])
            saml_response = self._extract_saml_response(response)
            with self.assertRaises(lasso.ProfileRequestDeniedError):
                self.parse_authn_response(saml_response)
            return

        if not authorized_service:
            self.liberty_provider.add_authorized_role(self.role_authorized)
            # User without the authorized role
            role_goth = Role.objects.create(name='Goth Kids', slug='goth-kids')
            # NOTE(review): .all().delete() removes the Role rows themselves,
            # not only the user's memberships - confirm this is intended.
            self.user.roles.all().delete()
            self.user.roles.add(role_goth)
            response = self._post_credentials(client, url)
            response = client.get(response['Location'])
            self.assertIn(b'https://whatever.com/loser/', response.content)
            # User with the authorized role
            self.user.roles.add(self.role_authorized)
            response = self._post_credentials(client, url)
            response = client.get(response['Location'])
            self.assertIn(b'SAMLResponse', response.content)
            return

        response = self._post_credentials(client, url)
        self.assertRedirectsComplex(
            response, reverse('a2-idp-saml-continue'), nonce=nonce)
        response = client.get(response['Location'])
        saml_response = self._extract_saml_response(response)
        login = self.parse_authn_response(saml_response)
        assertion = login.assertion

        session_not_on_or_after = assertion.authnStatement[0].sessionNotOnOrAfter
        self.assertIsNotNone(session_not_on_or_after)
        self.assertGreater(
            datetime.datetime.strptime(session_not_on_or_after,
                                       '%Y-%m-%dT%H:%M:%SZ'),
            datetime.datetime.utcnow())

        assertion_xml = assertion.exportToXml()
        namespaces = {
            'saml': lasso.SAML2_ASSERTION_HREF,
        }
        constraints = ()
        # check nameid
        if check_federation:
            name_id_format = make_authn_request_kwargs.get('format')
            name_id = assertion.subject.nameID
            if not name_id_format:
                # No format requested: the policy default decides.
                policy_default = \
                    self.default_sp_options_idp_policy.default_name_id_format
                if policy_default == 'username':
                    self.assertEqual(name_id.format,
                                     lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED)
                    self.assertEqual(name_id.content,
                                     self.user.username.encode('utf-8'))
                elif policy_default == 'uuid':
                    self.assertEqual(name_id.format,
                                     lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED)
                    self.assertEqual(name_id.content,
                                     self.user.uuid.encode('utf-8'))
                elif policy_default == 'email':
                    # Previously written as an unreachable clause of the
                    # explicit-format branch.
                    constraints += self._email_name_id_constraints()
            elif name_id_format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT:
                federation = saml_models.LibertyFederation.objects.get()
                constraints += (
                    ('/saml:Assertion/saml:Subject/saml:NameID',
                     federation.name_id_content),
                    ('/saml:Assertion/saml:Subject/saml:NameID/@Format',
                     lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT),
                    ('/saml:Assertion/saml:Subject/saml:NameID/@SPNameQualifier',
                     '%s/' % self.base_url),
                )
            elif name_id_format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_EMAIL:
                constraints += self._email_name_id_constraints()
        constraints += self._attribute_constraints()
        self.assertXPathConstraints(assertion_xml, constraints, namespaces)
|