authentic/tests/test_idp_saml2.py

985 lines
38 KiB
Python

# coding: utf-8
# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
import base64
import datetime
import hashlib
import re
import xml.etree.ElementTree as ET
import lasso
import mock
import pytest
from django.contrib.auth import REDIRECT_FIELD_NAME
from django.core.files import File
from django.template import Context, Template
from django.urls import reverse
from django.utils.encoding import force_bytes, force_str, force_text
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import gettext as _
from authentic2.a2_rbac.models import OrganizationalUnit, Role
from authentic2.constants import NONCE_FIELD_NAME, SERVICE_FIELD_NAME
from authentic2.custom_user.models import User
from authentic2.idp.saml import saml2_endpoints
from authentic2.idp.saml.saml2_endpoints import get_extensions, get_login_hints_extension
from authentic2.models import Attribute, Service
from authentic2.saml import models as saml_models
from authentic2.saml.models import SAMLAttribute
from authentic2.utils import make_url
from . import utils
@pytest.fixture
def saml_settings(settings):
settings.A2_IDP_SAML2_ENABLE = True
settings.A2_LOGIN_DISPLAY_A_CANCEL_BUTTON = True
def get_idp_metadata(app):
response = app.get('/idp/saml2/metadata')
# FIXME: add better test of well formedness for metadata
assert response['Content-type'] == 'text/xml', 'metadata endpoint did not return an XML document'
assert (
'IDPSSODescriptor' in response.text
), 'metadata endpoint does not contain an IDPSSODescriptor element'
return response.text
class Raw(object):
def __init__(self, d):
self.__dict__.update(d)
@pytest.fixture
def keys():
with open('tests/cert.pem') as fd:
cert = ''.join(fd.read().splitlines()[1:-1])
with open('tests/key.pem') as fd:
key = ''.join(fd.read().splitlines()[1:-1])
return (cert, key)
@pytest.fixture()
def idp(saml_settings, db):
code_attribute = Attribute.objects.create(kind='string', name='code', label='Code')
mobile_attribute = Attribute.objects.create(kind='string', name='mobile', label='Mobile')
avatar_attribute = Attribute.objects.create(kind='profile_image', name='avatar', label='Avatar')
default_ou = OrganizationalUnit.objects.get()
return Raw(locals())
@pytest.fixture
def user(idp):
email = 'john.doe@example.com'
username = 'john.doe'
first_name = 'John'
last_name = 'Doe'
user = User.objects.create(email=email, username=username, first_name=first_name, last_name=last_name)
idp.code_attribute.set_value(user, '1234', verified=True)
idp.mobile_attribute.set_value(user, '5678', verified=True)
with open('tests/200x200.jpg', 'rb') as fd:
idp.avatar_attribute.set_value(user, File(fd))
user.set_password(username)
user.save()
return user
class SamlSP(object):
METADATA_TPL = '''<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<EntityDescriptor
entityID="{{ base_url }}/"
xmlns="urn:oasis:names:tc:SAML:2.0:metadata">
<SPSSODescriptor
AuthnRequestsSigned="true"
WantAssertionsSigned="true"
protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
{% if keys %}
<KeyDescriptor use="signing">
<ds:KeyInfo xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
<ds:X509Data><ds:X509Certificate>{{ keys.0 }}</ds:X509Certificate></ds:X509Data>
</ds:KeyInfo>
</KeyDescriptor>
{% endif %}
<SingleLogoutService
Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
Location="https://files.entrouvert.org/mellon/logout" />
{% if binding == 'post' %}
<AssertionConsumerService
index="0"
isDefault="true"
Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST"
Location="{{ base_url }}/sso/POST" />
{% elif binding == 'artifact' %}
<AssertionConsumerService
index="0"
Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact"
Location="{{ base_url }}/mellon/artifactResponse" />
{% endif %}
</SPSSODescriptor>
</EntityDescriptor>'''
service = None
server = None
binding = 'post'
keys = None # pair of public and private key as PEM
relay_state = 'relay-state'
def __init__(self, app, **kwargs):
self.app = app
self.base_url = 'https://sp.example.com'
self.name = 'Test SP'
self.slug = 'test-sp'
self.idp_entity_idp = ('http://testserver/idp/saml2/metadata',)
self.default_name_id_format = 'email'
self.accepted_name_id_format = ['email', 'persistent', 'transient', 'username']
self.ou = OrganizationalUnit.objects.get()
self.__dict__.update(kwargs)
self.provider = saml_models.LibertyProvider(
name=self.name, slug=self.slug, ou=self.ou, metadata=self.get_metadata()
)
self.provider.clean()
self.provider.save()
self.service = saml_models.LibertyServiceProvider.objects.create(
liberty_provider=self.provider, enabled=True
)
self.default_sp_options_idp_policy = saml_models.SPOptionsIdPPolicy.objects.create(
name='Default',
enabled=True,
authn_request_signed=False,
default_name_id_format=self.default_name_id_format,
accepted_name_id_format=self.accepted_name_id_format,
)
# Admin role
self.admin_role = Role.objects.create(
name='Administrator', slug='administrator', service=self.provider
)
self.admin_role.attributes.create(name='superuser', kind='string', value='true')
# SAML attributes mapping
self.saml_first_name_attribute = self.provider.attributes.create(
name_format='basic',
name='first-name',
friendly_name='First name',
attribute_name='django_user_first_name',
)
self.saml_last_name_attribute = self.provider.attributes.create(
name_format='basic',
name='last-name',
friendly_name='Last name',
attribute_name='django_user_last_name',
)
self.saml_superuser_attribute = self.provider.attributes.create(
name_format='basic',
name='superuser',
friendly_name='Superuser status',
attribute_name='superuser',
)
self.saml_code_attribute = self.provider.attributes.create(
name_format='basic', name='code_code', friendly_name='code', attribute_name='django_user_code'
)
self.saml_mobile_attribute = self.provider.attributes.create(
name_format='basic', name='mobile', friendly_name='mobile', attribute_name='django_user_mobile'
)
self.saml_verified_attributes = self.provider.attributes.create(
name_format='basic',
name='verified_attributes',
friendly_name='Verified attributes',
attribute_name='@verified_attributes@',
)
self.saml_avatar_attribute = self.provider.attributes.create(
name_format='basic', name='avatar', friendly_name='Avatar', attribute_name='django_user_avatar'
)
self.role_authorized = Role.objects.create(name='PC Delta', slug='pc-delta')
self.provider.unauthorized_url = 'https://whatever.com/loser/'
self.provider.save()
def get_metadata(self):
return Template(self.METADATA_TPL).render(
Context(dict(base_url=self.base_url, binding=self.binding, keys=self.keys))
)
def get_server(self):
if not self.server:
sp_meta = self.get_metadata()
idp_meta = get_idp_metadata(self.app)
self.server = lasso.Server.newFromBuffers(sp_meta, self.keys[1] if self.keys else None)
self.server.signatureMethod = lasso.SIGNATURE_METHOD_RSA_SHA256
self.server.addProviderFromBuffer(lasso.PROVIDER_ROLE_IDP, force_str(idp_meta))
return self.server
def make_authn_request(
self,
entity_id=None,
method=lasso.HTTP_METHOD_REDIRECT,
allow_create=True,
format=lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
relay_state=None,
force_authn=None,
is_passive=None,
sp_name_qualifier=None,
name_id_policy=True,
login_hints=None,
):
server = self.get_server()
login = self.login = lasso.Login(server)
if not self.keys:
login.setSignatureHint(lasso.PROFILE_SIGNATURE_HINT_FORBID)
login.initAuthnRequest(entity_id, method)
request = login.request
policy = request.nameIdPolicy
if force_authn is not None:
request.forceAuthn = force_authn
if is_passive is not None:
request.isPassive = is_passive
if allow_create is not None:
policy.allowCreate = allow_create
if format is not None:
policy.format = format
if sp_name_qualifier is not None:
policy.spNameQualifier = sp_name_qualifier
relay_state = relay_state or self.relay_state
if relay_state is not None:
login.msgRelayState = force_str(relay_state)
if not name_id_policy:
request.nameIdPolicy = None
request.extensions = lasso.Samlp2Extensions()
# extension with unicode characters !! test dumping in saml2_endpoints and continue_sso
request.extensions.any = (force_str('<extension xmlns="http://example.com/">éé</extension>'),)
if login_hints:
request.extensions.any = (
force_str(
'<login-hint xmlns="https://www.entrouvert.com/">%s</login-hint>' % ' '.join(login_hints)
),
)
login.buildAuthnRequestMsg()
url_parsed = urlparse.urlparse(login.msgUrl)
assert url_parsed.path == reverse('a2-idp-saml-sso'), 'msgUrl should target the sso endpoint'
if self.keys:
assert 'rsa-sha256' in login.msgUrl
return login.msgUrl, login.msgBody, login.msgRelayState, request.id
def parse_authn_response(self, saml_response):
login = self.login = lasso.Login(self.get_server())
login.processAuthnResponseMsg(force_str(saml_response))
login.acceptSso()
def parse_artifact_url(self, response):
login = self.login = lasso.Login(self.get_server())
if response.location:
method = lasso.HTTP_METHOD_ARTIFACT_GET
query_string = response.location.split('?', 1)[1]
parsed_query_string = urlparse.parse_qs(query_string)
self.relay_state = parsed_query_string.get('RelayState')
login.msgRelayState = force_str(self.relay_state)
else: # lasso.HTTP_METHOD_ARTIFACT_POST, never happens
raise NotImplementedError
if not self.keys:
login.setSignatureHint(lasso.PROFILE_SIGNATURE_HINT_FORBID)
login.initRequest(force_str(query_string), method)
login.buildRequestMsg()
response = self.app.post(
login.msgUrl, params=force_bytes(login.msgBody), headers={'content-type': str('text/xml')}
)
login.processResponseMsg(force_str(response.text))
login.acceptSso()
class Scenario(object):
check_federation = False
def __init__(self, app, sp_kwargs=None, make_authn_request_kwargs=None, **kwargs):
self.app = app
sp_kwargs = sp_kwargs or {}
self.sp = SamlSP(app=app, **sp_kwargs)
self.make_authn_request_kwargs = make_authn_request_kwargs or {}
self.__dict__.update(kwargs)
def launch_authn_request(self):
# Launch an AuthnRequest
url, body, relay_state, request_id = self.sp.make_authn_request(**self.make_authn_request_kwargs)
if body is None:
response = self.app.get(url)
else: # post case
params = {'SAMLRequest': body}
if relay_state is not None:
params['RelayState'] = relay_state
response = self.app.post(url, params=params)
utils.assert_redirects_complex(
response,
reverse('auth_login'),
**{
'nonce': '*',
SERVICE_FIELD_NAME: 'default ' + self.sp.slug,
REDIRECT_FIELD_NAME: make_url('a2-idp-saml-continue', params={NONCE_FIELD_NAME: request_id}),
},
)
self.nonce = urlparse.parse_qs(urlparse.urlparse(response['Location']).query)['nonce'][0]
url = response['Location']
response = self.app.get(url)
assert response.status_code == 200
assert response['Content-Type'].split(';')[0] == 'text/html'
assert response.pyquery('button.cancel-button[name=cancel]').text() == _('Cancel')
self.login_page_response = response
def login(self, user):
response = self.login_page_response
response.form.set('username', user.username)
response.form.set('password', user.username)
response = response.form.submit(name='login-password-submit')
utils.assert_redirects_complex(response, reverse('a2-idp-saml-continue'), nonce=self.nonce)
self.idp_response = response.follow()
return response
def cancel(self):
response = self.login_page_response.form.submit(name='cancel')
utils.assert_redirects_complex(
response, reverse('a2-idp-saml-continue'), cancel='*', nonce=self.nonce
)
self.idp_response = response.follow()
return response
def handle_post_response(self):
response = self.idp_response
assert response.status_code == 200
assert response['Content-type'].split(';')[0] == 'text/html'
assert len(response.forms) == 1
assert response.form.action == '%s/sso/POST' % self.sp.base_url
assert 'SAMLResponse' in response.form.fields
if self.sp.relay_state is not None:
assert response.form['RelayState'].value == self.sp.relay_state
saml_response = response.form['SAMLResponse'].value
decoded_saml_response = base64.b64decode(saml_response)
assert b'rsa-sha256' in decoded_saml_response
self.sp.parse_authn_response(saml_response)
def handle_artifact_response(self):
response = self.idp_response
assert response.status_code == 302
assert response.location.startswith('https://sp.example.com/mellon/artifactResponse?SAMLart=')
self.sp.parse_artifact_url(response)
def check_assertion(self, user=None):
login = self.sp.login
assertion = login.assertion
session_not_on_or_after = login.assertion.authnStatement[0].sessionNotOnOrAfter
assert session_not_on_or_after is not None
assert (
datetime.datetime.strptime(session_not_on_or_after, '%Y-%m-%dT%H:%M:%SZ')
> datetime.datetime.utcnow()
)
assertion_xml = assertion.exportToXml()
namespaces = {
'saml': lasso.SAML2_ASSERTION_HREF,
}
constraints = ()
# check nameid
if self.check_federation:
nid_format = self.make_authn_request_kwargs.get('format')
if not nid_format:
name_id = login.assertion.subject.nameID
if self.sp.default_name_id_format == 'username':
assert name_id.format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED
assert force_text(name_id.content) == user.username
elif self.sp.default_name_id_format == 'uuid':
assert name_id.format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED
assert force_text(name_id.content) == user.uuid
else:
raise NotImplementedError(
'unknown default_name_id_format %s' % self.sp.default_name_id_format
)
elif nid_format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT:
federation = saml_models.LibertyFederation.objects.get()
constraints += (
('/saml:Assertion/saml:Subject/saml:NameID', federation.name_id_content),
(
'/saml:Assertion/saml:Subject/saml:NameID/@Format',
lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
),
('/saml:Assertion/saml:Subject/saml:NameID/@SPNameQualifier', '%s/' % self.sp.base_url),
)
elif nid_format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_EMAIL or (
not nid_format and self.sp.default_name_id_format == 'email'
):
constraints += (
('/saml:Assertion/saml:Subject/saml:NameID', self.email),
(
'/saml:Assertion/saml:Subject/saml:NameID/@Format',
lasso.SAML2_NAME_IDENTIFIER_FORMAT_EMAIL,
),
)
constraints += (
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='first-name']/" "@NameFormat",
lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC,
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='first-name']/" "@FriendlyName",
'First name',
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='first-name']/"
"saml:AttributeValue",
'John',
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='last-name']/" "@NameFormat",
lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC,
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='last-name']/" "@FriendlyName",
'Last name',
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='last-name']/"
"saml:AttributeValue",
'Doe',
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='code_code']/" "@NameFormat",
lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC,
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='code_code']/" "@FriendlyName",
'code',
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='code_code']/"
"saml:AttributeValue",
'1234',
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='mobile']/" "@NameFormat",
lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC,
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='mobile']/" "@FriendlyName",
'mobile',
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='mobile']/"
"saml:AttributeValue",
'5678',
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='avatar']/" "@NameFormat",
lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC,
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='avatar']/" "@FriendlyName",
'Avatar',
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='avatar']/"
"saml:AttributeValue",
re.compile('^http://testserver/media/profile-image/.*$'),
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='verified_attributes']/"
"@NameFormat",
lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC,
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='verified_attributes']/"
"@FriendlyName",
'Verified attributes',
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='verified_attributes']/"
"saml:AttributeValue",
set(['code_code', 'mobile']),
),
)
if user is not None and self.sp.admin_role in user.roles.all():
constraints += (
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='superuser']/"
"@NameFormat",
lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC,
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='superuser']/"
"@FriendlyName",
'Superuser status',
),
(
"/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='superuser']/"
"saml:AttributeValue",
'true',
),
)
utils.assert_xpath_constraints(assertion_xml, constraints, namespaces)
def test_sso_redirect_post(app, idp, user):
scenario = Scenario(app, sp_kwargs=dict(binding='post'))
scenario.launch_authn_request()
scenario.login(user)
scenario.handle_post_response()
scenario.check_assertion(user=user)
def test_sso_post_post(app, idp, user):
scenario = Scenario(
app, make_authn_request_kwargs={'method': lasso.HTTP_METHOD_POST}, sp_kwargs=dict(binding='post')
)
scenario.launch_authn_request()
scenario.login(user)
scenario.handle_post_response()
scenario.check_assertion(user=user)
def test_sso_redirect_artifact(app, idp, user, keys):
scenario = Scenario(app, sp_kwargs=dict(binding='artifact', keys=keys))
scenario.launch_authn_request()
scenario.login(user)
scenario.handle_artifact_response()
scenario.check_assertion(user=user)
def test_sso_cancel_redirect(app, idp):
scenario = Scenario(app)
scenario.launch_authn_request()
scenario.cancel()
with pytest.raises(lasso.ProfileRequestDeniedError):
scenario.handle_post_response()
def test_sso_no_name_id_policy_redirect(app, idp, user):
scenario = Scenario(app, make_authn_request_kwargs=dict(name_id_policy=False))
scenario.launch_authn_request()
scenario.login(user=user)
scenario.handle_post_response()
scenario.check_assertion(user=user)
def test_sso_nid_username(app, idp, user):
scenario = Scenario(
app,
sp_kwargs=dict(default_name_id_format='username'),
make_authn_request_kwargs=dict(name_id_policy=False),
check_federation=True,
)
scenario.launch_authn_request()
scenario.login(user=user)
scenario.handle_post_response()
scenario.check_assertion(user=user)
def test_sso_nid_uuid(app, idp, user):
scenario = Scenario(
app,
sp_kwargs=dict(default_name_id_format='uuid'),
make_authn_request_kwargs=dict(name_id_policy=False),
check_federation=True,
)
scenario.launch_authn_request()
scenario.login(user=user)
scenario.handle_post_response()
scenario.check_assertion(user=user)
def test_sso_authorized_role_ok(app, idp, user):
scenario = Scenario(app)
scenario.sp.provider.add_authorized_role(scenario.sp.role_authorized)
user.roles.add(scenario.sp.role_authorized)
scenario.launch_authn_request()
scenario.login(user=user)
scenario.handle_post_response()
scenario.check_assertion(user=user)
def test_sso_authorized_role_nok(app, idp, user):
scenario = Scenario(app)
scenario.sp.provider.add_authorized_role(scenario.sp.role_authorized)
scenario.launch_authn_request()
scenario.login(user=user)
assert scenario.idp_response.pyquery('a[href="%s"]' % 'https://whatever.com/loser/').text() == 'Back'
def test_sso_redirect_artifact_login_hints(app, user, keys):
scenario = Scenario(
app,
sp_kwargs=dict(binding='artifact', keys=keys),
make_authn_request_kwargs={'login_hints': ['backoffice']},
)
scenario.launch_authn_request()
assert app.session['login-hint'] == ['backoffice']
scenario.login(user)
scenario.handle_artifact_response()
scenario.check_assertion(user=user)
@pytest.fixture
def add_attributes(rf):
with mock.patch('authentic2.idp.saml.saml2_endpoints.get_attribute_definitions') as get_definitions:
with mock.patch(
'authentic2.idp.saml.saml2_endpoints.get_attributes', wraps=saml2_endpoints.get_attributes
) as get_attributes:
request = rf.get('/')
request.user = None
assertion = lasso.Saml2Assertion()
provider = Service()
def func():
saml2_endpoints.add_attributes(
func.request,
saml2_endpoints.get_entity_id(func.request),
func.assertion,
func.provider,
func.nid_format,
)
return {
at.name: set(
[''.join(force_text(mtn.dump()) for mtn in atv.any) for atv in at.attributeValue]
)
for at in assertion.attributeStatement[0].attribute
}
func.get_definitions = get_definitions
func.get_attributes = get_attributes
func.request = request
func.assertion = assertion
func.provider = provider
func.nid_format = 'transient'
yield func
def test_add_attributes_empty_assertion(add_attributes):
'''Verify adding attributes to an otherwise empty assertion'''
# setup
add_attributes.get_attributes.return_value = {
'first_name': ['Éléonore'],
'last_name': ['Rigby'],
}
add_attributes.get_definitions.return_value = [
SAMLAttribute(name_format='basic', name='prenom', attribute_name='first_name'),
SAMLAttribute(name_format='basic', name='nom', attribute_name='last_name'),
]
# run
attributes = add_attributes()
# check
assert attributes == {
'nom': set(['Rigby']),
'prenom': set(['Éléonore']),
}
def test_add_attributes_initialized_assertion(add_attributes):
'''Verify existing assertion's attributes are preserved'''
# setup
add_attributes.get_attributes.return_value = {
'first_name': ['Éléonore'],
'last_name': ['Rigby'],
}
add_attributes.get_definitions.return_value = [
SAMLAttribute(name_format='basic', name='prenom', attribute_name='first_name'),
SAMLAttribute(name_format='basic', name='nom', attribute_name='last_name'),
]
assertion = add_attributes.assertion
(statement,) = assertion.attributeStatement = [lasso.Saml2AttributeStatement()]
(attribute,) = statement.attribute = [
lasso.Saml2Attribute(),
]
attribute.name = 'prenom'
attribute.nameFormat = lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC
(atv,) = attribute.attributeValue = [lasso.Saml2AttributeValue()]
(mtn,) = atv.any = [
lasso.MiscTextNode.newWithString('coucou'),
]
mtn.textChild = True
# run
attributes = add_attributes()
# check
assert attributes == {
'nom': set(['Rigby']),
'prenom': set(['Éléonore', 'coucou']),
}
@pytest.fixture
def profile():
server = lasso.Server()
profile = lasso.Login(server)
profile.request = lasso.Samlp2AuthnRequest()
yield profile
def test_get_extensions(profile):
assert get_extensions(profile) == []
profile.request.extensions = lasso.Samlp2Extensions()
profile.request.extensions.any = (force_str('<extension attribute="1"/>'),)
extensions = get_extensions(profile)
assert len(extensions) == 1, 'there should be one extension node'
assert extensions[0].tag == 'extension'
assert extensions[0].attrib['attribute'] == '1'
def test_get_login_hints_extension(profile):
assert get_login_hints_extension(profile) == set()
extensions = [
'<login-hint xmlns="https://www.entrouvert.com/">backoffice saint-machin-truc</login-hint>',
'<extension attribute="1"/>',
'<login-hint xmlns="https://www.entrouvert.com/">toto@example.com</login-hint>',
]
profile.request.extensions = lasso.Samlp2Extensions()
profile.request.extensions.any = tuple(force_str(ext) for ext in extensions)
login_hints = get_login_hints_extension(profile)
assert login_hints == set(['backoffice', 'saint-machin-truc', 'toto@example.com'])
def test_make_edu_person_targeted_id(db, settings, rf):
user = User.objects.create(username='a')
provider = saml_models.LibertyProvider(entity_id='https://sp.com/')
assert saml2_endpoints.make_edu_person_targeted_id_value(provider, user) is None
settings.A2_IDP_SAML2_EDU_PERSON_TARGETED_ID_SALT = 'b'
settings.A2_IDP_SAML2_EDU_PERSON_TARGETED_ID_ATTRIBUTE = 'username'
assert saml2_endpoints.make_edu_person_targeted_id_value(provider, user) == (
'_A485C0ACEEF43A6D39145F5CFE25D9D3B6F15DC6443F412263C76D81C72DA8D5'
)
assert (
saml2_endpoints.make_edu_person_targeted_id_value(provider, user)
== '_' + hashlib.sha256(b'b' + b'https://sp.com/' + b'a').hexdigest().upper()
)
edpt = saml2_endpoints.make_edu_person_targeted_id('http://testserver/idp/saml2/metadata', provider, user)
assert edpt is not None
node = lasso.Node.newFromXmlNode(force_str(ET.tostring(edpt)))
assert isinstance(node, lasso.Saml2NameID)
assert force_text(node.content) == ('_A485C0ACEEF43A6D39145F5CFE25D9D3B6F15DC6443F412263C76D81C72DA8D5')
assert node.format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
assert node.nameQualifier == 'http://testserver/idp/saml2/metadata'
assert node.spNameQualifier == 'https://sp.com/'
def test_add_attributes_edu_person_targeted_id_nid_format(db, settings, rf, add_attributes):
# setup
user = User.objects.create(username='a', first_name='John', last_name='Rambo')
settings.A2_IDP_SAML2_EDU_PERSON_TARGETED_ID_SALT = 'b'
settings.A2_IDP_SAML2_EDU_PERSON_TARGETED_ID_ATTRIBUTE = 'username'
add_attributes.provider.entity_id = 'https://sp.com/'
add_attributes.request.user = user
add_attributes.nid_format = 'edupersontargetedid'
add_attributes.get_definitions.return_value = [
SAMLAttribute(name_format='basic', name='prenom', attribute_name='django_user_first_name'),
SAMLAttribute(name_format='basic', name='nom', attribute_name='django_user_last_name'),
]
# run
attributes = add_attributes()
# check
assert len(attributes) == 3
assert attributes['nom'] == set(['Rambo'])
assert attributes['prenom'] == set(['John'])
edu_name = 'urn:oid:1.3.6.1.4.1.5923.1.1.1.10'
assert len(attributes[edu_name]) == 1
node = lasso.Node.newFromXmlNode(force_str(list(attributes[edu_name])[0]))
assert isinstance(node, lasso.Saml2NameID)
assert force_text(node.content) == ('_A485C0ACEEF43A6D39145F5CFE25D9D3B6F15DC6443F412263C76D81C72DA8D5')
assert node.format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
assert node.nameQualifier == 'http://testserver/idp/saml2/metadata'
assert node.spNameQualifier == 'https://sp.com/'
def test_add_attributes_edu_person_targeted_id_attribute(db, settings, rf, add_attributes):
# setup
user = User.objects.create(username='a', first_name='John', last_name='Rambo')
settings.A2_IDP_SAML2_EDU_PERSON_TARGETED_ID_SALT = 'b'
settings.A2_IDP_SAML2_EDU_PERSON_TARGETED_ID_ATTRIBUTE = 'username'
add_attributes.provider.entity_id = 'https://sp.com/'
add_attributes.request.user = user
add_attributes.nid_format = 'transient'
add_attributes.get_definitions.return_value = [
SAMLAttribute(name_format='basic', name='prenom', attribute_name='django_user_first_name'),
SAMLAttribute(name_format='basic', name='nom', attribute_name='django_user_last_name'),
SAMLAttribute(name_format='basic', name='edupersontargetedid', attribute_name='edupersontargetedid'),
]
# run
attributes = add_attributes()
# check
assert len(attributes) == 3
assert attributes['nom'] == set(['Rambo'])
assert attributes['prenom'] == set(['John'])
assert len(attributes['edupersontargetedid']) == 1
node = lasso.Node.newFromXmlNode(force_str(list(attributes['edupersontargetedid'])[0]))
assert isinstance(node, lasso.Saml2NameID)
assert force_text(node.content) == ('_A485C0ACEEF43A6D39145F5CFE25D9D3B6F15DC6443F412263C76D81C72DA8D5')
assert node.format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT
assert node.nameQualifier == 'http://testserver/idp/saml2/metadata'
assert node.spNameQualifier == 'https://sp.com/'
@pytest.fixture
def add_attributes_all(add_attributes):
add_attributes.provider.entity_id = 'https://sp.com/'
add_attributes.nid_format = 'transient'
attribute_names = [
# django_user source
'django_user_id',
'django_user_password',
'django_user_last_login',
'django_user_is_superuser',
'django_user_uuid',
'django_user_username',
'django_user_first_name',
'django_user_last_name',
'django_user_email',
'django_user_email_verified',
'django_user_is_staff',
'django_user_is_active',
'django_user_ou',
'django_user_date_joined',
'django_user_modified',
'django_user_last_account_deletion_alert',
'django_user_deleted',
'django_user_ou_uuid',
'django_user_ou_slug',
'django_user_ou_name',
'django_user_birthdate',
'django_user_groups',
'django_user_group_names',
'django_user_domain',
'django_user_identifier',
'django_user_full_name',
'a2_role_slugs',
'a2_role_names',
'a2_role_uuids',
'a2_service_ou_role_slugs',
'a2_service_ou_role_names',
'a2_service_ou_role_uuids',
]
add_attributes.get_definitions.return_value = list(
SAMLAttribute(name_format='basic', name=name, attribute_name=name) for name in attribute_names
)
def func(user):
add_attributes.request.user = user
return add_attributes()
for key in dir(add_attributes):
if not key.startswith(('func_', '__')):
setattr(func, key, getattr(add_attributes, key))
return func
def test_add_attributes_user_ou1_role_ou2(add_attributes_all, user_ou1, role_ou2, ou1):
Attribute.objects.create(kind='birthdate', name='birthdate', label='birthdate', required=False)
user_ou1.roles.add(role_ou2)
user_ou1.attributes.birthdate = datetime.date(1970, 1, 1)
add_attributes_all.provider.slug = 'provider'
add_attributes_all.provider.name = 'Provider'
add_attributes_all.provider.ou = ou1
add_attributes_all.provider.save()
service_role = Role.objects.create(
name='Role of service', slug='role-of-service', ou=ou1, service=add_attributes_all.provider
)
service_role.attributes.create(name='is_admin', kind='string', value='true')
user_ou1.roles.add(service_role)
add_attributes_all.get_definitions.return_value.append(
SAMLAttribute(name_format='basic', name='is_admin', attribute_name='is_admin'),
)
attributes = add_attributes_all(user_ou1)
assert attributes == {
'a2_role_names': set(['Role of service', 'role_ou2']),
'a2_role_slugs': set(['role-of-service', 'role_ou2']),
'a2_role_uuids': set([service_role.uuid, role_ou2.uuid]),
'a2_service_ou_role_names': set(['Role of service']),
'a2_service_ou_role_slugs': set(['role-of-service']),
'a2_service_ou_role_uuids': set([service_role.uuid]),
'django_user_birthdate': set(['1970-01-01']),
'django_user_date_joined': set([str(user_ou1.date_joined)]),
'django_user_deleted': set([]),
'django_user_domain': set(['']),
'django_user_email': set(['john.doe@example.net']),
'django_user_email_verified': set(['false']),
'django_user_first_name': set(['J\xf4hn']),
'django_user_full_name': set(['J\xf4hn D\xf4e']),
'django_user_group_names': set([]),
'django_user_groups': set([]),
'django_user_id': set([str(user_ou1.id)]),
'django_user_identifier': set(['john.doe']),
'django_user_is_active': set(['true']),
'django_user_is_staff': set(['false']),
'django_user_is_superuser': set(['false']),
'django_user_last_account_deletion_alert': set([]),
'django_user_last_login': set([]),
'django_user_last_name': set(['D\xf4e']),
'django_user_modified': set([str(user_ou1.modified)]),
'django_user_ou': set([]),
'django_user_ou_name': set(['OU1']),
'django_user_ou_slug': set(['ou1']),
'django_user_ou_uuid': set([ou1.uuid]),
'django_user_password': set(['abba0b6ff456806bab66baed93e6d9c4']),
'django_user_username': set(['john.doe']),
'django_user_uuid': set([user_ou1.uuid]),
'is_admin': set(['true']),
}
def test_metadata_with_openssl_public_key(app, idp, settings):
settings.A2_IDP_SAML2_SIGNATURE_PUBLIC_KEY = '''-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAvxFkfPdndlGgQPDZgFGX
brNAc/79PULZBuNdWFHDD9P5hNhZn9Kqm4Cp06Pe/A6u+g5wLnYvbZQcFCgfQAEz
ziJtb3J55OOlB7iMEI/T2AX2WzrUH8QT8NGhABONKU2Gg4XiyeXNhH5R7zdHlUwc
Wq3ZwNbtbY0TVc+n665EbrfV/59xihSqsoFrkmBLH0CoepUXtAzA7WDYn8AzusIu
Mx3n8844pJwgxhTB7Gjuboptlz9Hri8JRdXiVT9OS9Wt69ubcNoM6zuKASmtm48U
uGnhj8v6XwvbjKZrL9kA+xf8ziazZfvvw/VGTm+IVFYB7d1x457jY5zjjXJvNyso
owIDAQAB
-----END PUBLIC KEY-----'''
response = app.get('/idp/saml2/metadata')
def test_null_character_nonce(app, db):
response = app.get('/idp/saml2/continue/', params={'nonce': '\0'}, status=400)
assert response.text == 'null character in query string'