# hobo/hobo/logger.py
#
# 239 lines
# 8.1 KiB
# Python

# hobo - portal to configure and deploy applications
# Copyright (C) 2015-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import ast
import datetime
import logging
import logging.handlers
import os
import pytz
import time
from django.conf import settings
from django.db import connection
from django.utils import six
from hobo.middleware.utils import StoreRequestMiddleware
class SettingsLogLevel(int):
    '''Integer log level that also carries the name of a settings flag.

    The integer value is the ``logging`` module attribute named by
    ``default_log_level`` (e.g. ``'INFO'``); ``debug_setting`` is stored on
    the instance as the name of the setting to look up later.
    '''

    def __new__(cls, default_log_level, debug_setting='DEBUG'):
        numeric_level = getattr(logging, default_log_level)
        instance = super(SettingsLogLevel, cls).__new__(cls, numeric_level)
        instance.debug_setting = debug_setting
        return instance

    # an explicit initializer is only defined when running under Python 2
    if six.PY2:
        def __init__(self, default_log_level, debug_setting='DEBUG'):
            self.debug_setting = debug_setting
            numeric_level = getattr(logging, default_log_level)
            super(SettingsLogLevel, self).__init__(numeric_level)
# keep a handle on the stock implementation so the replacement can delegate
old_getEffectiveLevel = logging.Logger.getEffectiveLevel


def getEffectiveLevel(self):
    '''Return the effective level of the logger, honouring SettingsLogLevel.

    When the level computed by the stock implementation is a
    SettingsLogLevel and the setting it names is truthy, logging.DEBUG is
    returned; in every other case the computed level is returned as is.
    '''
    level = old_getEffectiveLevel(self)
    if not isinstance(level, SettingsLogLevel):
        return level
    # imported lazily, at call time
    from django.conf import settings
    if getattr(settings, level.debug_setting, False):
        return logging.DEBUG
    return level


# monkeypatch default getEffectiveLevel to get our new behaviour
logging.Logger.getEffectiveLevel = getEffectiveLevel
class RequestContextFilter(logging.Filter):
    '''Logging filter adding request, tenant and user context to records.

    After filtering, a record carries the attributes ``tenant``,
    ``application``, ``ip``, ``path``, ``request_id``, ``user``,
    ``user_name``, ``user_email``, ``user_display_name`` and ``user_uuid``;
    each one falls back to the matching ``DEFAULT_*`` value when the
    information is not available.
    '''
    DEFAULT_TENANT = '-'
    DEFAULT_IP = '-'
    DEFAULT_PATH = '-'
    DEFAULT_REQUEST_ID = '-'
    DEFAULT_USER = "-"
    DEFAULT_USER_NAME = '-'
    DEFAULT_USER_EMAIL = '-'
    DEFAULT_USER_DISPLAY_NAME = '-'
    DEFAULT_USER_UUID = '-'
    DEFAULT_APPLICATION = 'django'

    def filter(self, record):
        '''Add username, ip and request ID to the log record.

        Inspired by django-log-request-id

        Always returns True: the filter only decorates the record, it never
        rejects it.
        '''
        # prevent multiple execution on the same record: a filter attached to
        # several handlers is called once per handler, and on a second pass
        # record.user is already a display string, so the user lookup below
        # would reset every user_* attribute to its default value
        if getattr(record, 'request_context', False):
            return True

        # lookup request from record then StoreRequestMiddleware
        if not hasattr(record, 'request'):
            record.request = StoreRequestMiddleware.get_request()
        request = record.request

        # lookup user from record then from request
        if not hasattr(record, 'user'):
            if (hasattr(request, 'user')
                    and hasattr(request.user, 'is_authenticated')
                    and request.user.is_authenticated):
                record.user = request.user
            else:
                record.user = None
        # keep the user object: record.user is overwritten below by a string
        user = record.user

        # lookup tenant
        tenant = getattr(connection, 'tenant', None)
        record.tenant = getattr(tenant, 'domain_url', self.DEFAULT_TENANT)
        record.application = getattr(settings, 'PROJECT_NAME', self.DEFAULT_APPLICATION)

        # populate request attributes
        record.ip = self.DEFAULT_IP
        record.path = self.DEFAULT_PATH
        record.request_id = self.DEFAULT_REQUEST_ID
        if request is not None and hasattr(request, 'META'):
            record.ip = request.META.get('REMOTE_ADDR', self.DEFAULT_IP)
            record.path = request.get_full_path()
            # identifier derived from the in-memory address of the request object
            record.request_id = 'r:' + hex(id(request))[2:].upper()

        # populate user attributes; record.user ends up holding the last
        # available value among uuid prefix, full name, email and username
        record.user_name = self.DEFAULT_USER_NAME
        record.user_email = self.DEFAULT_USER_EMAIL
        record.user_display_name = self.DEFAULT_USER_DISPLAY_NAME
        record.user_uuid = self.DEFAULT_USER_UUID
        record.user = self.DEFAULT_USER
        if user is not None:
            if hasattr(user, 'saml_identifiers'):
                saml_identifier = user.saml_identifiers.first()
                if saml_identifier:
                    record.user_uuid = saml_identifier.name_id
                    record.user = record.user_uuid[:6]
            if hasattr(user, 'get_full_name') and user.get_full_name():
                record.user = record.user_display_name = user.get_full_name()
            if getattr(user, 'email', None):
                record.user = record.user_email = user.email
            if getattr(user, 'username', None):
                record.user = record.user_name = user.username

        # mark the record as processed for the guard at the top
        record.request_context = True
        return True
class ForceDebugFilter(logging.Filter):
    '''Filter rewriting every record it sees to the DEBUG level.'''

    def filter(self, record):
        # the record is modified in place before the standard name-based
        # filtering of logging.Filter decides whether to keep it
        record.levelno, record.levelname = logging.DEBUG, 'DEBUG'
        accepted = super(ForceDebugFilter, self).filter(record)
        return accepted
class LogRecord(logging.LogRecord):
    '''LogRecord variant keeping multiline messages parseable.

    Every line after the first one is prefixed with a single space.
    '''

    def getMessage(self):
        message = super(LogRecord, self).getMessage()
        return '\n '.join(message.split('\n'))
class TimedRotatingFileHandler(logging.handlers.TimedRotatingFileHandler):
    '''Timed rotating handler formatting records through LogRecord.'''

    def format(self, record):
        # swap the class of the record only for the duration of the
        # formatting, then put the original class back whatever happens
        saved_class = record.__class__
        record.__class__ = LogRecord
        try:
            formatted = super(TimedRotatingFileHandler, self).format(record)
        finally:
            record.__class__ = saved_class
        return formatted
class DebugLogFilter(object):
    '''Accept or reject records according to the DEBUG_LOG setting.

    A falsy setting rejects everything and True accepts everything. A string
    is read as a comma separated list of logger names: a record is accepted
    when its logger is one of them or a dotted descendant of one of them.
    Any other truthy value accepts everything.
    '''

    def filter(self, record):
        setting = getattr(settings, 'DEBUG_LOG', False)
        if not setting:
            return False

        # change class to add space after newlines in message
        record.__class__ = LogRecord

        if setting is True:
            return True
        if not hasattr(setting, 'encode'):
            return bool(setting)

        # the setting is a string
        domains = [chunk.strip() for chunk in setting.split(',')]
        logger_name = record.name
        for domain in domains:
            if logger_name == domain:
                return True
            if logger_name.startswith(domain) and logger_name[len(domain)] == '.':
                return True
        return False
class DebugLog(object):
    '''Reader turning the debug log file into a stream of dictionaries.

    Continuation lines (lines starting with a space) are folded back into
    the entry they belong to. Each entry is reported with a byte offset,
    the cursor, which can be passed back to resume reading after it.
    '''

    # names given, in order, to the fields found between the timestamp and
    # the message of an entry
    keys = ['tenant', 'ip', 'user', 'request_id', 'level', 'logger']

    def __init__(self, path):
        self.path = path

    def _pre_lines(self, cursor=0):
        '''Yield ``(cursor, text)`` pairs, one per logical log entry.

        Reading starts at byte offset ``cursor``. The yielded cursor is the
        byte offset just after the entry. Nothing is yielded when the file
        does not exist or when seeking to ``cursor`` fails.
        '''
        if not os.path.exists(self.path):
            return
        with open(self.path, 'rb') as fd:
            accum = ''
            try:
                fd.seek(cursor)
            except Exception:
                # unusable cursor: deliberately best effort, report nothing
                return
            for line in fd:
                size = len(line)
                cursor += size
                # undecodable bytes (corrupted data, or a cursor landing in
                # the middle of a multi-byte character) are replaced instead
                # of aborting the whole read with UnicodeDecodeError
                line = line.decode('utf-8', 'replace')
                if not accum:
                    accum = line
                elif not line.startswith(' '):
                    # a new entry starts: emit the previous one, whose end
                    # is the beginning of the current line
                    yield cursor - size, accum
                    accum = line
                else:
                    # continuation line: drop the leading space marker
                    accum += line[1:]
            if accum:
                yield cursor, accum

    def _parse(self, cursor=0):
        '''Yield one dictionary per entry found after ``cursor``.

        Entries are split on the ``' \\x1f'`` separator, at most as many
        times as it occurs in ``settings.DEBUG_LOG_FORMAT``. The first field
        is the timestamp, the last one the message, and the fields in
        between are named after ``keys``. Entries whose timestamp cannot be
        parsed are skipped.
        '''
        for cursor, line in self._pre_lines(cursor=cursor):
            if line.endswith('\n'):
                line = line[:-1]
            parts = line.split(' \x1f', settings.DEBUG_LOG_FORMAT.count(' \x1f'))
            try:
                timestamp = datetime.datetime.strptime(parts[0], '%Y-%m-%d %H:%M:%S,%f')
                timestamp = pytz.timezone(time.tzname[0]).localize(timestamp)
            except ValueError:
                continue
            message = parts[-1]
            d = dict(zip(self.keys, parts[1:-1]))
            if 'user' in d:
                try:
                    d['user'] = ast.literal_eval(d['user'])
                except (SyntaxError, TypeError, ValueError):
                    # not a Python literal (literal_eval raises ValueError or
                    # TypeError as well as SyntaxError on malformed input):
                    # keep the raw text
                    pass
            d.update({
                'cursor': cursor,
                'timestamp': timestamp,
                'message': message,
            })
            yield d

    @classmethod
    def lines(cls, cursor=0):
        '''Yield the parsed entries of the file named by DEBUG_LOG_PATH.

        Nothing is yielded when the setting is missing or empty, or when
        the file does not exist.
        '''
        debug_log_path = getattr(settings, 'DEBUG_LOG_PATH', None)
        if not debug_log_path:
            return
        if not os.path.exists(debug_log_path):
            return
        for record in cls(debug_log_path)._parse(cursor=cursor):
            yield record