# django-mellon - SAML2 authentication for Django
# Copyright (C) 2014-2019 Entr'ouvert

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import datetime
import re
import time
from multiprocessing.pool import ThreadPool
from unittest import mock

import lasso
import pytest
import responses
from django.contrib import auth
from django.db import connection

from mellon import models
from mellon.adapters import DefaultAdapter
from mellon.backends import SAMLBackend

pytestmark = pytest.mark.django_db

User = auth.get_user_model()


@pytest.fixture
def idp():
    '''Return an IdP settings dict whose METADATA is the content of tests/metadata.xml.'''
    # Read through a context manager so the file handle is closed as soon as
    # the content has been loaded.
    with open('tests/metadata.xml') as fd:
        return {
            'METADATA': fd.read(),
        }


@pytest.fixture
def saml_attributes():
    '''Return a sample dict of SAML attributes with a persistent NameID.'''
    return {
        'name_id_format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
        'name_id_content': 'x' * 32,
        'issuer': 'https://idp5/metadata',
        'username': ['foobar'],
        'email': ['test@example.net'],
        'first_name': ['Foo'],
        'last_name': ['Bar'],
        'is_superuser': ['true'],
        'group': ['GroupA', 'GroupB', 'GroupC'],
    }


@pytest.fixture
def john(db):
    '''Create and return the user john.doe.'''
    return User.objects.create(username='john.doe', email='john.doe@example.com')


@pytest.fixture
def jane(db):
    '''Create and return the user jane.doe.'''
    # jane shares john's email address: the "too many users" test relies on an
    # email lookup matching both users.
    return User.objects.create(username='jane.doe', email='john.doe@example.com')


def test_format_username(settings, idp, saml_attributes):
    '''Check format_username() with the default and with custom username templates.'''
    adapter = DefaultAdapter()
    assert adapter.format_username(idp, {}) is None
    assert adapter.format_username(idp, saml_attributes) == ('x' * 32 + '@saml')
    settings.MELLON_USERNAME_TEMPLATE = '{attributes[name_id_content]}'
    assert adapter.format_username(idp, saml_attributes) == ('x' * 32)
    settings.MELLON_USERNAME_TEMPLATE = '{attributes[username][0]}'
    assert adapter.format_username(idp, saml_attributes) == 'foobar'


def test_lookup_user(settings, idp, saml_attributes):
    '''Check lookup_user() returns the same user twice and nothing when MELLON_PROVISION is off.'''
    adapter = DefaultAdapter()
    user = adapter.lookup_user(idp, saml_attributes)
    assert user is not None

    user2 = adapter.lookup_user(idp, saml_attributes)
    assert user.id == user2.id

    User.objects.all().delete()
    assert User.objects.count() == 0

    settings.MELLON_PROVISION = False
    user = adapter.lookup_user(idp, saml_attributes)
    assert user is None
    assert User.objects.count() == 0


def test_lookup_user_transaction(transactional_db, concurrency, idp, saml_attributes, settings):
    '''Check that concurrent lookup_user() calls all resolve to a single user.'''
    adapter = DefaultAdapter()
    settings.MELLON_IDENTITY_PROVIDERS = [idp]

    if connection.vendor == 'postgresql':
        with connection.cursor() as c:
            c.execute('SHOW max_connections')
            max_connections = c.fetchone()[0]
        if int(max_connections) <= concurrency:
            pytest.skip('Number of concurrent connections above postgresql maximum limit')

    def f(i):
        # sqlite has a default lock timeout of 5 seconds between different
        # accesses to the same in-memory DB
        if connection.vendor == 'sqlite':
            connection.cursor().execute('PRAGMA busy_timeout = 400000')
        try:
            return adapter.lookup_user(idp, saml_attributes)
        finally:
            connection.close()

    # The pool is created only after the skip check and used as a context
    # manager so that its worker threads are always cleaned up.
    with ThreadPool(concurrency) as p:
        users = p.map(f, range(concurrency))

    assert len(users) == concurrency
    assert len({user.pk for user in users}) == 1


def test_provision_user_attributes(settings, django_user_model, idp, saml_attributes, caplog):
    '''Check that MELLON_ATTRIBUTE_MAPPING fields are set on the user and logged.'''
    settings.MELLON_IDENTITY_PROVIDERS = [idp]
    settings.MELLON_ATTRIBUTE_MAPPING = {
        'email': '{attributes[email][0]}',
        'first_name': '{attributes[first_name][0]}',
        'last_name': '{attributes[last_name][0]}',
    }
    user = SAMLBackend().authenticate(saml_attributes=saml_attributes)
    assert user.username == 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx@saml'
    assert user.first_name == 'Foo'
    assert user.last_name == 'Bar'
    assert user.email == 'test@example.net'
    assert user.is_superuser is False
    assert user.is_staff is False
    assert len(caplog.records) == 4
    assert 'created new user' in caplog.text
    assert 'set field first_name' in caplog.text
    assert 'set field last_name' in caplog.text
    assert 'set field email' in caplog.text


def test_provision_user_groups(settings, django_user_model, idp, saml_attributes, caplog):
    '''Check that groups follow the MELLON_GROUP_ATTRIBUTE values across two authentications.'''
    settings.MELLON_IDENTITY_PROVIDERS = [idp]
    settings.MELLON_GROUP_ATTRIBUTE = 'group'
    user = SAMLBackend().authenticate(saml_attributes=saml_attributes)
    assert user.groups.count() == 3
    assert set(user.groups.values_list('name', flat=True)) == set(saml_attributes['group'])
    assert len(caplog.records) == 4
    assert 'created new user' in caplog.text
    assert 'adding group GroupA' in caplog.text
    assert 'adding group GroupB' in caplog.text
    assert 'adding group GroupC' in caplog.text

    # Authenticate again without GroupA: the membership must be removed.
    saml_attributes2 = saml_attributes.copy()
    saml_attributes2['group'] = ['GroupB', 'GroupC']
    user = SAMLBackend().authenticate(saml_attributes=saml_attributes2)
    assert user.groups.count() == 2
    assert set(user.groups.values_list('name', flat=True)) == set(saml_attributes2['group'])
    assert len(caplog.records) == 6
    assert 'removing group GroupA' in caplog.records[-1].message


def test_provision_is_superuser(settings, django_user_model, idp, saml_attributes, caplog):
    '''Check that MELLON_SUPERUSER_MAPPING sets is_staff and is_superuser and keeps them set.'''
    settings.MELLON_IDENTITY_PROVIDERS = [idp]
    settings.MELLON_SUPERUSER_MAPPING = {
        'is_superuser': 'true',
    }
    user = SAMLBackend().authenticate(saml_attributes=saml_attributes)
    assert user.is_superuser is True
    assert user.is_staff is True
    assert 'flag is_staff and is_superuser added' in caplog.text

    user = SAMLBackend().authenticate(saml_attributes=saml_attributes)
    assert user.is_superuser is True
    assert user.is_staff is True
    assert 'flag is_staff and is_superuser removed' not in caplog.text


def test_provision_absent_attribute(settings, django_user_model, idp, saml_attributes, caplog):
    '''Check that a mapped attribute missing from the assertion is logged and left unset.'''
    settings.MELLON_IDENTITY_PROVIDERS = [idp]
    settings.MELLON_ATTRIBUTE_MAPPING = {
        'email': '{attributes[email][0]}',
        'first_name': '{attributes[first_name][0]}',
        'last_name': '{attributes[last_name][0]}',
    }
    local_saml_attributes = saml_attributes.copy()
    del local_saml_attributes['email']
    user = SAMLBackend().authenticate(saml_attributes=local_saml_attributes)
    assert not user.email
    assert len(caplog.records) == 4
    assert 'created new user' in caplog.text
    assert re.search(r'invalid reference.*email', caplog.text)
    assert 'set field first_name' in caplog.text
    assert 'set field last_name' in caplog.text


def test_provision_long_attribute(settings, django_user_model, idp, saml_attributes, caplog):
    '''Check how a 32 character first name is stored, depending on the Django version.'''
    from django import VERSION

    settings.MELLON_IDENTITY_PROVIDERS = [idp]
    settings.MELLON_ATTRIBUTE_MAPPING = {
        'email': '{attributes[email][0]}',
        'first_name': '{attributes[first_name][0]}',
        'last_name': '{attributes[last_name][0]}',
    }
    local_saml_attributes = saml_attributes.copy()
    local_saml_attributes['first_name'] = [('y' * 32)]
    user = SAMLBackend().authenticate(saml_attributes=local_saml_attributes)
    assert len(caplog.records) == 4
    assert 'created new user' in caplog.text
    assert 'set field first_name' in caplog.text
    if VERSION[0] <= 2:
        assert user.first_name == 'y' * 30
        assert 'to value %r ' % ('y' * 30) in caplog.text
    else:
        # django users' first name attribute longer from django3 onwards
        assert user.first_name == 'y' * 32
        assert 'to value %r ' % ('y' * 32) in caplog.text
    assert 'set field last_name' in caplog.text
    assert 'set field email' in caplog.text


def test_lookup_user_transient_with_email(rf, private_settings, idp, saml_attributes):
    '''Check lookup_user() with a transient NameID, without and with a federation attribute.'''
    request = rf.get('/')
    request._messages = mock.Mock()
    adapter = DefaultAdapter(request=request)
    saml_attributes['name_id_format'] = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
    assert User.objects.count() == 0
    user = adapter.lookup_user(idp, saml_attributes)
    assert User.objects.count() == 0
    request._messages.add.assert_called_once_with(
        30, 'A transient NameID was received but TRANSIENT_FEDERATION_ATTRIBUTE is not set.', ''
    )

    private_settings.MELLON_TRANSIENT_FEDERATION_ATTRIBUTE = 'email'
    user = adapter.lookup_user(idp, saml_attributes)
    assert user is not None
    assert user.saml_identifiers.count() == 1
    assert user.saml_identifiers.first().name_id == saml_attributes['email'][0]

    user2 = adapter.lookup_user(idp, saml_attributes)
    assert user.id == user2.id

    User.objects.all().delete()
    assert User.objects.count() == 0

    private_settings.MELLON_PROVISION = False
    user = adapter.lookup_user(idp, saml_attributes)
    assert user is None
    assert User.objects.count() == 0


def test_lookup_user_by_attributes_bad_setting1(settings, idp, saml_attributes, caplog):
    '''Check that a non-list MELLON_LOOKUP_BY_ATTRIBUTES is reported and no user is found.'''
    settings.MELLON_PROVISION = False
    adapter = DefaultAdapter()

    settings.MELLON_LOOKUP_BY_ATTRIBUTES = 'coin'
    assert adapter.lookup_user(idp, saml_attributes) is None
    assert 'it must be a list' in caplog.text


def test_lookup_user_by_attributes_bad_setting2(settings, idp, saml_attributes, caplog):
    '''Check that a list of non-dict entries is reported and no user is found.'''
    settings.MELLON_PROVISION = False
    adapter = DefaultAdapter()

    settings.MELLON_LOOKUP_BY_ATTRIBUTES = ['coin']
    assert adapter.lookup_user(idp, saml_attributes) is None
    assert 'it must be a list of dicts' in caplog.text


def test_lookup_user_by_attributes_bad_setting3(settings, idp, saml_attributes, caplog):
    '''Check that an entry without user_field is reported and no user is found.'''
    settings.MELLON_PROVISION = False
    adapter = DefaultAdapter()

    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{}]
    assert adapter.lookup_user(idp, saml_attributes) is None
    assert 'user_field is missing' in caplog.text


def test_lookup_user_by_attributes_bad_setting4(settings, idp, saml_attributes, caplog):
    '''Check that an entry without saml_attribute is reported and no user is found.'''
    settings.MELLON_PROVISION = False
    adapter = DefaultAdapter()

    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'username'}]
    assert adapter.lookup_user(idp, saml_attributes) is None
    assert 'saml_attribute is missing' in caplog.text


def test_lookup_user_by_attributes_not_found(settings, idp, saml_attributes, caplog):
    '''Check that a lookup matching no existing user returns None and logs "not found".'''
    settings.MELLON_PROVISION = False
    adapter = DefaultAdapter()

    caplog.set_level('DEBUG')
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'username', 'saml_attribute': 'saml_at1'}]
    saml_attributes['saml_at1'] = ['john.doe']
    assert adapter.lookup_user(idp, saml_attributes) is None
    assert ': not found' in caplog.text


def test_lookup_user_by_attributes_too_many1(settings, idp, saml_attributes, john, jane, caplog):
    '''Check that a single lookup rule matching two users returns None.'''
    settings.MELLON_PROVISION = False
    adapter = DefaultAdapter()

    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'email', 'saml_attribute': 'saml_at1'}]
    saml_attributes['saml_at1'] = ['john.doe@example.com']
    assert adapter.lookup_user(idp, saml_attributes) is None
    assert 'too many users found(2)' in caplog.text


def test_lookup_user_by_attributes_too_manyi2(settings, idp, saml_attributes, john, jane, caplog):
    '''Check that two lookup rules matching different users return None.'''
    settings.MELLON_PROVISION = False
    adapter = DefaultAdapter()

    saml_attributes['saml_at1'] = ['john.doe']
    saml_attributes['saml_at2'] = ['jane.doe']
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
        {'user_field': 'username', 'saml_attribute': 'saml_at1'},
        {'user_field': 'username', 'saml_attribute': 'saml_at2'},
    ]
    assert adapter.lookup_user(idp, saml_attributes) is None
    assert 'too many users found(2)' in caplog.text


def test_lookup_user_by_attributes_found(settings, idp, saml_attributes, john, jane, caplog):
    '''Check that a lookup rule matching exactly one user returns that user.'''
    settings.MELLON_PROVISION = False
    adapter = DefaultAdapter()

    saml_attributes['saml_at1'] = ['john.doe']
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
        {'user_field': 'username', 'saml_attribute': 'saml_at1'},
    ]
    assert adapter.lookup_user(idp, saml_attributes) == john


def test_lookup_user_by_attributes_ignore_case(settings, idp, saml_attributes, john, jane, caplog):
    '''Check that the lookup is case sensitive unless 'ignore-case' is set on the rule.'''
    settings.MELLON_PROVISION = False
    adapter = DefaultAdapter()

    saml_attributes['saml_at1'] = ['Jane.Doe']
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
        {'user_field': 'username', 'saml_attribute': 'saml_at1'},
    ]
    assert adapter.lookup_user(idp, saml_attributes) is None

    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
        {'user_field': 'username', 'saml_attribute': 'saml_at1', 'ignore-case': True},
    ]
    assert adapter.lookup_user(idp, saml_attributes) == jane


def test_lookup_user_by_attributes_two_idps(settings, idp, saml_attributes, john, jane, caplog):
    '''Check that with lookup by attributes we can link a user that already
    has a link with another IdP; use case: migrating from a test IdP to a
    production IdP without having to unlink all already loaded users.'''
    settings.MELLON_PROVISION = False
    adapter = DefaultAdapter()

    saml_attributes['saml_at1'] = ['john.doe']
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
        {'user_field': 'username', 'saml_attribute': 'saml_at1'},
    ]
    assert models.UserSAMLIdentifier.objects.count() == 0
    assert adapter.lookup_user(idp, saml_attributes) == john
    assert models.UserSAMLIdentifier.objects.count() == 1

    # Same user seen through a second issuer: a second identifier is recorded.
    saml_attributes['issuer'] = 'https://idp6/metadata'
    assert adapter.lookup_user(idp, saml_attributes) == john
    assert models.UserSAMLIdentifier.objects.count() == 2


@pytest.fixture
def adapter():
    '''Return a DefaultAdapter built without a request.'''
    return DefaultAdapter()


def test_load_metadata_simple(adapter, metadata):
    '''Check that inline METADATA content is returned unchanged.'''
    idp = {'METADATA': metadata}
    assert adapter.load_metadata(idp, 0) == metadata


def test_load_metadata_legacy(adapter, metadata_path, metadata, freezer):
    '''Check that a path given in METADATA is loaded and moved to METADATA_PATH.'''
    now = time.time()
    idp = {'METADATA': metadata_path}
    assert adapter.load_metadata(idp, 0) == metadata
    assert idp['METADATA'] == metadata
    assert idp['METADATA_PATH'] == metadata_path
    assert idp['METADATA_LAST_UPDATE'] == now


def test_load_metadata_path(adapter, metadata_path, metadata, freezer):
    '''Check that metadata is loaded from METADATA_PATH and the update time is recorded.'''
    now = time.time()
    idp = {'METADATA_PATH': str(metadata_path)}
    assert adapter.load_metadata(idp, 0) == metadata
    assert idp['METADATA'] == metadata
    assert idp['METADATA_LAST_UPDATE'] == now


METADATA_URL = 'https://example.com/metadata'


@responses.activate
def test_load_metadata_url(settings, adapter, metadata, freezer, caplog):
    '''Check loading metadata from METADATA_URL: first load, refresh, bad responses and staleness.'''
    idp = {'METADATA_URL': METADATA_URL}

    def wait_for_update_thread():
        # wait for update thread to finish
        try:
            idp['METADATA_URL_UPDATE_THREAD'].join()
        except KeyError:
            # no update thread is registered in idp, nothing to wait for
            pass

    # test normal loading
    responses.get(METADATA_URL, body=metadata, headers={'Content-Type': 'application/xml'})
    assert adapter.load_metadata(idp, 0) == metadata
    assert 'METADATA_URL_UPDATE_THREAD' not in idp
    assert idp['METADATA'] == metadata
    assert idp['METADATA_LAST_UPDATE'] == time.time()

    # test ENTITY_ID change
    caplog.clear()
    freezer.move_to(datetime.timedelta(seconds=3601))
    responses.replace(
        responses.GET,
        METADATA_URL,
        body=metadata.replace('idp5', 'idp6'),
        headers={'Content-Type': 'application/xml'},
    )
    assert adapter.load_metadata(idp, 0) == metadata
    wait_for_update_thread()
    assert 'WARNING' not in caplog.text
    assert any(
        'entityID changed' in record.message and record.levelname == 'ERROR' for record in caplog.records
    )

    # test load from file cache
    freezer.move_to(datetime.timedelta(seconds=3601))
    caplog.clear()
    del idp['METADATA']
    request = responses.replace(
        responses.GET, METADATA_URL, body='', headers={'Content-Type': 'application/xml'}
    )
    assert adapter.load_metadata(idp, 0) == metadata
    wait_for_update_thread()
    assert len(caplog.records) == 1
    assert caplog.records[0].levelname == 'WARNING'
    assert 'invalid metadata' in caplog.records[0].message
    assert request.call_count == 1

    # test http get is not redone before 5 minutes
    freezer.move_to(datetime.timedelta(seconds=4 * 60))
    assert adapter.load_metadata(idp, 0) == metadata
    wait_for_update_thread()
    assert request.call_count == 1
    freezer.move_to(datetime.timedelta(seconds=2 * 60))
    assert adapter.load_metadata(idp, 0) == metadata
    wait_for_update_thread()
    assert request.call_count == 2

    # test error after 24hours
    caplog.clear()
    freezer.move_to(datetime.timedelta(seconds=3600 * 23))
    assert adapter.load_metadata(idp, 0) == metadata
    assert 'ERROR' in caplog.text
    assert 'not updated since 25' in caplog.text