commands: prevent duplicated log messages in sync-ldap-users (#58404)
This commit is contained in:
parent
2ec1984adc
commit
30099cc66d
|
@ -22,41 +22,51 @@ except ImportError:
|
|||
ldap = None
|
||||
|
||||
import logging
|
||||
import os
|
||||
from contextlib import contextmanager
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from authentic2.backends.ldap_backend import LDAPBackend
|
||||
|
||||
|
||||
@contextmanager
|
||||
def log_ldap_to_console(verbosity):
|
||||
if 'TERM' not in os.environ:
|
||||
yield
|
||||
else:
|
||||
handler = logging.StreamHandler()
|
||||
# add timestamp to messages
|
||||
formatter = logging.Formatter(fmt='%(asctime)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
|
||||
handler.setFormatter(formatter)
|
||||
|
||||
if verbosity == 1:
|
||||
handler.setLevel(logging.ERROR)
|
||||
elif verbosity == 2:
|
||||
handler.setLevel(logging.INFO)
|
||||
elif verbosity == 3:
|
||||
handler.setLevel(logging.DEBUG)
|
||||
|
||||
logger = logging.getLogger('authentic2.backends.ldap_backend')
|
||||
initial_level = logger.level
|
||||
try:
|
||||
logger.propagate = False
|
||||
logger.setLevel(logging.DEBUG)
|
||||
logger.addHandler(handler)
|
||||
yield
|
||||
finally:
|
||||
logger.propagate = True
|
||||
logger.setLevel(initial_level)
|
||||
logger.removeHandler(handler)
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('--realm', help='Limit sync to this realm')
|
||||
|
||||
def handle(self, *args, **kwargs):
|
||||
root_logger = logging.getLogger()
|
||||
ldap_logger = logging.getLogger('authentic2.backends.ldap_backend')
|
||||
|
||||
# ensure log messages are displayed only once on terminal
|
||||
stream_handlers = [
|
||||
x for x in root_logger.handlers if isinstance(x, logging.StreamHandler) if x.stream.isatty()
|
||||
]
|
||||
if stream_handlers:
|
||||
handler = stream_handlers[0]
|
||||
else:
|
||||
handler = logging.StreamHandler()
|
||||
ldap_logger.addHandler(handler)
|
||||
|
||||
# add timestamp to messages
|
||||
formatter = logging.Formatter(fmt='%(asctime)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
|
||||
handler.setFormatter(formatter)
|
||||
|
||||
verbosity = int(kwargs['verbosity'])
|
||||
if verbosity == 1:
|
||||
ldap_logger.setLevel(logging.ERROR)
|
||||
elif verbosity == 2:
|
||||
ldap_logger.setLevel(logging.INFO)
|
||||
elif verbosity == 3:
|
||||
ldap_logger.setLevel(logging.DEBUG)
|
||||
|
||||
for dummy in LDAPBackend.get_users(realm=kwargs['realm']):
|
||||
continue
|
||||
with log_ldap_to_console(verbosity):
|
||||
for dummy in LDAPBackend.get_users(realm=kwargs['realm']):
|
||||
continue
|
||||
|
|
|
@ -1756,15 +1756,15 @@ def test_ou_selector_default_ou(slapd, settings, app, ou1):
|
|||
assert '_auth_user_id' in app.session
|
||||
|
||||
|
||||
def test_sync_ldap_users(slapd, settings, app, db, caplog):
|
||||
caplog.set_level(logging.DEBUG) # force pytest to reset log level after test
|
||||
|
||||
@mock.patch.dict(os.environ, {'TERM': 'xterm-256color'})
|
||||
@mock.patch('authentic2.backends.ldap_backend.logging.StreamHandler.emit')
|
||||
def test_sync_ldap_users(mocked_emit, slapd, settings, app, db):
|
||||
management.call_command('sync-ldap-users')
|
||||
assert len(caplog.records) == 0
|
||||
assert mocked_emit.call_count == 0
|
||||
|
||||
management.call_command('sync-ldap-users', verbosity=2)
|
||||
assert len(caplog.records) == 1
|
||||
assert caplog.records[0].message == 'No LDAP server configured.'
|
||||
assert mocked_emit.call_count == 1
|
||||
assert mocked_emit.call_args[0][0].getMessage() == 'No LDAP server configured.'
|
||||
|
||||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
|
@ -1793,20 +1793,25 @@ def test_sync_ldap_users(slapd, settings, app, db, caplog):
|
|||
)
|
||||
|
||||
assert User.objects.count() == 0
|
||||
caplog.clear()
|
||||
mocked_emit.reset_mock()
|
||||
management.call_command('sync-ldap-users', verbosity=2)
|
||||
assert len(caplog.records) == 11
|
||||
assert caplog.messages[0] == 'Synchronising users from realm "ldap"'
|
||||
assert caplog.messages[1] == 'Binding to server %s (anonymously)' % slapd.ldap_url
|
||||
assert mocked_emit.call_count == 11
|
||||
assert mocked_emit.call_args_list[0][0][0].getMessage() == 'Synchronising users from realm "ldap"'
|
||||
assert (
|
||||
caplog.messages[2]
|
||||
mocked_emit.call_args_list[1][0][0].getMessage()
|
||||
== 'Binding to server %s (anonymously)' % slapd.ldap_url
|
||||
)
|
||||
assert (
|
||||
mocked_emit.call_args_list[2][0][0].getMessage()
|
||||
== (
|
||||
"Updated user etienne.michu@ldap (uuid %s) from dn=cn=Étienne Michu,o=ôrga, uid=['etienne.michu'], "
|
||||
"sn=['Michu'], givenname=['Étienne'], l=['Paris'], mail=['etienne.michu@example.net']"
|
||||
)
|
||||
% User.objects.first().uuid
|
||||
)
|
||||
assert caplog.messages[-1] == 'Search for (|(mail=*)(uid=*)) returned 7 users.'
|
||||
assert (
|
||||
mocked_emit.call_args_list[-1][0][0].getMessage() == 'Search for (|(mail=*)(uid=*)) returned 7 users.'
|
||||
)
|
||||
|
||||
assert User.objects.count() == 6
|
||||
assert all(user.first_name == 'Étienne' for user in User.objects.all())
|
||||
|
@ -1826,9 +1831,9 @@ def test_sync_ldap_users(slapd, settings, app, db, caplog):
|
|||
)
|
||||
|
||||
User.objects.update(first_name='John')
|
||||
caplog.clear()
|
||||
mocked_emit.reset_mock()
|
||||
management.call_command('sync-ldap-users', verbosity=3)
|
||||
assert len(caplog.records) == 42
|
||||
assert mocked_emit.call_count == 42
|
||||
|
||||
|
||||
def test_get_users_select_realm(slapd, settings, db, caplog):
|
||||
|
|
Loading…
Reference in New Issue