django-mellon/tests/test_default_adapter.py

199 lines
7.3 KiB
Python

import pytest
import re
import lasso
from multiprocessing.pool import ThreadPool
from django.contrib import auth
from django.db import connection
from mellon.adapters import DefaultAdapter
from mellon.backends import SAMLBackend
pytestmark = pytest.mark.django_db
idp = {
'METADATA': open('tests/metadata.xml').read(),
}
saml_attributes = {
'name_id_format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
'name_id_content': 'x' * 32,
'issuer': 'http://idp5/metadata',
'username': ['foobar'],
'email': ['test@example.net'],
'first_name': ['Foo'],
'last_name': ['Bar'],
'is_superuser': ['true'],
'group': ['GroupA', 'GroupB', 'GroupC'],
}
def test_format_username(settings):
adapter = DefaultAdapter()
assert adapter.format_username(idp, {}) is None
assert adapter.format_username(idp, saml_attributes) == ('x' * 32 + '@saml')[:30]
settings.MELLON_USERNAME_TEMPLATE = '{attributes[name_id_content]}'
assert adapter.format_username(idp, saml_attributes) == ('x' * 32)[:30]
settings.MELLON_USERNAME_TEMPLATE = '{attributes[username][0]}'
assert adapter.format_username(idp, saml_attributes) == 'foobar'
def test_lookup_user(settings):
User = auth.get_user_model()
adapter = DefaultAdapter()
user = adapter.lookup_user(idp, saml_attributes)
assert user is not None
user2 = adapter.lookup_user(idp, saml_attributes)
assert user.id == user2.id
User.objects.all().delete()
assert User.objects.count() == 0
settings.MELLON_PROVISION = False
user = adapter.lookup_user(idp, saml_attributes)
assert user is None
assert User.objects.count() == 0
def test_lookup_user_transaction(transactional_db, concurrency):
adapter = DefaultAdapter()
p = ThreadPool(concurrency)
if connection.vendor == 'postgresql':
with connection.cursor() as c:
c.execute('SHOW max_connections')
max_connections = c.fetchone()[0]
if int(max_connections) <= concurrency:
pytest.skip('Number of concurrent connections above postgresql maximum limit')
def f(i):
# sqlite has a default lock timeout of 5s seconds between different access to the same in
# memory DB
if connection.vendor == 'sqlite':
connection.cursor().execute('PRAGMA busy_timeout = 400000')
try:
return adapter.lookup_user(idp, saml_attributes)
finally:
connection.close()
users = p.map(f, range(concurrency))
assert len(users) == concurrency
assert len(set(user.pk for user in users)) == 1
def test_provision_user_attributes(settings, django_user_model, caplog):
settings.MELLON_IDENTITY_PROVIDERS = [idp]
settings.MELLON_ATTRIBUTE_MAPPING = {
'email': u'{attributes[email][0]}',
'first_name': u'{attributes[first_name][0]}',
'last_name': u'{attributes[last_name][0]}',
}
user = SAMLBackend().authenticate(saml_attributes=saml_attributes)
assert user.username == 'x' * 30
assert user.first_name == 'Foo'
assert user.last_name == 'Bar'
assert user.email == 'test@example.net'
assert user.is_superuser is False
assert user.is_staff is False
assert len(caplog.records) == 4
assert 'created new user' in caplog.text
assert 'set field first_name' in caplog.text
assert 'set field last_name' in caplog.text
assert 'set field email' in caplog.text
def test_provision_user_groups(settings, django_user_model, caplog):
settings.MELLON_IDENTITY_PROVIDERS = [idp]
settings.MELLON_GROUP_ATTRIBUTE = 'group'
user = SAMLBackend().authenticate(saml_attributes=saml_attributes)
assert user.groups.count() == 3
assert set(user.groups.values_list('name', flat=True)) == set(saml_attributes['group'])
assert len(caplog.records) == 4
assert 'created new user' in caplog.text
assert 'adding group GroupA' in caplog.text
assert 'adding group GroupB' in caplog.text
assert 'adding group GroupC' in caplog.text
saml_attributes2 = saml_attributes.copy()
saml_attributes2['group'] = ['GroupB', 'GroupC']
user = SAMLBackend().authenticate(saml_attributes=saml_attributes2)
assert user.groups.count() == 2
assert set(user.groups.values_list('name', flat=True)) == set(saml_attributes2['group'])
assert len(caplog.records) == 5
assert 'removing group GroupA' in caplog.records[-1].message
def test_provision_is_superuser(settings, django_user_model, caplog):
settings.MELLON_IDENTITY_PROVIDERS = [idp]
settings.MELLON_SUPERUSER_MAPPING = {
'is_superuser': 'true',
}
user = SAMLBackend().authenticate(saml_attributes=saml_attributes)
assert user.is_superuser is True
assert user.is_staff is True
assert 'flag is_staff and is_superuser added' in caplog.text
user = SAMLBackend().authenticate(saml_attributes=saml_attributes)
assert user.is_superuser is True
assert user.is_staff is True
assert not 'flag is_staff and is_superuser removed' in caplog.text
def test_provision_absent_attribute(settings, django_user_model, caplog):
settings.MELLON_IDENTITY_PROVIDERS = [idp]
settings.MELLON_ATTRIBUTE_MAPPING = {
'email': '{attributes[email][0]}',
'first_name': '{attributes[first_name][0]}',
'last_name': '{attributes[last_name][0]}',
}
local_saml_attributes = saml_attributes.copy()
del local_saml_attributes['email']
user = SAMLBackend().authenticate(saml_attributes=local_saml_attributes)
assert not user.email
assert len(caplog.records) == 4
assert 'created new user' in caplog.text
assert re.search(r'invalid reference.*email', caplog.text)
assert 'set field first_name' in caplog.text
assert 'set field last_name' in caplog.text
def test_provision_long_attribute(settings, django_user_model, caplog):
settings.MELLON_IDENTITY_PROVIDERS = [idp]
settings.MELLON_ATTRIBUTE_MAPPING = {
'email': '{attributes[email][0]}',
'first_name': '{attributes[first_name][0]}',
'last_name': '{attributes[last_name][0]}',
}
local_saml_attributes = saml_attributes.copy()
local_saml_attributes['first_name'] = [('y' * 32)]
user = SAMLBackend().authenticate(saml_attributes=local_saml_attributes)
assert user.first_name == 'y' * 30
assert len(caplog.records) == 4
assert 'created new user' in caplog.text
assert 'set field first_name' in caplog.text
assert 'to value %r ' % (u'y' * 30) in caplog.text
assert 'set field last_name' in caplog.text
assert 'set field email' in caplog.text
def test_lookup_user_transient_with_email(private_settings):
private_settings.MELLON_TRANSIENT_FEDERATION_ATTRIBUTE = 'email'
User = auth.get_user_model()
adapter = DefaultAdapter()
saml_attributes2 = saml_attributes.copy()
saml_attributes2['name_id_format'] = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
assert User.objects.count() == 0
user = adapter.lookup_user(idp, saml_attributes2)
assert user is not None
assert user.saml_identifiers.count() == 1
assert user.saml_identifiers.first().name_id == saml_attributes2['email'][0]
user2 = adapter.lookup_user(idp, saml_attributes2)
assert user.id == user2.id
User.objects.all().delete()
assert User.objects.count() == 0
private_settings.MELLON_PROVISION = False
user = adapter.lookup_user(idp, saml_attributes)
assert user is None
assert User.objects.count() == 0