summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBenjamin Dauvergne <bdauvergne@entrouvert.com>2019-06-06 11:52:21 (GMT)
committerThomas NOEL <tnoel@entrouvert.com>2019-06-11 14:46:02 (GMT)
commit31015e6580b4f7ca300c66f880ae8b281a18a883 (patch)
tree60920014b5073a4e60fbcd322741cdfac2845301
parentf2908b2ef30d249a496b441235f5e7fad3887cd7 (diff)
downloaddjango-mellon-31015e6580b4f7ca300c66f880ae8b281a18a883.zip
django-mellon-31015e6580b4f7ca300c66f880ae8b281a18a883.tar.gz
django-mellon-31015e6580b4f7ca300c66f880ae8b281a18a883.tar.bz2
add user lookup by attributes (#33739)
-rw-r--r--README26
-rw-r--r--mellon/adapters.py102
-rw-r--r--mellon/app_settings.py1
-rw-r--r--tests/test_default_adapter.py181
4 files changed, 263 insertions, 47 deletions
diff --git a/README b/README
index a06e1e9..53f88d4 100644
--- a/README
+++ b/README
@@ -261,6 +261,32 @@ MELLON_DEFAULT_ASSERTION_CONSUMER_BINDING
Should be post or artifact. Default is post. You can refer to the SAML 2.0
specification to learn the difference.
+MELLON_LOOKUP_BY_ATTRIBUTES
+---------------------------
+
+Allow looking for user with some SAML attributes if the received NameID is
+still unknown. It must be a list of dictionnaries with two mandatory keys
+`user_field` and `saml_attribute`. The optionnal key `ignore-case` should be a
+boolean indicating if the match is case-insensitive (default is to respect the
+case).
+
+Each dictionnary is a rule for linking, applying all the rules should only
+return one user, the boolean operator OR is applied between the rules.
+
+So for example if you received a SAML attribute named `email` and you want to
+link user with the same email you would configured it like that:
+
+ MELLON_LOOKUP_BY_ATTRIBUTES = [
+ {
+ 'saml_attribute': 'email',
+ 'user_field': 'email',
+ }
+ ]
+
+The targeted user(s) field(s) should be as much as possible unique
+individually, if not django-mellon will refuse to link multiple users matching
+the rules.
+
Tests
=====
diff --git a/mellon/adapters.py b/mellon/adapters.py
index a79932b..8f94f15 100644
--- a/mellon/adapters.py
+++ b/mellon/adapters.py
@@ -6,7 +6,7 @@ import lasso
import requests
import requests.exceptions
-from django.core.exceptions import PermissionDenied
+from django.core.exceptions import PermissionDenied, FieldDoesNotExist
from django.contrib import auth
from django.contrib.auth.models import Group
from django.utils import six
@@ -21,6 +21,15 @@ class UserCreationError(Exception):
pass
+def display_truncated_list(l, max_length=10):
+ s = '[' + ', '.join(map(six.text_type, l))
+ if len(l) > max_length:
+ s += '..truncated more than %d items (%d)]' % (max_length, len(l))
+ else:
+ s += ']'
+ return s
+
+
class DefaultAdapter(object):
def __init__(self, *args, **kwargs):
self.logger = logging.getLogger(__name__)
@@ -128,32 +137,97 @@ class DefaultAdapter(object):
name_id = saml_attributes['name_id_content']
issuer = saml_attributes['issuer']
try:
- return User.objects.get(saml_identifiers__name_id=name_id,
+ user = User.objects.get(saml_identifiers__name_id=name_id,
saml_identifiers__issuer=issuer)
+ self.logger.info('looked up user %s with name_id %s from issuer %s',
+ user, name_id, issuer)
+ return user
except User.DoesNotExist:
pass
- if not utils.get_setting(idp, 'PROVISION'):
- self.logger.warning('provisionning disabled, login refused')
- return None
+ user = None
+ lookup_by_attributes = utils.get_setting(idp, 'LOOKUP_BY_ATTRIBUTES')
+ if lookup_by_attributes:
+ user = self._lookup_by_attributes(idp, saml_attributes, lookup_by_attributes)
+
+ created = False
+ if not user:
+ if not utils.get_setting(idp, 'PROVISION'):
+ self.logger.debug('provisionning disabled, login refused')
+ return None
+ created = True
+ user = self.create_user(User)
- user = self.create_user(User)
nameid_user = self._link_user(idp, saml_attributes, issuer, name_id, user)
if user != nameid_user:
self.logger.info('looked up user %s with name_id %s from issuer %s',
nameid_user, name_id, issuer)
- user.delete()
+ if created:
+ user.delete()
return nameid_user
- try:
- self.finish_create_user(idp, saml_attributes, nameid_user)
- except UserCreationError:
- nameid_user.delete()
- return None
- self.logger.info('created new user %s with name_id %s from issuer %s',
- nameid_user, name_id, issuer)
+ if created:
+ try:
+ self.finish_create_user(idp, saml_attributes, nameid_user)
+ except UserCreationError:
+ user.delete()
+ return None
+ self.logger.info('created new user %s with name_id %s from issuer %s',
+ nameid_user, name_id, issuer)
return nameid_user
+ def _lookup_by_attributes(self, idp, saml_attributes, lookup_by_attributes):
+ if not isinstance(lookup_by_attributes, list):
+ self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list', lookup_by_attributes)
+ return None
+
+ users = set()
+ for line in lookup_by_attributes:
+ if not isinstance(line, dict):
+ self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list of dicts', line)
+ continue
+ user_field = line.get('user_field')
+ if not hasattr(user_field, 'isalpha'):
+ self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: user_field is missing', line)
+ continue
+ try:
+ User._meta.get_field(user_field)
+ except FieldDoesNotExist:
+ self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r, user field %s does not exist',
+ line, user_field)
+ continue
+ saml_attribute = line.get('saml_attribute')
+ if not hasattr(saml_attribute, 'isalpha'):
+ self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: saml_attribute is missing', line)
+ continue
+ values = saml_attributes.get(saml_attribute)
+ if not values:
+ self.logger.error('looking for user by saml attribute %r and user field %r, skipping because empty',
+ saml_attribute, user_field)
+ continue
+ ignore_case = line.get('ignore-case', False)
+ for value in values:
+ key = user_field
+ if ignore_case:
+ key += '__iexact'
+ users_found = User.objects.filter(saml_identifiers__isnull=True, **{key: value})
+ if not users_found:
+ self.logger.debug('looking for users by attribute %r and user field %r with value %r: not found',
+ saml_attribute, user_field, value)
+ continue
+ self.logger.info(u'looking for user by attribute %r and user field %r with value %r: found %s',
+ saml_attribute, user_field, value, display_truncated_list(users_found))
+ users.update(users_found)
+ if len(users) == 1:
+ user = list(users)[0]
+ self.logger.info(u'looking for user by attributes %r: found user %s',
+ lookup_by_attributes, user)
+ return user
+ elif len(users) > 1:
+ self.logger.warning(u'looking for user by attributes %r: too many users found(%d), failing',
+ lookup_by_attributes, len(users))
+ return None
+
def _link_user(self, idp, saml_attributes, issuer, name_id, user):
saml_id, created = models.UserSAMLIdentifier.objects.get_or_create(
name_id=name_id, issuer=issuer, defaults={'user': user})
diff --git a/mellon/app_settings.py b/mellon/app_settings.py
index ae095bd..d8dda76 100644
--- a/mellon/app_settings.py
+++ b/mellon/app_settings.py
@@ -40,6 +40,7 @@ class AppSettings(object):
'ARTIFACT_RESOLVE_TIMEOUT': 10.0,
'LOGIN_HINTS': [],
'SIGNATURE_METHOD': 'RSA-SHA256',
+ 'LOOKUP_BY_ATTRIBUTES': [],
}
@property
diff --git a/tests/test_default_adapter.py b/tests/test_default_adapter.py
index 8cf4382..43c9064 100644
--- a/tests/test_default_adapter.py
+++ b/tests/test_default_adapter.py
@@ -11,23 +11,42 @@ from mellon.backends import SAMLBackend
pytestmark = pytest.mark.django_db
-idp = {
- 'METADATA': open('tests/metadata.xml').read(),
-}
-saml_attributes = {
- 'name_id_format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
- 'name_id_content': 'x' * 32,
- 'issuer': 'http://idp5/metadata',
- 'username': ['foobar'],
- 'email': ['test@example.net'],
- 'first_name': ['Foo'],
- 'last_name': ['Bar'],
- 'is_superuser': ['true'],
- 'group': ['GroupA', 'GroupB', 'GroupC'],
-}
-
-
-def test_format_username(settings):
+User = auth.get_user_model()
+
+
+@pytest.fixture
+def idp():
+ return {
+ 'METADATA': open('tests/metadata.xml').read(),
+ }
+
+
+@pytest.fixture
+def saml_attributes():
+ return {
+ 'name_id_format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
+ 'name_id_content': 'x' * 32,
+ 'issuer': 'http://idp5/metadata',
+ 'username': ['foobar'],
+ 'email': ['test@example.net'],
+ 'first_name': ['Foo'],
+ 'last_name': ['Bar'],
+ 'is_superuser': ['true'],
+ 'group': ['GroupA', 'GroupB', 'GroupC'],
+ }
+
+
+@pytest.fixture
+def john(db):
+ return User.objects.create(username='john.doe', email='john.doe@example.com')
+
+
+@pytest.fixture
+def jane(db):
+ return User.objects.create(username='jane.doe', email='john.doe@example.com')
+
+
+def test_format_username(settings, idp, saml_attributes):
adapter = DefaultAdapter()
assert adapter.format_username(idp, {}) is None
assert adapter.format_username(idp, saml_attributes) == ('x' * 32 + '@saml')[:30]
@@ -37,8 +56,7 @@ def test_format_username(settings):
assert adapter.format_username(idp, saml_attributes) == 'foobar'
-def test_lookup_user(settings):
- User = auth.get_user_model()
+def test_lookup_user(settings, idp, saml_attributes):
adapter = DefaultAdapter()
user = adapter.lookup_user(idp, saml_attributes)
assert user is not None
@@ -55,7 +73,7 @@ def test_lookup_user(settings):
assert User.objects.count() == 0
-def test_lookup_user_transaction(transactional_db, concurrency):
+def test_lookup_user_transaction(transactional_db, concurrency, idp, saml_attributes):
adapter = DefaultAdapter()
p = ThreadPool(concurrency)
@@ -81,7 +99,7 @@ def test_lookup_user_transaction(transactional_db, concurrency):
assert len(set(user.pk for user in users)) == 1
-def test_provision_user_attributes(settings, django_user_model, caplog):
+def test_provision_user_attributes(settings, django_user_model, idp, saml_attributes, caplog):
settings.MELLON_IDENTITY_PROVIDERS = [idp]
settings.MELLON_ATTRIBUTE_MAPPING = {
'email': u'{attributes[email][0]}',
@@ -102,7 +120,7 @@ def test_provision_user_attributes(settings, django_user_model, caplog):
assert 'set field email' in caplog.text
-def test_provision_user_groups(settings, django_user_model, caplog):
+def test_provision_user_groups(settings, django_user_model, idp, saml_attributes, caplog):
settings.MELLON_IDENTITY_PROVIDERS = [idp]
settings.MELLON_GROUP_ATTRIBUTE = 'group'
user = SAMLBackend().authenticate(saml_attributes=saml_attributes)
@@ -118,11 +136,11 @@ def test_provision_user_groups(settings, django_user_model, caplog):
user = SAMLBackend().authenticate(saml_attributes=saml_attributes2)
assert user.groups.count() == 2
assert set(user.groups.values_list('name', flat=True)) == set(saml_attributes2['group'])
- assert len(caplog.records) == 5
+ assert len(caplog.records) == 6
assert 'removing group GroupA' in caplog.records[-1].message
-def test_provision_is_superuser(settings, django_user_model, caplog):
+def test_provision_is_superuser(settings, django_user_model, idp, saml_attributes, caplog):
settings.MELLON_IDENTITY_PROVIDERS = [idp]
settings.MELLON_SUPERUSER_MAPPING = {
'is_superuser': 'true',
@@ -137,7 +155,7 @@ def test_provision_is_superuser(settings, django_user_model, caplog):
assert not 'flag is_staff and is_superuser removed' in caplog.text
-def test_provision_absent_attribute(settings, django_user_model, caplog):
+def test_provision_absent_attribute(settings, django_user_model, idp, saml_attributes, caplog):
settings.MELLON_IDENTITY_PROVIDERS = [idp]
settings.MELLON_ATTRIBUTE_MAPPING = {
'email': '{attributes[email][0]}',
@@ -155,7 +173,7 @@ def test_provision_absent_attribute(settings, django_user_model, caplog):
assert 'set field last_name' in caplog.text
-def test_provision_long_attribute(settings, django_user_model, caplog):
+def test_provision_long_attribute(settings, django_user_model, idp, saml_attributes, caplog):
settings.MELLON_IDENTITY_PROVIDERS = [idp]
settings.MELLON_ATTRIBUTE_MAPPING = {
'email': '{attributes[email][0]}',
@@ -174,19 +192,17 @@ def test_provision_long_attribute(settings, django_user_model, caplog):
assert 'set field email' in caplog.text
-def test_lookup_user_transient_with_email(private_settings):
+def test_lookup_user_transient_with_email(private_settings, idp, saml_attributes):
private_settings.MELLON_TRANSIENT_FEDERATION_ATTRIBUTE = 'email'
- User = auth.get_user_model()
adapter = DefaultAdapter()
- saml_attributes2 = saml_attributes.copy()
- saml_attributes2['name_id_format'] = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
+ saml_attributes['name_id_format'] = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
assert User.objects.count() == 0
- user = adapter.lookup_user(idp, saml_attributes2)
+ user = adapter.lookup_user(idp, saml_attributes)
assert user is not None
assert user.saml_identifiers.count() == 1
- assert user.saml_identifiers.first().name_id == saml_attributes2['email'][0]
+ assert user.saml_identifiers.first().name_id == saml_attributes['email'][0]
- user2 = adapter.lookup_user(idp, saml_attributes2)
+ user2 = adapter.lookup_user(idp, saml_attributes)
assert user.id == user2.id
User.objects.all().delete()
@@ -196,3 +212,102 @@ def test_lookup_user_transient_with_email(private_settings):
user = adapter.lookup_user(idp, saml_attributes)
assert user is None
assert User.objects.count() == 0
+
+
+def test_lookup_user_by_attributes_bad_setting1(settings, idp, saml_attributes, caplog):
+ settings.MELLON_PROVISION = False
+
+ adapter = DefaultAdapter()
+ settings.MELLON_LOOKUP_BY_ATTRIBUTES = 'coin'
+ assert adapter.lookup_user(idp, saml_attributes) is None
+ assert caplog.records[-1].message.endswith('it must be a list')
+
+
+def test_lookup_user_by_attributes_bad_setting2(settings, idp, saml_attributes, caplog):
+ settings.MELLON_PROVISION = False
+
+ adapter = DefaultAdapter()
+ settings.MELLON_LOOKUP_BY_ATTRIBUTES = ['coin']
+ assert adapter.lookup_user(idp, saml_attributes) is None
+ assert caplog.records[-1].message.endswith('it must be a list of dicts')
+
+
+def test_lookup_user_by_attributes_bad_setting3(settings, idp, saml_attributes, caplog):
+ settings.MELLON_PROVISION = False
+
+ adapter = DefaultAdapter()
+ settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{}]
+ assert adapter.lookup_user(idp, saml_attributes) is None
+ assert caplog.records[-1].message.endswith('user_field is missing')
+
+
+def test_lookup_user_by_attributes_bad_setting4(settings, idp, saml_attributes, caplog):
+ settings.MELLON_PROVISION = False
+
+ adapter = DefaultAdapter()
+ settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'username'}]
+ assert adapter.lookup_user(idp, saml_attributes) is None
+ assert caplog.records[-1].message.endswith('saml_attribute is missing')
+
+
+def test_lookup_user_by_attributes_not_found(settings, idp, saml_attributes, caplog):
+ settings.MELLON_PROVISION = False
+
+ adapter = DefaultAdapter()
+ caplog.set_level('DEBUG')
+ settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'username', 'saml_attribute': 'saml_at1'}]
+ saml_attributes['saml_at1'] = ['john.doe']
+ assert adapter.lookup_user(idp, saml_attributes) is None
+ assert caplog.records[-2].message.endswith(': not found')
+
+
+def test_lookup_user_by_attributes_too_many1(settings, idp, saml_attributes, john, jane, caplog):
+ settings.MELLON_PROVISION = False
+
+ adapter = DefaultAdapter()
+ settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'email', 'saml_attribute': 'saml_at1'}]
+ saml_attributes['saml_at1'] = ['john.doe@example.com']
+ assert adapter.lookup_user(idp, saml_attributes) is None
+ assert 'too many users found(2)' in caplog.records[-1].message
+
+
+def test_lookup_user_by_attributes_too_manyi2(settings, idp, saml_attributes, john, jane, caplog):
+ settings.MELLON_PROVISION = False
+
+ adapter = DefaultAdapter()
+ saml_attributes['saml_at1'] = ['john.doe']
+ saml_attributes['saml_at2'] = ['jane.doe']
+
+ settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
+ {'user_field': 'username', 'saml_attribute': 'saml_at1'},
+ {'user_field': 'username', 'saml_attribute': 'saml_at2'},
+ ]
+ assert adapter.lookup_user(idp, saml_attributes) is None
+ assert 'too many users found(2)' in caplog.records[-1].message
+
+
+def test_lookup_user_by_attributes_found(settings, idp, saml_attributes, john, jane, caplog):
+ settings.MELLON_PROVISION = False
+
+ adapter = DefaultAdapter()
+ saml_attributes['saml_at1'] = ['john.doe']
+ settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
+ {'user_field': 'username', 'saml_attribute': 'saml_at1'},
+ ]
+ assert adapter.lookup_user(idp, saml_attributes) == john
+
+
+def test_lookup_user_by_attributes_ignore_case(settings, idp, saml_attributes, john, jane, caplog):
+ settings.MELLON_PROVISION = False
+
+ adapter = DefaultAdapter()
+ saml_attributes['saml_at1'] = ['Jane.Doe']
+ settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
+ {'user_field': 'username', 'saml_attribute': 'saml_at1'},
+ ]
+ assert adapter.lookup_user(idp, saml_attributes) is None
+
+ settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
+ {'user_field': 'username', 'saml_attribute': 'saml_at1', 'ignore-case': True},
+ ]
+ assert adapter.lookup_user(idp, saml_attributes) == jane