hobo/hobo/logger.py

157 lines
5.5 KiB
Python

# hobo - portal to configure and deploy applications
# Copyright (C) 2015-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import logging.handlers
import warnings
from django.conf import settings
from django.db import connection
from hobo.middleware.utils import StoreRequestMiddleware
class SettingsLogLevel(str):
    """Deprecated ``str`` subclass, superseded by the ``DEBUG_LOG`` setting.

    Building an instance emits a ``DeprecationWarning`` and returns a string
    equal to ``value``.
    """

    def __new__(cls, value):
        """Warn about the deprecation, then build the string from ``value``."""
        warnings.warn(
            'SettingsLogLevel is deprecated, use DEBUG_LOG instead.', DeprecationWarning, stacklevel=2
        )
        # __new__ is an implicit static method, so the class must be passed
        # explicitly; without it ``value`` is taken for the class to
        # instantiate and str.__new__ raises TypeError.
        return super().__new__(cls, value)
class RequestContextFilter(logging.Filter):
    """Logging filter adding tenant, application, request and user data to records.

    The ``DEFAULT_*`` class attributes are the placeholder values stored on the
    record whenever the corresponding information cannot be found.
    """

    DEFAULT_TENANT = '-'
    DEFAULT_IP = '-'
    DEFAULT_PATH = '-'
    DEFAULT_REQUEST_ID = '-'
    DEFAULT_USER = '-'
    DEFAULT_USER_NAME = '-'
    DEFAULT_USER_EMAIL = '-'
    DEFAULT_USER_DISPLAY_NAME = '-'
    DEFAULT_USER_UUID = '-'
    DEFAULT_APPLICATION = 'django'

    def filter(self, record):
        """Add username, ip and request ID to the log record.

        Returns False for records whose ``status_code`` is 403 and True for
        every other record. A record already marked with ``request_context``
        is accepted without being processed again.

        Inspired by django-log-request-id
        """
        # http 403 records are dropped, uwsgi already logs them
        if getattr(record, 'status_code', None) == 403:
            return False

        # a given record is only enriched once
        if getattr(record, 'request_context', False):
            return True

        # the request is taken from the record, else from StoreRequestMiddleware
        try:
            request = record.request
        except AttributeError:
            request = record.request = StoreRequestMiddleware.get_request()

        # the user is taken from the record, else from the request when authenticated
        if not hasattr(record, 'user'):
            request_user = getattr(request, 'user', None)
            if getattr(request_user, 'is_authenticated', False):
                record.user = request_user
            else:
                record.user = None
        # keep the user object: record.user is replaced by a label below
        user = record.user

        # tenant and application
        current_tenant = getattr(connection, 'tenant', None)
        record.tenant = getattr(current_tenant, 'domain_url', self.DEFAULT_TENANT)
        record.application = getattr(settings, 'PROJECT_NAME', self.DEFAULT_APPLICATION)

        self._populate_request(record, request)
        self._populate_user(record, user)

        record.request_context = True
        return True

    def _populate_request(self, record, request):
        """Set ``ip``, ``path`` and ``request_id`` on the record, with defaults as fallback."""
        if request is not None and hasattr(request, 'META'):
            record.ip = request.META.get('REMOTE_ADDR', self.DEFAULT_IP)
            record.path = request.get_full_path()
            # identifier derived from the request object id, as upper case hexadecimal
            record.request_id = 'r:' + format(id(request), 'X')
        else:
            record.ip = self.DEFAULT_IP
            record.path = self.DEFAULT_PATH
            record.request_id = self.DEFAULT_REQUEST_ID

    def _populate_user(self, record, user):
        """Set the ``user*`` attributes of the record, with defaults as fallback.

        ``record.user`` ends up holding the last available value among: the
        first 6 characters of the uuid, the full name, the email, the username.
        """
        record.user_name = self.DEFAULT_USER_NAME
        record.user_email = self.DEFAULT_USER_EMAIL
        record.user_display_name = self.DEFAULT_USER_DISPLAY_NAME
        record.user_uuid = self.DEFAULT_USER_UUID
        record.user = self.DEFAULT_USER
        if user is None:
            return

        if hasattr(user, 'saml_identifiers'):
            first_identifier = user.saml_identifiers.first()
            if first_identifier:
                record.user_uuid = first_identifier.name_id
                record.user = record.user_uuid[:6]

        if hasattr(user, 'original_get_full_name') and user.original_get_full_name():
            # original full name, not the templated version from the user_name app
            record.user = record.user_display_name = user.original_get_full_name()

        if getattr(user, 'email', None):
            record.user = record.user_email = user.email

        if getattr(user, 'username', None):
            record.user = record.user_name = user.username
class ForceDebugFilter(logging.Filter):
    """Logging filter relabelling every record it sees as a DEBUG record."""

    def filter(self, record):
        """Overwrite the record level with DEBUG, then defer to ``logging.Filter``."""
        record.levelno, record.levelname = logging.DEBUG, 'DEBUG'
        return super().filter(record)
class DebugLogFilter:
    """Filter debug log records based on the DEBUG_LOG setting"""

    def filter(self, record):
        """Tell whether ``record`` must be kept.

        Records above the DEBUG level are always kept. Other records are kept
        when DEBUG_LOG is True, or when DEBUG_LOG is a comma separated string
        of logger names and the record logger is one of them or one of their
        children. Any other DEBUG_LOG value is interpreted through ``bool()``.
        """
        debug_log = getattr(settings, 'DEBUG_LOG', False)

        if record.levelno > logging.DEBUG:
            return True
        if not debug_log:
            return False
        if debug_log is True:
            return True
        if not hasattr(debug_log, 'encode'):
            # not a string
            return bool(debug_log)

        # debug_log is a string: comma separated logger names
        logger_name = record.name
        for chunk in debug_log.split(','):
            domain = chunk.strip()
            # exact logger, or a child logger (name continues with a dot)
            if logger_name == domain or logger_name.startswith(domain + '.'):
                return True
        return False
class ClampLogLevel(logging.Filter):
    """Logging filter lowering the level of records above a given ceiling."""

    def __init__(self, level):
        """Store the ceiling.

        ``level`` is a level name in any case; its upper case form is looked
        up as an attribute of the ``logging`` module.
        """
        super().__init__()
        ceiling_name = level.upper()
        self.levelname = ceiling_name
        self.levelno = getattr(logging, ceiling_name)

    def filter(self, record):
        """Relabel the record with the ceiling when it is above it, then defer to ``logging.Filter``."""
        exceeds_ceiling = record.levelno > self.levelno
        if exceeds_ceiling:
            record.levelno, record.levelname = self.levelno, self.levelname
        return super().filter(record)