168 lines
5.7 KiB
Python
168 lines
5.7 KiB
Python
import logging
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
from _pytest.logging import LogCaptureHandler
|
|
from django.contrib.auth.models import User
|
|
from django.test import override_settings
|
|
from mellon.models import Issuer
|
|
from tenant_schemas.utils import tenant_context
|
|
|
|
from hobo.journal import JournalHandler
|
|
from hobo.logger import DebugLogFilter, RequestContextFilter
|
|
|
|
|
|
class MockSender:
    """Stand-in sender that only remembers how it was called."""

    def __init__(self):
        # One ``(args, kwargs)`` tuple per ``send()`` invocation, oldest first.
        self.calls = list()

    def send(self, *args, **kwargs):
        """Store the positional and keyword arguments of this call."""
        recorded = (args, kwargs)
        self.calls += [recorded]
|
|
|
|
|
|
def test_request_context_filter(caplog, settings, tenants, client):
    """Check the attributes RequestContextFilter adds to captured log records.

    A ``john.doe`` user with a SAML identifier is created in every tenant, then
    two rounds of requests are sent to each tenant:

    * ``/?forbidden=123`` with ``ROOT_URLCONF`` overridden to
      ``hobo.test_urls``: no captured record may carry a ``status_code``
      attribute or the 'forbidden access' message;
    * ``/`` after logging in: every record of level INFO or above must expose
      the ip, tenant, path, request id, user and application attributes.
    """
    root_logger = logging.getLogger()
    for handler in root_logger.handlers:
        # isinstance() instead of comparing __class__, so a subclass of
        # LogCaptureHandler is accepted as well.
        if isinstance(handler, LogCaptureHandler):
            # Only the first capture handler found gets the filter.
            handler.addFilter(RequestContextFilter())
            break
    else:
        # pytest.fail() cannot be optimised away, unlike ``assert False``
        # which disappears when Python runs with -O.
        pytest.fail('No LogCaptureHandler found')

    for tenant in tenants:
        with tenant_context(tenant):
            user = User.objects.create(
                first_name='John', last_name='Doe', username='john.doe', email='jodn.doe@example.com'
            )
            user.set_password('john.doe')
            user.save()
            issuer, dummy = Issuer.objects.get_or_create(entity_id='https://idp.example.com')
            user.saml_identifiers.create(name_id='ab' * 16, issuer=issuer)

    for tenant in tenants:
        settings.ALLOWED_HOSTS.append(tenant.domain_url)
        with override_settings(ROOT_URLCONF='hobo.test_urls'):
            client.get(
                '/?forbidden=123',
                SERVER_NAME=tenant.domain_url,
                HTTP_X_FORWARDED_FOR='99.99.99.99, 127.0.0.1',
            )
    records = list(caplog.records)
    assert len(records) == 2  # only test_urls' "wat!" test error has been logged
    for record in records:
        assert not hasattr(record, 'status_code')  # hence no 403 logged
        assert record.msg != 'forbidden access'
    caplog.clear()

    for tenant in tenants:
        settings.ALLOWED_HOSTS.append(tenant.domain_url)
        with tenant_context(tenant):
            client.login(username='john.doe', password='john.doe')
        client.get('/', SERVER_NAME=tenant.domain_url, HTTP_X_FORWARDED_FOR='99.99.99.99, 127.0.0.1')
    records = [record for record in caplog.records if record.levelno >= logging.INFO]
    assert len(records) == 2
    # ``user`` is the object created for the last tenant; every tenant got a
    # user with the same attributes, so it is a valid reference for all records.
    for tenant, record in zip(tenants, records):
        assert record.ip == '99.99.99.99'
        assert record.tenant == tenant.domain_url
        assert record.path == '/'
        assert record.request_id.startswith('r:')
        assert record.user == user.username
        assert record.user_email == user.email
        assert record.user_name == user.username
        assert record.user_display_name == 'John Doe'
        assert record.user_uuid == 'ab' * 16
        assert record.application == 'fake-agent'
|
|
|
|
|
|
@pytest.fixture
def sender():
    """Yield the mock that replaces ``hobo.journal.journal.send`` during a test."""
    target = 'hobo.journal.journal.send'
    with mock.patch(target) as patched_send:
        yield patched_send
|
|
|
|
|
|
@pytest.fixture
def journald_handler(sender):
    """Install a JournalHandler at the head of the root logger for one test.

    The handler gets a RequestContextFilter and is yielded to the test. It is
    detached from the root logger again on teardown, whether or not the test
    succeeded. Requesting ``sender`` ensures that fixture is set up before the
    handler is installed.
    """
    root_logger = logging.getLogger()
    handler = JournalHandler()
    handler.addFilter(RequestContextFilter())

    root_logger.handlers.insert(0, handler)  # head insert
    try:
        yield handler
    finally:
        # Remove this very handler rather than popping index 0: if another
        # handler was head-inserted or removed while the test ran, pop(0)
        # would drop the wrong one and leave the JournalHandler installed.
        root_logger.removeHandler(handler)
|
|
|
|
|
|
def test_systemd(settings, tenants, client, journald_handler, sender):
    """Check the keyword fields handed to the patched journal ``send``.

    A ``john.doe`` user with a SAML identifier is created in each tenant, then
    ``/`` is requested once per tenant as that user. Exactly two calls to the
    mocked sender are expected, each carrying the fields listed below.
    """
    identity = {
        'first_name': 'John',
        'last_name': 'Doe',
        'username': 'john.doe',
        'email': 'jodn.doe@example.com',
    }
    for tenant in tenants:
        with tenant_context(tenant):
            account = User.objects.create(**identity)
            account.set_password('john.doe')
            account.save()
            issuer = Issuer.objects.get_or_create(entity_id='https://idp.example.com')[0]
            account.saml_identifiers.create(name_id='ab' * 16, issuer=issuer)

    for tenant in tenants:
        settings.ALLOWED_HOSTS.append(tenant.domain_url)
        with tenant_context(tenant):
            client.login(username='john.doe', password='john.doe')
        client.get('/', SERVER_NAME=tenant.domain_url, HTTP_X_FORWARDED_FOR='99.99.99.99, 127.0.0.1')

    sent_calls = sender.mock_calls
    assert len(sent_calls) == 2
    for tenant, sent in zip(tenants, sent_calls):
        # Field name -> expected value; only TENANT differs between calls.
        expected_fields = {
            'APPLICATION': 'fake-agent',
            'IP': '99.99.99.99',
            'LEVELNAME': 'ERROR',
            'LEVELNO': 40,
            'LOGGER': 'hobo.test_urls',
            'MESSAGE': 'wat!',
            'MODULE': 'test_urls',
            'MSG': 'wat!',
            'NAME': 'hobo.test_urls',
            'PATH': '/',
            'PRIORITY': '3',
            'REQUEST_ID': mock.ANY,
            'TENANT': tenant.domain_url,
            'USER_DISPLAY_NAME': 'John Doe',
            'USER_EMAIL': 'jodn.doe@example.com',
            'USER': 'john.doe',
            'USER_NAME': 'john.doe',
            'USER_UUID': 'abababababababababababababababab',
        }
        for field, wanted in expected_fields.items():
            assert sent.kwargs[field] == wanted
|
|
|
|
|
|
def test_debug_log_filter(caplog, settings):
    """Check which DEBUG records DebugLogFilter lets through.

    Each scenario optionally changes ``settings.DEBUG_LOG``, emits one debug
    message on a given logger and checks whether it shows up in the captured
    text.
    """
    # default caplog log level is INFO
    caplog.set_level(logging.DEBUG)
    caplog.handler.addFilter(DebugLogFilter())

    keep = object()  # marker: leave settings.DEBUG_LOG as it currently is
    # (DEBUG_LOG value, logger name or None for the root logger, message, captured?)
    scenarios = (
        (keep, None, 'l1', False),
        (True, None, 'l2', True),
        (False, None, 'l3', False),
        ('app1,app2', None, 'l4', False),
        (keep, 'app3', 'l5', False),
        (keep, 'app1', 'l6', True),
        (keep, 'app2', 'l7', True),
    )
    for debug_log, logger_name, message, captured in scenarios:
        if debug_log is not keep:
            settings.DEBUG_LOG = debug_log
        logging.getLogger(logger_name).debug(message)
        if captured:
            assert message in caplog.text
        else:
            assert message not in caplog.text
|