add user lookup by attributes (#33739)

This commit is contained in:
Benjamin Dauvergne 2019-06-06 13:52:21 +02:00 committed by Thomas NOEL
parent f2908b2ef3
commit 31015e6580
4 changed files with 261 additions and 45 deletions

26
README
View File

@ -261,6 +261,32 @@ MELLON_DEFAULT_ASSERTION_CONSUMER_BINDING
Should be post or artifact. Default is post. You can refer to the SAML 2.0
specification to learn the difference.
MELLON_LOOKUP_BY_ATTRIBUTES
---------------------------
Allow looking for user with some SAML attributes if the received NameID is
still unknown. It must be a list of dictionnaries with two mandatory keys
`user_field` and `saml_attribute`. The optionnal key `ignore-case` should be a
boolean indicating if the match is case-insensitive (default is to respect the
case).
Each dictionnary is a rule for linking, applying all the rules should only
return one user, the boolean operator OR is applied between the rules.
So for example if you received a SAML attribute named `email` and you want to
link user with the same email you would configured it like that:
MELLON_LOOKUP_BY_ATTRIBUTES = [
{
'saml_attribute': 'email',
'user_field': 'email',
}
]
The targeted user(s) field(s) should be as much as possible unique
individually, if not django-mellon will refuse to link multiple users matching
the rules.
Tests
=====

View File

@ -6,7 +6,7 @@ import lasso
import requests
import requests.exceptions
from django.core.exceptions import PermissionDenied
from django.core.exceptions import PermissionDenied, FieldDoesNotExist
from django.contrib import auth
from django.contrib.auth.models import Group
from django.utils import six
@ -21,6 +21,15 @@ class UserCreationError(Exception):
pass
def display_truncated_list(l, max_length=10):
s = '[' + ', '.join(map(six.text_type, l))
if len(l) > max_length:
s += '..truncated more than %d items (%d)]' % (max_length, len(l))
else:
s += ']'
return s
class DefaultAdapter(object):
def __init__(self, *args, **kwargs):
self.logger = logging.getLogger(__name__)
@ -128,32 +137,97 @@ class DefaultAdapter(object):
name_id = saml_attributes['name_id_content']
issuer = saml_attributes['issuer']
try:
return User.objects.get(saml_identifiers__name_id=name_id,
user = User.objects.get(saml_identifiers__name_id=name_id,
saml_identifiers__issuer=issuer)
self.logger.info('looked up user %s with name_id %s from issuer %s',
user, name_id, issuer)
return user
except User.DoesNotExist:
pass
if not utils.get_setting(idp, 'PROVISION'):
self.logger.warning('provisionning disabled, login refused')
return None
user = None
lookup_by_attributes = utils.get_setting(idp, 'LOOKUP_BY_ATTRIBUTES')
if lookup_by_attributes:
user = self._lookup_by_attributes(idp, saml_attributes, lookup_by_attributes)
created = False
if not user:
if not utils.get_setting(idp, 'PROVISION'):
self.logger.debug('provisionning disabled, login refused')
return None
created = True
user = self.create_user(User)
user = self.create_user(User)
nameid_user = self._link_user(idp, saml_attributes, issuer, name_id, user)
if user != nameid_user:
self.logger.info('looked up user %s with name_id %s from issuer %s',
nameid_user, name_id, issuer)
user.delete()
if created:
user.delete()
return nameid_user
try:
self.finish_create_user(idp, saml_attributes, nameid_user)
except UserCreationError:
nameid_user.delete()
return None
self.logger.info('created new user %s with name_id %s from issuer %s',
nameid_user, name_id, issuer)
if created:
try:
self.finish_create_user(idp, saml_attributes, nameid_user)
except UserCreationError:
user.delete()
return None
self.logger.info('created new user %s with name_id %s from issuer %s',
nameid_user, name_id, issuer)
return nameid_user
def _lookup_by_attributes(self, idp, saml_attributes, lookup_by_attributes):
if not isinstance(lookup_by_attributes, list):
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list', lookup_by_attributes)
return None
users = set()
for line in lookup_by_attributes:
if not isinstance(line, dict):
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list of dicts', line)
continue
user_field = line.get('user_field')
if not hasattr(user_field, 'isalpha'):
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: user_field is missing', line)
continue
try:
User._meta.get_field(user_field)
except FieldDoesNotExist:
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r, user field %s does not exist',
line, user_field)
continue
saml_attribute = line.get('saml_attribute')
if not hasattr(saml_attribute, 'isalpha'):
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: saml_attribute is missing', line)
continue
values = saml_attributes.get(saml_attribute)
if not values:
self.logger.error('looking for user by saml attribute %r and user field %r, skipping because empty',
saml_attribute, user_field)
continue
ignore_case = line.get('ignore-case', False)
for value in values:
key = user_field
if ignore_case:
key += '__iexact'
users_found = User.objects.filter(saml_identifiers__isnull=True, **{key: value})
if not users_found:
self.logger.debug('looking for users by attribute %r and user field %r with value %r: not found',
saml_attribute, user_field, value)
continue
self.logger.info(u'looking for user by attribute %r and user field %r with value %r: found %s',
saml_attribute, user_field, value, display_truncated_list(users_found))
users.update(users_found)
if len(users) == 1:
user = list(users)[0]
self.logger.info(u'looking for user by attributes %r: found user %s',
lookup_by_attributes, user)
return user
elif len(users) > 1:
self.logger.warning(u'looking for user by attributes %r: too many users found(%d), failing',
lookup_by_attributes, len(users))
return None
def _link_user(self, idp, saml_attributes, issuer, name_id, user):
saml_id, created = models.UserSAMLIdentifier.objects.get_or_create(
name_id=name_id, issuer=issuer, defaults={'user': user})

View File

@ -40,6 +40,7 @@ class AppSettings(object):
'ARTIFACT_RESOLVE_TIMEOUT': 10.0,
'LOGIN_HINTS': [],
'SIGNATURE_METHOD': 'RSA-SHA256',
'LOOKUP_BY_ATTRIBUTES': [],
}
@property

View File

@ -11,23 +11,42 @@ from mellon.backends import SAMLBackend
pytestmark = pytest.mark.django_db
idp = {
'METADATA': open('tests/metadata.xml').read(),
}
saml_attributes = {
'name_id_format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
'name_id_content': 'x' * 32,
'issuer': 'http://idp5/metadata',
'username': ['foobar'],
'email': ['test@example.net'],
'first_name': ['Foo'],
'last_name': ['Bar'],
'is_superuser': ['true'],
'group': ['GroupA', 'GroupB', 'GroupC'],
}
User = auth.get_user_model()
def test_format_username(settings):
@pytest.fixture
def idp():
return {
'METADATA': open('tests/metadata.xml').read(),
}
@pytest.fixture
def saml_attributes():
return {
'name_id_format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
'name_id_content': 'x' * 32,
'issuer': 'http://idp5/metadata',
'username': ['foobar'],
'email': ['test@example.net'],
'first_name': ['Foo'],
'last_name': ['Bar'],
'is_superuser': ['true'],
'group': ['GroupA', 'GroupB', 'GroupC'],
}
@pytest.fixture
def john(db):
return User.objects.create(username='john.doe', email='john.doe@example.com')
@pytest.fixture
def jane(db):
return User.objects.create(username='jane.doe', email='john.doe@example.com')
def test_format_username(settings, idp, saml_attributes):
adapter = DefaultAdapter()
assert adapter.format_username(idp, {}) is None
assert adapter.format_username(idp, saml_attributes) == ('x' * 32 + '@saml')[:30]
@ -37,8 +56,7 @@ def test_format_username(settings):
assert adapter.format_username(idp, saml_attributes) == 'foobar'
def test_lookup_user(settings):
User = auth.get_user_model()
def test_lookup_user(settings, idp, saml_attributes):
adapter = DefaultAdapter()
user = adapter.lookup_user(idp, saml_attributes)
assert user is not None
@ -55,7 +73,7 @@ def test_lookup_user(settings):
assert User.objects.count() == 0
def test_lookup_user_transaction(transactional_db, concurrency):
def test_lookup_user_transaction(transactional_db, concurrency, idp, saml_attributes):
adapter = DefaultAdapter()
p = ThreadPool(concurrency)
@ -81,7 +99,7 @@ def test_lookup_user_transaction(transactional_db, concurrency):
assert len(set(user.pk for user in users)) == 1
def test_provision_user_attributes(settings, django_user_model, caplog):
def test_provision_user_attributes(settings, django_user_model, idp, saml_attributes, caplog):
settings.MELLON_IDENTITY_PROVIDERS = [idp]
settings.MELLON_ATTRIBUTE_MAPPING = {
'email': u'{attributes[email][0]}',
@ -102,7 +120,7 @@ def test_provision_user_attributes(settings, django_user_model, caplog):
assert 'set field email' in caplog.text
def test_provision_user_groups(settings, django_user_model, caplog):
def test_provision_user_groups(settings, django_user_model, idp, saml_attributes, caplog):
settings.MELLON_IDENTITY_PROVIDERS = [idp]
settings.MELLON_GROUP_ATTRIBUTE = 'group'
user = SAMLBackend().authenticate(saml_attributes=saml_attributes)
@ -118,11 +136,11 @@ def test_provision_user_groups(settings, django_user_model, caplog):
user = SAMLBackend().authenticate(saml_attributes=saml_attributes2)
assert user.groups.count() == 2
assert set(user.groups.values_list('name', flat=True)) == set(saml_attributes2['group'])
assert len(caplog.records) == 5
assert len(caplog.records) == 6
assert 'removing group GroupA' in caplog.records[-1].message
def test_provision_is_superuser(settings, django_user_model, caplog):
def test_provision_is_superuser(settings, django_user_model, idp, saml_attributes, caplog):
settings.MELLON_IDENTITY_PROVIDERS = [idp]
settings.MELLON_SUPERUSER_MAPPING = {
'is_superuser': 'true',
@ -137,7 +155,7 @@ def test_provision_is_superuser(settings, django_user_model, caplog):
assert not 'flag is_staff and is_superuser removed' in caplog.text
def test_provision_absent_attribute(settings, django_user_model, caplog):
def test_provision_absent_attribute(settings, django_user_model, idp, saml_attributes, caplog):
settings.MELLON_IDENTITY_PROVIDERS = [idp]
settings.MELLON_ATTRIBUTE_MAPPING = {
'email': '{attributes[email][0]}',
@ -155,7 +173,7 @@ def test_provision_absent_attribute(settings, django_user_model, caplog):
assert 'set field last_name' in caplog.text
def test_provision_long_attribute(settings, django_user_model, caplog):
def test_provision_long_attribute(settings, django_user_model, idp, saml_attributes, caplog):
settings.MELLON_IDENTITY_PROVIDERS = [idp]
settings.MELLON_ATTRIBUTE_MAPPING = {
'email': '{attributes[email][0]}',
@ -174,19 +192,17 @@ def test_provision_long_attribute(settings, django_user_model, caplog):
assert 'set field email' in caplog.text
def test_lookup_user_transient_with_email(private_settings):
def test_lookup_user_transient_with_email(private_settings, idp, saml_attributes):
private_settings.MELLON_TRANSIENT_FEDERATION_ATTRIBUTE = 'email'
User = auth.get_user_model()
adapter = DefaultAdapter()
saml_attributes2 = saml_attributes.copy()
saml_attributes2['name_id_format'] = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
saml_attributes['name_id_format'] = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
assert User.objects.count() == 0
user = adapter.lookup_user(idp, saml_attributes2)
user = adapter.lookup_user(idp, saml_attributes)
assert user is not None
assert user.saml_identifiers.count() == 1
assert user.saml_identifiers.first().name_id == saml_attributes2['email'][0]
assert user.saml_identifiers.first().name_id == saml_attributes['email'][0]
user2 = adapter.lookup_user(idp, saml_attributes2)
user2 = adapter.lookup_user(idp, saml_attributes)
assert user.id == user2.id
User.objects.all().delete()
@ -196,3 +212,102 @@ def test_lookup_user_transient_with_email(private_settings):
user = adapter.lookup_user(idp, saml_attributes)
assert user is None
assert User.objects.count() == 0
def test_lookup_user_by_attributes_bad_setting1(settings, idp, saml_attributes, caplog):
settings.MELLON_PROVISION = False
adapter = DefaultAdapter()
settings.MELLON_LOOKUP_BY_ATTRIBUTES = 'coin'
assert adapter.lookup_user(idp, saml_attributes) is None
assert caplog.records[-1].message.endswith('it must be a list')
def test_lookup_user_by_attributes_bad_setting2(settings, idp, saml_attributes, caplog):
settings.MELLON_PROVISION = False
adapter = DefaultAdapter()
settings.MELLON_LOOKUP_BY_ATTRIBUTES = ['coin']
assert adapter.lookup_user(idp, saml_attributes) is None
assert caplog.records[-1].message.endswith('it must be a list of dicts')
def test_lookup_user_by_attributes_bad_setting3(settings, idp, saml_attributes, caplog):
settings.MELLON_PROVISION = False
adapter = DefaultAdapter()
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{}]
assert adapter.lookup_user(idp, saml_attributes) is None
assert caplog.records[-1].message.endswith('user_field is missing')
def test_lookup_user_by_attributes_bad_setting4(settings, idp, saml_attributes, caplog):
settings.MELLON_PROVISION = False
adapter = DefaultAdapter()
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'username'}]
assert adapter.lookup_user(idp, saml_attributes) is None
assert caplog.records[-1].message.endswith('saml_attribute is missing')
def test_lookup_user_by_attributes_not_found(settings, idp, saml_attributes, caplog):
settings.MELLON_PROVISION = False
adapter = DefaultAdapter()
caplog.set_level('DEBUG')
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'username', 'saml_attribute': 'saml_at1'}]
saml_attributes['saml_at1'] = ['john.doe']
assert adapter.lookup_user(idp, saml_attributes) is None
assert caplog.records[-2].message.endswith(': not found')
def test_lookup_user_by_attributes_too_many1(settings, idp, saml_attributes, john, jane, caplog):
settings.MELLON_PROVISION = False
adapter = DefaultAdapter()
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'email', 'saml_attribute': 'saml_at1'}]
saml_attributes['saml_at1'] = ['john.doe@example.com']
assert adapter.lookup_user(idp, saml_attributes) is None
assert 'too many users found(2)' in caplog.records[-1].message
def test_lookup_user_by_attributes_too_manyi2(settings, idp, saml_attributes, john, jane, caplog):
settings.MELLON_PROVISION = False
adapter = DefaultAdapter()
saml_attributes['saml_at1'] = ['john.doe']
saml_attributes['saml_at2'] = ['jane.doe']
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
{'user_field': 'username', 'saml_attribute': 'saml_at1'},
{'user_field': 'username', 'saml_attribute': 'saml_at2'},
]
assert adapter.lookup_user(idp, saml_attributes) is None
assert 'too many users found(2)' in caplog.records[-1].message
def test_lookup_user_by_attributes_found(settings, idp, saml_attributes, john, jane, caplog):
settings.MELLON_PROVISION = False
adapter = DefaultAdapter()
saml_attributes['saml_at1'] = ['john.doe']
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
{'user_field': 'username', 'saml_attribute': 'saml_at1'},
]
assert adapter.lookup_user(idp, saml_attributes) == john
def test_lookup_user_by_attributes_ignore_case(settings, idp, saml_attributes, john, jane, caplog):
settings.MELLON_PROVISION = False
adapter = DefaultAdapter()
saml_attributes['saml_at1'] = ['Jane.Doe']
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
{'user_field': 'username', 'saml_attribute': 'saml_at1'},
]
assert adapter.lookup_user(idp, saml_attributes) is None
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
{'user_field': 'username', 'saml_attribute': 'saml_at1', 'ignore-case': True},
]
assert adapter.lookup_user(idp, saml_attributes) == jane