ldap_backend: add a synchronization timeout block option (#63560)
gitea/authentic/pipeline/head This commit looks good Details

This commit is contained in:
Paul Marillonnet 2023-03-14 16:33:43 +01:00
parent ff62daa956
commit a61a732a35
2 changed files with 79 additions and 6 deletions

View File

@ -466,6 +466,7 @@ class LDAPBackend:
'fname_field': 'givenName',
'lname_field': 'sn',
'timeout': 5,
'sync_timeout': 30,
'referrals': False,
'disable_update': False,
'bind_with_username': False,
@ -1645,7 +1646,7 @@ class LDAPBackend:
@classmethod
def get_users_for_block(cls, block):
log.info('Synchronising users from realm "%s"', block['realm'])
conn = cls.get_connection(block)
conn = cls.get_connection(block, synchronization=True)
if conn is None:
log.warning('unable to synchronize with LDAP servers %s', force_str(block['url']))
return
@ -1802,7 +1803,7 @@ class LDAPBackend:
return new_results
@classmethod
def get_connections(cls, block, credentials=(), raises=False):
def get_connections(cls, block, credentials=(), raises=False, synchronization=False):
'''Try each replicas, and yield successfull connections'''
if not block['url']:
raise ImproperlyConfigured("block['url'] must contain at least one url")
@ -1811,9 +1812,12 @@ class LDAPBackend:
for key, value in block['global_ldap_options'].items():
ldap.set_option(key, value)
conn = LDAPObject(url)
if block['timeout'] > 0:
if block['timeout'] > 0 and synchronization is False:
conn.set_option(ldap.OPT_NETWORK_TIMEOUT, block['timeout'])
conn.set_option(ldap.OPT_TIMEOUT, block['timeout'])
elif block['sync_timeout'] > 0 and synchronization is True:
conn.set_option(ldap.OPT_NETWORK_TIMEOUT, block['sync_timeout'])
conn.set_option(ldap.OPT_TIMEOUT, block['sync_timeout'])
conn.set_option(
ldap.OPT_X_TLS_REQUIRE_CERT, getattr(ldap, 'OPT_X_TLS_' + block['require_cert'].upper())
)
@ -1924,9 +1928,11 @@ class LDAPBackend:
return False, 'ldap is down'
@classmethod
def get_connection(cls, block, credentials=(), raises=False):
def get_connection(cls, block, credentials=(), raises=False, synchronization=False):
'''Try to get at least one connection'''
for conn in cls.get_connections(block, credentials=credentials, raises=raises):
for conn in cls.get_connections(
block, credentials=credentials, raises=raises, synchronization=synchronization
):
return conn
@classmethod

View File

@ -245,11 +245,78 @@ memberUid: {uid}
return slapd
@pytest.fixture
def wraps_ldap_set_option(monkeypatch):
mock_set_option = mock.Mock()
from authentic2.backends.ldap_backend import LDAPObject
old_set_option = LDAPObject.set_option
def set_option(self, *args, **kwargs):
mock_set_option(*args, **kwargs)
return old_set_option(self, *args, **kwargs)
monkeypatch.setattr('authentic2.backends.ldap_backend.LDAPObject.set_option', set_option)
return mock_set_option
def test_connection(slapd):
conn = slapd.get_connection()
conn.simple_bind_s(DN, PASS)
def test_connection_timeout_options(slapd, wraps_ldap_set_option, db, settings):
settings.LDAP_AUTH_SETTINGS = [
{
'url': [slapd.ldap_url],
'basedn': 'o=ôrga',
'bindsasl': (),
'binddn': force_str(DN),
'bindpw': PASS,
'global_ldap_options': {},
'require_cert': 'demand',
'cacertfile': '',
'cacertdir': '',
'certfile': cert_file,
'keyfile': key_file,
'use_tls': False,
'referrals': False,
'ldap_options': {},
'connect_with_user_credentials': True,
# relevant options here:
'timeout': 10,
'sync_timeout': 20,
}
]
ldap_backend.LDAPBackend.get_connection(settings.LDAP_AUTH_SETTINGS[0])
timeout_set = False
network_timeout_set = False
for call_args in wraps_ldap_set_option.call_args_list:
if call_args.args[0] == 20482: # OPT_TIMEOUT
assert call_args.args[1] == 10
timeout_set = True
if call_args.args[0] == 20485: # OPT_NETWORK_TIMEOUT
assert call_args.args[1] == 10
network_timeout_set = True
assert timeout_set
assert network_timeout_set
wraps_ldap_set_option.reset_mock()
dummy = [user for user in ldap_backend.LDAPBackend.get_users()]
timeout_set = False
network_timeout_set = False
for call_args in wraps_ldap_set_option.call_args_list:
if call_args.args[0] == 20482: # OPT_TIMEOUT
assert call_args.args[1] == 20
timeout_set = True
if call_args.args[0] == 20485: # OPT_NETWORK_TIMEOUT
assert call_args.args[1] == 20
network_timeout_set = True
assert timeout_set
assert network_timeout_set
def test_simple(slapd, settings, client, db):
settings.LDAP_AUTH_SETTINGS = [
{
@ -2307,7 +2374,7 @@ def test_technical_info_ldap(app, admin, superuser, slapd, settings, monkeypatch
assert 'LDAPTLS_REQCERT=never ldapsearch' in ldap_config_text
def buggy_get_connections(config, credentials, raises):
def buggy_get_connections(config, credentials, raises, synchronization):
raise ldap.LDAPError('some buggy connection error message')
# mock a buggy connection