ldap: prevent use of lock_email outside of a transaction (#70439)
gitea/authentic/pipeline/head There was a failure building this commit
Details
gitea/authentic/pipeline/head There was a failure building this commit
Details
This commit is contained in:
parent
5726eadf20
commit
0c77d67608
|
@ -1431,14 +1431,20 @@ class LDAPBackend:
|
|||
)
|
||||
return None
|
||||
|
||||
def _lookup_by_email(self, ou, block, attributes):
|
||||
def _get_email_from_attributes(self, block, attributes):
|
||||
email_field = block.get('email_field')
|
||||
if not email_field:
|
||||
return
|
||||
if email_field not in attributes:
|
||||
return
|
||||
email = attributes[email_field][0]
|
||||
Lock.lock_email(email)
|
||||
return attributes[email_field][0]
|
||||
|
||||
def _lookup_by_email(self, ou, block, attributes, lock=False):
|
||||
email = self._get_email_from_attributes(block, attributes)
|
||||
if not email:
|
||||
return
|
||||
if lock:
|
||||
Lock.lock_email(email)
|
||||
try:
|
||||
log.debug('ldap: lookup using email %r', email)
|
||||
return self._lookup_user_queryset(block=block).filter(ou=ou).get_by_email(email)
|
||||
|
@ -1514,7 +1520,7 @@ class LDAPBackend:
|
|||
return attribute, guid
|
||||
return None, None
|
||||
|
||||
def _lookup_existing_user(self, username, block, attributes):
|
||||
def _lookup_existing_user(self, username, block, attributes, lock=False):
|
||||
user = None
|
||||
ou = self._get_target_ou(block)
|
||||
for lookup_type in block['lookups']:
|
||||
|
@ -1527,7 +1533,7 @@ class LDAPBackend:
|
|||
elif lookup_type == 'external_id' and attributes:
|
||||
user = self._lookup_by_external_id(block=block, attributes=attributes)
|
||||
elif lookup_type == 'email' and attributes:
|
||||
user = self._lookup_by_email(ou=ou, block=block, attributes=attributes)
|
||||
user = self._lookup_by_email(ou=ou, block=block, attributes=attributes, lock=lock)
|
||||
elif lookup_type == 'guid' and attributes:
|
||||
user = self._lookup_by_guid(block=block, attributes=attributes)
|
||||
if user:
|
||||
|
@ -1634,7 +1640,7 @@ class LDAPBackend:
|
|||
def _return_django_user(self, dn, username, password, conn, block, attributes):
|
||||
from authentic2.manager.journal_event_types import ManagerUserActivation
|
||||
|
||||
user = self._lookup_existing_user(username, block, attributes)
|
||||
user = self._lookup_existing_user(username, block, attributes, lock=True)
|
||||
if user:
|
||||
log.debug('found existing user %r', user)
|
||||
else:
|
||||
|
|
|
@ -325,7 +325,7 @@ def test_connection_timeout_options(slapd, wraps_ldap_set_option, db, settings):
|
|||
assert network_timeout_set
|
||||
|
||||
|
||||
def test_simple(slapd, settings, client, db):
|
||||
def test_simple(slapd, settings, client, transactional_db):
|
||||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
|
@ -3258,3 +3258,19 @@ def test_deactivate_orphaned_users_when_no_provisionning(slapd, settings, client
|
|||
resp = app.get('/manage/users/%s/' % deactivated_user.user.pk)
|
||||
assert 'Deactivated' in resp.text
|
||||
assert 'associated LDAP account does not exist anymore' in resp.text
|
||||
|
||||
|
||||
def test_good_user_wrong_password(slapd, transactional_db, settings, client):
|
||||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
'binddn': force_str(DN),
|
||||
'bindpw': PASS,
|
||||
'basedn': 'o=ôrga',
|
||||
'use_tls': False,
|
||||
}
|
||||
]
|
||||
result = client.post(
|
||||
'/login/', {'login-password-submit': '1', 'username': USERNAME, 'password': 'wrong'}, follow=True
|
||||
)
|
||||
assert result.status_code == 200
|
||||
|
|
Loading…
Reference in New Issue