add user lookup by attributes (#33739)
This commit is contained in:
parent
f2908b2ef3
commit
31015e6580
26
README
26
README
|
@ -261,6 +261,32 @@ MELLON_DEFAULT_ASSERTION_CONSUMER_BINDING
|
|||
Should be post or artifact. Default is post. You can refer to the SAML 2.0
|
||||
specification to learn the difference.
|
||||
|
||||
MELLON_LOOKUP_BY_ATTRIBUTES
|
||||
---------------------------
|
||||
|
||||
Allow looking for user with some SAML attributes if the received NameID is
|
||||
still unknown. It must be a list of dictionnaries with two mandatory keys
|
||||
`user_field` and `saml_attribute`. The optionnal key `ignore-case` should be a
|
||||
boolean indicating if the match is case-insensitive (default is to respect the
|
||||
case).
|
||||
|
||||
Each dictionnary is a rule for linking, applying all the rules should only
|
||||
return one user, the boolean operator OR is applied between the rules.
|
||||
|
||||
So for example if you received a SAML attribute named `email` and you want to
|
||||
link user with the same email you would configured it like that:
|
||||
|
||||
MELLON_LOOKUP_BY_ATTRIBUTES = [
|
||||
{
|
||||
'saml_attribute': 'email',
|
||||
'user_field': 'email',
|
||||
}
|
||||
]
|
||||
|
||||
The targeted user(s) field(s) should be as much as possible unique
|
||||
individually, if not django-mellon will refuse to link multiple users matching
|
||||
the rules.
|
||||
|
||||
Tests
|
||||
=====
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ import lasso
|
|||
import requests
|
||||
import requests.exceptions
|
||||
|
||||
from django.core.exceptions import PermissionDenied
|
||||
from django.core.exceptions import PermissionDenied, FieldDoesNotExist
|
||||
from django.contrib import auth
|
||||
from django.contrib.auth.models import Group
|
||||
from django.utils import six
|
||||
|
@ -21,6 +21,15 @@ class UserCreationError(Exception):
|
|||
pass
|
||||
|
||||
|
||||
def display_truncated_list(l, max_length=10):
|
||||
s = '[' + ', '.join(map(six.text_type, l))
|
||||
if len(l) > max_length:
|
||||
s += '..truncated more than %d items (%d)]' % (max_length, len(l))
|
||||
else:
|
||||
s += ']'
|
||||
return s
|
||||
|
||||
|
||||
class DefaultAdapter(object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
@ -128,32 +137,97 @@ class DefaultAdapter(object):
|
|||
name_id = saml_attributes['name_id_content']
|
||||
issuer = saml_attributes['issuer']
|
||||
try:
|
||||
return User.objects.get(saml_identifiers__name_id=name_id,
|
||||
user = User.objects.get(saml_identifiers__name_id=name_id,
|
||||
saml_identifiers__issuer=issuer)
|
||||
self.logger.info('looked up user %s with name_id %s from issuer %s',
|
||||
user, name_id, issuer)
|
||||
return user
|
||||
except User.DoesNotExist:
|
||||
pass
|
||||
|
||||
if not utils.get_setting(idp, 'PROVISION'):
|
||||
self.logger.warning('provisionning disabled, login refused')
|
||||
return None
|
||||
user = None
|
||||
lookup_by_attributes = utils.get_setting(idp, 'LOOKUP_BY_ATTRIBUTES')
|
||||
if lookup_by_attributes:
|
||||
user = self._lookup_by_attributes(idp, saml_attributes, lookup_by_attributes)
|
||||
|
||||
created = False
|
||||
if not user:
|
||||
if not utils.get_setting(idp, 'PROVISION'):
|
||||
self.logger.debug('provisionning disabled, login refused')
|
||||
return None
|
||||
created = True
|
||||
user = self.create_user(User)
|
||||
|
||||
user = self.create_user(User)
|
||||
nameid_user = self._link_user(idp, saml_attributes, issuer, name_id, user)
|
||||
if user != nameid_user:
|
||||
self.logger.info('looked up user %s with name_id %s from issuer %s',
|
||||
nameid_user, name_id, issuer)
|
||||
user.delete()
|
||||
if created:
|
||||
user.delete()
|
||||
return nameid_user
|
||||
|
||||
try:
|
||||
self.finish_create_user(idp, saml_attributes, nameid_user)
|
||||
except UserCreationError:
|
||||
nameid_user.delete()
|
||||
return None
|
||||
self.logger.info('created new user %s with name_id %s from issuer %s',
|
||||
nameid_user, name_id, issuer)
|
||||
if created:
|
||||
try:
|
||||
self.finish_create_user(idp, saml_attributes, nameid_user)
|
||||
except UserCreationError:
|
||||
user.delete()
|
||||
return None
|
||||
self.logger.info('created new user %s with name_id %s from issuer %s',
|
||||
nameid_user, name_id, issuer)
|
||||
return nameid_user
|
||||
|
||||
def _lookup_by_attributes(self, idp, saml_attributes, lookup_by_attributes):
|
||||
if not isinstance(lookup_by_attributes, list):
|
||||
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list', lookup_by_attributes)
|
||||
return None
|
||||
|
||||
users = set()
|
||||
for line in lookup_by_attributes:
|
||||
if not isinstance(line, dict):
|
||||
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list of dicts', line)
|
||||
continue
|
||||
user_field = line.get('user_field')
|
||||
if not hasattr(user_field, 'isalpha'):
|
||||
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: user_field is missing', line)
|
||||
continue
|
||||
try:
|
||||
User._meta.get_field(user_field)
|
||||
except FieldDoesNotExist:
|
||||
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r, user field %s does not exist',
|
||||
line, user_field)
|
||||
continue
|
||||
saml_attribute = line.get('saml_attribute')
|
||||
if not hasattr(saml_attribute, 'isalpha'):
|
||||
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: saml_attribute is missing', line)
|
||||
continue
|
||||
values = saml_attributes.get(saml_attribute)
|
||||
if not values:
|
||||
self.logger.error('looking for user by saml attribute %r and user field %r, skipping because empty',
|
||||
saml_attribute, user_field)
|
||||
continue
|
||||
ignore_case = line.get('ignore-case', False)
|
||||
for value in values:
|
||||
key = user_field
|
||||
if ignore_case:
|
||||
key += '__iexact'
|
||||
users_found = User.objects.filter(saml_identifiers__isnull=True, **{key: value})
|
||||
if not users_found:
|
||||
self.logger.debug('looking for users by attribute %r and user field %r with value %r: not found',
|
||||
saml_attribute, user_field, value)
|
||||
continue
|
||||
self.logger.info(u'looking for user by attribute %r and user field %r with value %r: found %s',
|
||||
saml_attribute, user_field, value, display_truncated_list(users_found))
|
||||
users.update(users_found)
|
||||
if len(users) == 1:
|
||||
user = list(users)[0]
|
||||
self.logger.info(u'looking for user by attributes %r: found user %s',
|
||||
lookup_by_attributes, user)
|
||||
return user
|
||||
elif len(users) > 1:
|
||||
self.logger.warning(u'looking for user by attributes %r: too many users found(%d), failing',
|
||||
lookup_by_attributes, len(users))
|
||||
return None
|
||||
|
||||
def _link_user(self, idp, saml_attributes, issuer, name_id, user):
|
||||
saml_id, created = models.UserSAMLIdentifier.objects.get_or_create(
|
||||
name_id=name_id, issuer=issuer, defaults={'user': user})
|
||||
|
|
|
@ -40,6 +40,7 @@ class AppSettings(object):
|
|||
'ARTIFACT_RESOLVE_TIMEOUT': 10.0,
|
||||
'LOGIN_HINTS': [],
|
||||
'SIGNATURE_METHOD': 'RSA-SHA256',
|
||||
'LOOKUP_BY_ATTRIBUTES': [],
|
||||
}
|
||||
|
||||
@property
|
||||
|
|
|
@ -11,23 +11,42 @@ from mellon.backends import SAMLBackend
|
|||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
idp = {
|
||||
'METADATA': open('tests/metadata.xml').read(),
|
||||
}
|
||||
saml_attributes = {
|
||||
'name_id_format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
|
||||
'name_id_content': 'x' * 32,
|
||||
'issuer': 'http://idp5/metadata',
|
||||
'username': ['foobar'],
|
||||
'email': ['test@example.net'],
|
||||
'first_name': ['Foo'],
|
||||
'last_name': ['Bar'],
|
||||
'is_superuser': ['true'],
|
||||
'group': ['GroupA', 'GroupB', 'GroupC'],
|
||||
}
|
||||
User = auth.get_user_model()
|
||||
|
||||
|
||||
def test_format_username(settings):
|
||||
@pytest.fixture
|
||||
def idp():
|
||||
return {
|
||||
'METADATA': open('tests/metadata.xml').read(),
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def saml_attributes():
|
||||
return {
|
||||
'name_id_format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
|
||||
'name_id_content': 'x' * 32,
|
||||
'issuer': 'http://idp5/metadata',
|
||||
'username': ['foobar'],
|
||||
'email': ['test@example.net'],
|
||||
'first_name': ['Foo'],
|
||||
'last_name': ['Bar'],
|
||||
'is_superuser': ['true'],
|
||||
'group': ['GroupA', 'GroupB', 'GroupC'],
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def john(db):
|
||||
return User.objects.create(username='john.doe', email='john.doe@example.com')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def jane(db):
|
||||
return User.objects.create(username='jane.doe', email='john.doe@example.com')
|
||||
|
||||
|
||||
def test_format_username(settings, idp, saml_attributes):
|
||||
adapter = DefaultAdapter()
|
||||
assert adapter.format_username(idp, {}) is None
|
||||
assert adapter.format_username(idp, saml_attributes) == ('x' * 32 + '@saml')[:30]
|
||||
|
@ -37,8 +56,7 @@ def test_format_username(settings):
|
|||
assert adapter.format_username(idp, saml_attributes) == 'foobar'
|
||||
|
||||
|
||||
def test_lookup_user(settings):
|
||||
User = auth.get_user_model()
|
||||
def test_lookup_user(settings, idp, saml_attributes):
|
||||
adapter = DefaultAdapter()
|
||||
user = adapter.lookup_user(idp, saml_attributes)
|
||||
assert user is not None
|
||||
|
@ -55,7 +73,7 @@ def test_lookup_user(settings):
|
|||
assert User.objects.count() == 0
|
||||
|
||||
|
||||
def test_lookup_user_transaction(transactional_db, concurrency):
|
||||
def test_lookup_user_transaction(transactional_db, concurrency, idp, saml_attributes):
|
||||
adapter = DefaultAdapter()
|
||||
p = ThreadPool(concurrency)
|
||||
|
||||
|
@ -81,7 +99,7 @@ def test_lookup_user_transaction(transactional_db, concurrency):
|
|||
assert len(set(user.pk for user in users)) == 1
|
||||
|
||||
|
||||
def test_provision_user_attributes(settings, django_user_model, caplog):
|
||||
def test_provision_user_attributes(settings, django_user_model, idp, saml_attributes, caplog):
|
||||
settings.MELLON_IDENTITY_PROVIDERS = [idp]
|
||||
settings.MELLON_ATTRIBUTE_MAPPING = {
|
||||
'email': u'{attributes[email][0]}',
|
||||
|
@ -102,7 +120,7 @@ def test_provision_user_attributes(settings, django_user_model, caplog):
|
|||
assert 'set field email' in caplog.text
|
||||
|
||||
|
||||
def test_provision_user_groups(settings, django_user_model, caplog):
|
||||
def test_provision_user_groups(settings, django_user_model, idp, saml_attributes, caplog):
|
||||
settings.MELLON_IDENTITY_PROVIDERS = [idp]
|
||||
settings.MELLON_GROUP_ATTRIBUTE = 'group'
|
||||
user = SAMLBackend().authenticate(saml_attributes=saml_attributes)
|
||||
|
@ -118,11 +136,11 @@ def test_provision_user_groups(settings, django_user_model, caplog):
|
|||
user = SAMLBackend().authenticate(saml_attributes=saml_attributes2)
|
||||
assert user.groups.count() == 2
|
||||
assert set(user.groups.values_list('name', flat=True)) == set(saml_attributes2['group'])
|
||||
assert len(caplog.records) == 5
|
||||
assert len(caplog.records) == 6
|
||||
assert 'removing group GroupA' in caplog.records[-1].message
|
||||
|
||||
|
||||
def test_provision_is_superuser(settings, django_user_model, caplog):
|
||||
def test_provision_is_superuser(settings, django_user_model, idp, saml_attributes, caplog):
|
||||
settings.MELLON_IDENTITY_PROVIDERS = [idp]
|
||||
settings.MELLON_SUPERUSER_MAPPING = {
|
||||
'is_superuser': 'true',
|
||||
|
@ -137,7 +155,7 @@ def test_provision_is_superuser(settings, django_user_model, caplog):
|
|||
assert not 'flag is_staff and is_superuser removed' in caplog.text
|
||||
|
||||
|
||||
def test_provision_absent_attribute(settings, django_user_model, caplog):
|
||||
def test_provision_absent_attribute(settings, django_user_model, idp, saml_attributes, caplog):
|
||||
settings.MELLON_IDENTITY_PROVIDERS = [idp]
|
||||
settings.MELLON_ATTRIBUTE_MAPPING = {
|
||||
'email': '{attributes[email][0]}',
|
||||
|
@ -155,7 +173,7 @@ def test_provision_absent_attribute(settings, django_user_model, caplog):
|
|||
assert 'set field last_name' in caplog.text
|
||||
|
||||
|
||||
def test_provision_long_attribute(settings, django_user_model, caplog):
|
||||
def test_provision_long_attribute(settings, django_user_model, idp, saml_attributes, caplog):
|
||||
settings.MELLON_IDENTITY_PROVIDERS = [idp]
|
||||
settings.MELLON_ATTRIBUTE_MAPPING = {
|
||||
'email': '{attributes[email][0]}',
|
||||
|
@ -174,19 +192,17 @@ def test_provision_long_attribute(settings, django_user_model, caplog):
|
|||
assert 'set field email' in caplog.text
|
||||
|
||||
|
||||
def test_lookup_user_transient_with_email(private_settings):
|
||||
def test_lookup_user_transient_with_email(private_settings, idp, saml_attributes):
|
||||
private_settings.MELLON_TRANSIENT_FEDERATION_ATTRIBUTE = 'email'
|
||||
User = auth.get_user_model()
|
||||
adapter = DefaultAdapter()
|
||||
saml_attributes2 = saml_attributes.copy()
|
||||
saml_attributes2['name_id_format'] = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
|
||||
saml_attributes['name_id_format'] = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
|
||||
assert User.objects.count() == 0
|
||||
user = adapter.lookup_user(idp, saml_attributes2)
|
||||
user = adapter.lookup_user(idp, saml_attributes)
|
||||
assert user is not None
|
||||
assert user.saml_identifiers.count() == 1
|
||||
assert user.saml_identifiers.first().name_id == saml_attributes2['email'][0]
|
||||
assert user.saml_identifiers.first().name_id == saml_attributes['email'][0]
|
||||
|
||||
user2 = adapter.lookup_user(idp, saml_attributes2)
|
||||
user2 = adapter.lookup_user(idp, saml_attributes)
|
||||
assert user.id == user2.id
|
||||
|
||||
User.objects.all().delete()
|
||||
|
@ -196,3 +212,102 @@ def test_lookup_user_transient_with_email(private_settings):
|
|||
user = adapter.lookup_user(idp, saml_attributes)
|
||||
assert user is None
|
||||
assert User.objects.count() == 0
|
||||
|
||||
|
||||
def test_lookup_user_by_attributes_bad_setting1(settings, idp, saml_attributes, caplog):
|
||||
settings.MELLON_PROVISION = False
|
||||
|
||||
adapter = DefaultAdapter()
|
||||
settings.MELLON_LOOKUP_BY_ATTRIBUTES = 'coin'
|
||||
assert adapter.lookup_user(idp, saml_attributes) is None
|
||||
assert caplog.records[-1].message.endswith('it must be a list')
|
||||
|
||||
|
||||
def test_lookup_user_by_attributes_bad_setting2(settings, idp, saml_attributes, caplog):
|
||||
settings.MELLON_PROVISION = False
|
||||
|
||||
adapter = DefaultAdapter()
|
||||
settings.MELLON_LOOKUP_BY_ATTRIBUTES = ['coin']
|
||||
assert adapter.lookup_user(idp, saml_attributes) is None
|
||||
assert caplog.records[-1].message.endswith('it must be a list of dicts')
|
||||
|
||||
|
||||
def test_lookup_user_by_attributes_bad_setting3(settings, idp, saml_attributes, caplog):
|
||||
settings.MELLON_PROVISION = False
|
||||
|
||||
adapter = DefaultAdapter()
|
||||
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{}]
|
||||
assert adapter.lookup_user(idp, saml_attributes) is None
|
||||
assert caplog.records[-1].message.endswith('user_field is missing')
|
||||
|
||||
|
||||
def test_lookup_user_by_attributes_bad_setting4(settings, idp, saml_attributes, caplog):
|
||||
settings.MELLON_PROVISION = False
|
||||
|
||||
adapter = DefaultAdapter()
|
||||
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'username'}]
|
||||
assert adapter.lookup_user(idp, saml_attributes) is None
|
||||
assert caplog.records[-1].message.endswith('saml_attribute is missing')
|
||||
|
||||
|
||||
def test_lookup_user_by_attributes_not_found(settings, idp, saml_attributes, caplog):
|
||||
settings.MELLON_PROVISION = False
|
||||
|
||||
adapter = DefaultAdapter()
|
||||
caplog.set_level('DEBUG')
|
||||
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'username', 'saml_attribute': 'saml_at1'}]
|
||||
saml_attributes['saml_at1'] = ['john.doe']
|
||||
assert adapter.lookup_user(idp, saml_attributes) is None
|
||||
assert caplog.records[-2].message.endswith(': not found')
|
||||
|
||||
|
||||
def test_lookup_user_by_attributes_too_many1(settings, idp, saml_attributes, john, jane, caplog):
|
||||
settings.MELLON_PROVISION = False
|
||||
|
||||
adapter = DefaultAdapter()
|
||||
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'email', 'saml_attribute': 'saml_at1'}]
|
||||
saml_attributes['saml_at1'] = ['john.doe@example.com']
|
||||
assert adapter.lookup_user(idp, saml_attributes) is None
|
||||
assert 'too many users found(2)' in caplog.records[-1].message
|
||||
|
||||
|
||||
def test_lookup_user_by_attributes_too_manyi2(settings, idp, saml_attributes, john, jane, caplog):
|
||||
settings.MELLON_PROVISION = False
|
||||
|
||||
adapter = DefaultAdapter()
|
||||
saml_attributes['saml_at1'] = ['john.doe']
|
||||
saml_attributes['saml_at2'] = ['jane.doe']
|
||||
|
||||
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
|
||||
{'user_field': 'username', 'saml_attribute': 'saml_at1'},
|
||||
{'user_field': 'username', 'saml_attribute': 'saml_at2'},
|
||||
]
|
||||
assert adapter.lookup_user(idp, saml_attributes) is None
|
||||
assert 'too many users found(2)' in caplog.records[-1].message
|
||||
|
||||
|
||||
def test_lookup_user_by_attributes_found(settings, idp, saml_attributes, john, jane, caplog):
|
||||
settings.MELLON_PROVISION = False
|
||||
|
||||
adapter = DefaultAdapter()
|
||||
saml_attributes['saml_at1'] = ['john.doe']
|
||||
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
|
||||
{'user_field': 'username', 'saml_attribute': 'saml_at1'},
|
||||
]
|
||||
assert adapter.lookup_user(idp, saml_attributes) == john
|
||||
|
||||
|
||||
def test_lookup_user_by_attributes_ignore_case(settings, idp, saml_attributes, john, jane, caplog):
|
||||
settings.MELLON_PROVISION = False
|
||||
|
||||
adapter = DefaultAdapter()
|
||||
saml_attributes['saml_at1'] = ['Jane.Doe']
|
||||
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
|
||||
{'user_field': 'username', 'saml_attribute': 'saml_at1'},
|
||||
]
|
||||
assert adapter.lookup_user(idp, saml_attributes) is None
|
||||
|
||||
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
|
||||
{'user_field': 'username', 'saml_attribute': 'saml_at1', 'ignore-case': True},
|
||||
]
|
||||
assert adapter.lookup_user(idp, saml_attributes) == jane
|
||||
|
|
Loading…
Reference in New Issue