ldap: use get_by_email for ldap email lookup (#67600)
This commit is contained in:
parent
d271064052
commit
39ee9d898a
|
@ -1441,7 +1441,7 @@ class LDAPBackend:
|
|||
Lock.lock_email(email)
|
||||
try:
|
||||
log.debug('ldap: lookup using email %r', email)
|
||||
return self._lookup_user_queryset(block=block).get(ou=ou, email=email)
|
||||
return self._lookup_user_queryset(block=block).filter(ou=ou).get_by_email(email)
|
||||
except LDAPUser.DoesNotExist:
|
||||
return None
|
||||
except LDAPUser.MultipleObjectsReturned:
|
||||
|
|
|
@ -25,8 +25,10 @@ from unittest import mock
|
|||
import ldap
|
||||
import pytest
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.models import Group
|
||||
from django.core import mail, management
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.db.models.query import QuerySet
|
||||
from django.urls import reverse
|
||||
from django.utils import timezone
|
||||
from django.utils.encoding import force_bytes, force_str
|
||||
|
@ -38,6 +40,8 @@ from authentic2.a2_rbac.models import OrganizationalUnit, Role
|
|||
from authentic2.a2_rbac.utils import get_default_ou
|
||||
from authentic2.apps.authenticators.models import LoginPasswordAuthenticator
|
||||
from authentic2.backends import ldap_backend
|
||||
from authentic2.backends.ldap_backend import LDAPObject
|
||||
from authentic2.ldap_utils import DnFormatter, FilterFormatter
|
||||
from authentic2.models import Service
|
||||
from authentic2.utils import crypto, switch_user
|
||||
from authentic2.utils.misc import PasswordChangeError, authenticate
|
||||
|
@ -232,7 +236,7 @@ cn: Étienne Michu
|
|||
sn: Michu
|
||||
gn: Étienne
|
||||
l: locality{i}
|
||||
mail: etienne.michu@example.net
|
||||
mail: etienne.michu{i}@example.net
|
||||
|
||||
'''.format(
|
||||
i=i, password=PASS
|
||||
|
@ -253,7 +257,6 @@ memberUid: {uid}
|
|||
@pytest.fixture
|
||||
def wraps_ldap_set_option(monkeypatch):
|
||||
mock_set_option = mock.Mock()
|
||||
from authentic2.backends.ldap_backend import LDAPObject
|
||||
|
||||
old_set_option = LDAPObject.set_option
|
||||
|
||||
|
@ -637,8 +640,6 @@ def test_wrong_ou(slapd, settings, client, db):
|
|||
|
||||
|
||||
def test_dn_formatter():
|
||||
from authentic2.ldap_utils import DnFormatter, FilterFormatter
|
||||
|
||||
formatter = FilterFormatter()
|
||||
|
||||
assert formatter.format('uid={uid}', uid='john doe') == 'uid=john doe'
|
||||
|
@ -653,8 +654,6 @@ def test_dn_formatter():
|
|||
|
||||
|
||||
def test_group_mapping(slapd, settings, client, db):
|
||||
from django.contrib.auth.models import Group
|
||||
|
||||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
|
@ -676,8 +675,6 @@ def test_group_mapping(slapd, settings, client, db):
|
|||
|
||||
|
||||
def test_posix_group_mapping(slapd, settings, client, db):
|
||||
from django.contrib.auth.models import Group
|
||||
|
||||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
|
@ -796,8 +793,6 @@ def test_group_to_role_mapping_modify_disabled(slapd, settings, db, app, admin,
|
|||
|
||||
|
||||
def test_group_su(slapd, settings, client, db):
|
||||
from django.contrib.auth.models import Group
|
||||
|
||||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
|
@ -816,8 +811,6 @@ def test_group_su(slapd, settings, client, db):
|
|||
|
||||
|
||||
def test_group_staff(slapd, settings, client, db):
|
||||
from django.contrib.auth.models import Group
|
||||
|
||||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
|
@ -835,91 +828,127 @@ def test_group_staff(slapd, settings, client, db):
|
|||
assert not response.context['user'].is_superuser
|
||||
|
||||
|
||||
def test_get_users(slapd, settings, db, monkeypatch, caplog):
|
||||
import django.db.models.base
|
||||
from django.contrib.auth.models import Group
|
||||
class TestGetUsers:
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self, settings, slapd):
|
||||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
'basedn': 'o=ôrga',
|
||||
'use_tls': False,
|
||||
'create_group': True,
|
||||
'group_mapping': [
|
||||
['cn=group2,o=ôrga', ['Group2']],
|
||||
],
|
||||
'group_filter': '(&(memberUid={uid})(objectClass=posixGroup))',
|
||||
'group_to_role_mapping': [
|
||||
['cn=unknown,o=dn', ['Role2']],
|
||||
],
|
||||
}
|
||||
]
|
||||
|
||||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
'basedn': 'o=ôrga',
|
||||
'use_tls': False,
|
||||
'create_group': True,
|
||||
'group_mapping': [
|
||||
['cn=group2,o=ôrga', ['Group2']],
|
||||
],
|
||||
'group_filter': '(&(memberUid={uid})(objectClass=posixGroup))',
|
||||
'group_to_role_mapping': [
|
||||
['cn=unknown,o=dn', ['Role2']],
|
||||
],
|
||||
'lookups': ['external_id', 'username'],
|
||||
}
|
||||
]
|
||||
save = mock.Mock(wraps=ldap_backend.LDAPUser.save)
|
||||
bulk_create = mock.Mock(wraps=django.db.models.query.QuerySet.bulk_create)
|
||||
@pytest.fixture
|
||||
def save(self, monkeypatch):
|
||||
mock_save = mock.Mock(wraps=ldap_backend.LDAPUser.save)
|
||||
|
||||
# pylint: disable=unnecessary-lambda
|
||||
monkeypatch.setattr(ldap_backend.LDAPUser, 'save', lambda *args, **kwargs: save(*args, **kwargs))
|
||||
# pylint: disable=unnecessary-lambda
|
||||
monkeypatch.setattr(
|
||||
django.db.models.query.QuerySet, 'bulk_create', lambda *args, **kwargs: bulk_create(*args, **kwargs)
|
||||
)
|
||||
def save(*args, **kwargs):
|
||||
return mock_save(*args, **kwargs)
|
||||
|
||||
assert Group.objects.count() == 0
|
||||
# Provision all users and their groups
|
||||
assert User.objects.count() == 0
|
||||
users = list(ldap_backend.LDAPBackend.get_users())
|
||||
assert len(users) == 6
|
||||
assert User.objects.count() == 6
|
||||
assert bulk_create.call_count == 1
|
||||
assert save.call_count == 18
|
||||
assert Group.objects.count() == 1
|
||||
assert Group.objects.get().user_set.count() == 1
|
||||
monkeypatch.setattr(ldap_backend.LDAPUser, 'save', save)
|
||||
return mock_save
|
||||
|
||||
# Check that if nothing changed no save() is made
|
||||
save.reset_mock()
|
||||
bulk_create.reset_mock()
|
||||
with utils.check_log(caplog, 'ldap: unknown group "cn=unknown,o=dn" mapped to a role'):
|
||||
@pytest.fixture
|
||||
def bulk_create(self, monkeypatch):
|
||||
mock_bulk_create = mock.Mock(wraps=QuerySet.bulk_create)
|
||||
|
||||
def bulk_create(*args, **kwargs):
|
||||
return mock_bulk_create(*args, **kwargs)
|
||||
|
||||
monkeypatch.setattr(QuerySet, 'bulk_create', bulk_create)
|
||||
return mock_bulk_create
|
||||
|
||||
def test_get_users_basic(self, slapd, db, save, bulk_create, caplog):
|
||||
assert Group.objects.count() == 0
|
||||
assert User.objects.count() == 0
|
||||
|
||||
# Provision all users and their groups
|
||||
users = list(ldap_backend.LDAPBackend.get_users())
|
||||
assert save.call_count == 0
|
||||
assert bulk_create.call_count == 0
|
||||
assert len(users) == 6
|
||||
assert User.objects.count() == 6
|
||||
assert bulk_create.call_count == 1
|
||||
assert save.call_count == 18
|
||||
assert Group.objects.count() == 1
|
||||
assert Group.objects.get().user_set.count() == 1
|
||||
|
||||
# Check that if we delete 1 user, only this user is created
|
||||
save.reset_mock()
|
||||
bulk_create.reset_mock()
|
||||
User.objects.filter(username='etienne.michu@ldap').delete()
|
||||
assert User.objects.count() == 5
|
||||
users = list(ldap_backend.LDAPBackend.get_users())
|
||||
assert len(users) == 6
|
||||
assert User.objects.count() == 6
|
||||
assert save.call_count == 3
|
||||
assert bulk_create.call_count == 1
|
||||
# Check that if nothing changed no save() is made
|
||||
save.reset_mock()
|
||||
bulk_create.reset_mock()
|
||||
with utils.check_log(caplog, 'ldap: unknown group "cn=unknown,o=dn" mapped to a role'):
|
||||
users = list(ldap_backend.LDAPBackend.get_users())
|
||||
assert save.call_count == 0
|
||||
assert bulk_create.call_count == 0
|
||||
|
||||
# uppercase user uid in the directory and check that no new user is created
|
||||
conn = slapd.get_connection_admin()
|
||||
ldif = [(ldap.MOD_REPLACE, 'uid', force_bytes(UID.upper()))]
|
||||
conn.modify_s(DN, ldif)
|
||||
save.reset_mock()
|
||||
bulk_create.reset_mock()
|
||||
users = list(ldap_backend.LDAPBackend.get_users())
|
||||
assert len(users) == 6
|
||||
assert User.objects.count() == 6
|
||||
assert save.call_count == 0
|
||||
assert bulk_create.call_count == 0
|
||||
# Check that if we delete 1 user, only this user is created
|
||||
save.reset_mock()
|
||||
bulk_create.reset_mock()
|
||||
User.objects.filter(username='etienne.michu@ldap').delete()
|
||||
assert User.objects.count() == 5
|
||||
users = list(ldap_backend.LDAPBackend.get_users())
|
||||
assert len(users) == 6
|
||||
assert User.objects.count() == 6
|
||||
assert save.call_count == 3
|
||||
assert bulk_create.call_count == 1
|
||||
|
||||
# create user with the same username, but case-different
|
||||
save.reset_mock()
|
||||
bulk_create.reset_mock()
|
||||
u = ldap_backend.LDAPUser.objects.create(username=UID.capitalize())
|
||||
ldap_backend.UserExternalId.objects.create(external_id=UID.capitalize(), source='ldap', user=u)
|
||||
# set user login time as if he logged in
|
||||
user = ldap_backend.LDAPUser.objects.get(username='%s@ldap' % UID)
|
||||
user.last_login = timezone.now()
|
||||
user.save()
|
||||
assert ldap_backend.LDAPUser.objects.count() == 7
|
||||
users = list(ldap_backend.LDAPBackend.get_users())
|
||||
assert len(users) == 6
|
||||
assert ldap_backend.LDAPUser.objects.filter(username='%s' % UID.capitalize()).count() == 0
|
||||
# uppercase user uid in the directory and check that no new user is created
|
||||
conn = slapd.get_connection_admin()
|
||||
ldif = [(ldap.MOD_REPLACE, 'uid', force_bytes(UID.upper()))]
|
||||
conn.modify_s(DN, ldif)
|
||||
save.reset_mock()
|
||||
bulk_create.reset_mock()
|
||||
users = list(ldap_backend.LDAPBackend.get_users())
|
||||
assert len(users) == 6
|
||||
assert User.objects.count() == 6
|
||||
assert save.call_count == 0
|
||||
assert bulk_create.call_count == 0
|
||||
|
||||
def test_get_users_email_lookup_case(self, slapd, db):
|
||||
User.objects.create(
|
||||
username='foo.bar',
|
||||
first_name='foo',
|
||||
last_name='bar',
|
||||
email='EtiEnne.Michu@example.net',
|
||||
ou=get_default_ou(),
|
||||
)
|
||||
|
||||
list(ldap_backend.LDAPBackend.get_users())
|
||||
|
||||
assert User.objects.count() == 6
|
||||
assert ldap_backend.UserExternalId.objects.count() == 6
|
||||
|
||||
def test_get_users_no_duplicate_on_uid_case_change(self, settings, db, save, bulk_create):
|
||||
# https://dev.entrouvert.org/issues/27697
|
||||
# old problem, now that we use guid to federate with LDAP account it does matter anymore
|
||||
settings.LDAP_AUTH_SETTINGS[0]['lookups'] = ['external_id', 'username']
|
||||
|
||||
list(ldap_backend.LDAPBackend.get_users())
|
||||
|
||||
assert ldap_backend.LDAPUser.objects.count() == 6
|
||||
|
||||
# create user with the same username, but case-different
|
||||
user = ldap_backend.LDAPUser.objects.create(username=UID.capitalize())
|
||||
ldap_backend.UserExternalId.objects.create(external_id=UID.capitalize(), source='ldap', user=user)
|
||||
# set user login time as if he logged in
|
||||
user = ldap_backend.LDAPUser.objects.get(username='%s@ldap' % UID)
|
||||
user.last_login = timezone.now()
|
||||
user.save()
|
||||
|
||||
assert ldap_backend.LDAPUser.objects.count() == 7
|
||||
assert ldap_backend.UserExternalId.objects.count() == 7
|
||||
|
||||
list(ldap_backend.LDAPBackend.get_users())
|
||||
assert ldap_backend.LDAPUser.objects.count() == 6
|
||||
assert ldap_backend.UserExternalId.objects.count() == 6
|
||||
assert ldap_backend.LDAPUser.objects.filter(username='%s' % UID.capitalize()).count() == 0
|
||||
|
||||
|
||||
def test_set_mandatory_roles(slapd, settings, db):
|
||||
|
@ -3156,8 +3185,6 @@ def test_authenticate_no_authentication(slapd, settings, client, db):
|
|||
|
||||
|
||||
def test_get_users_no_provisionning(slapd, settings, db, monkeypatch, caplog):
|
||||
from django.contrib.auth.models import Group
|
||||
|
||||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
|
|
Loading…
Reference in New Issue