cron: log basic resource usage summary (#89037)
gitea/wcs/pipeline/head This commit looks good Details

This commit is contained in:
Frédéric Péters 2024-04-21 16:49:43 +02:00
parent 5ef57d4671
commit cc0f8dda1c
5 changed files with 58 additions and 23 deletions

View File

@@ -78,30 +78,13 @@ def nocache(settings):
@pytest.fixture
def sql_queries(monkeypatch):
    # Fixture that records every SQL query executed while a test runs,
    # yielding the shared list so tests can assert on it.
    # NOTE(review): this span is a flattened diff hunk — the old mechanism
    # (local cursor/connection subclasses monkeypatching WcsPgConnection)
    # and the new one (assigning wcs.sql.LoggingCursor.queries) appear
    # interleaved here; confirm against the applied commit.
    import psycopg2.extensions
    import wcs.sql
    queries = []
    # Drop any cached connection so a freshly created one picks up the
    # patched connection class below.
    wcs.sql.cleanup_connection()

    class LoggingCursor(psycopg2.extensions.cursor):
        """A cursor that logs queries using its connection logging facilities."""

        def execute(self, query, vars=None):
            # Record the query text before delegating to psycopg2.
            queries.append(query)
            return super().execute(query, vars)

    class MyLoggingConnection(wcs.sql.WcsPgConnection):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Make every cursor opened on this connection a logging cursor.
            self.cursor_factory = LoggingCursor

    # Swap in the logging connection class for the duration of the test.
    backup_original_class = wcs.sql.WcsPgConnection
    wcs.sql.WcsPgConnection = MyLoggingConnection
    # New-style hook: LoggingCursor in wcs.sql appends to this list directly.
    wcs.sql.LoggingCursor.queries = queries
    yield queries
    # Teardown: close the patched connection and restore the original class.
    wcs.sql.cleanup_connection()
    wcs.sql.WcsPgConnection = backup_original_class
@pytest.fixture

View File

@@ -261,7 +261,11 @@ def get_logs(hostname=None):
os.path.join(base_dir, 'cron-logs', now.strftime('%Y'), 'cron.log-%s' % now.strftime('%Y%m%d'))
) as fd:
lines = fd.readlines()
lines = [line.split(']', 1)[1].strip() for line in lines] # split on ] to get what follows the PID
# split on ] to get what follows the PID
lines = [line.split(']', 1)[1].strip() for line in lines]
# remove resource usage details
clean_usage_line_re = re.compile('(resource usage summary).*')
lines = [clean_usage_line_re.sub(r'\1', x) for x in lines]
return lines
@@ -440,7 +444,7 @@ def test_cron_command_jobs(settings):
clear_log_files()
call_command('cron', job_name='job2', domain='example.net')
assert jobs == ['job2']
assert get_logs('example.net') == ['start', "running jobs: ['job2']"]
assert get_logs('example.net') == ['start', "running jobs: ['job2']", 'resource usage summary']
get_publisher_class().cronjobs = []
jobs = []
clear_log_files()
@@ -450,6 +454,7 @@ def test_cron_command_jobs(settings):
'start',
"running jobs: ['job2']",
'long job: job2 (took 0 minutes, 0 CPU minutes)',
'resource usage summary',
]
assert jobs == ['job2']
get_publisher_class().cronjobs = []
@@ -464,6 +469,7 @@ def test_cron_command_jobs(settings):
'job3: running on "bar" took 0 minutes, 0 CPU minutes',
'job3: running on "blah" took 0 minutes, 0 CPU minutes',
'long job: job3 (took 0 minutes, 0 CPU minutes)',
'resource usage summary',
]
assert jobs == ['job3']
@@ -547,6 +553,7 @@ def test_cron_command_job_exception(settings):
'start',
"running jobs: ['job1']",
'exception running job job1: Error',
'resource usage summary',
]
clean_temporary_pub()
@@ -570,7 +577,12 @@ def test_cron_command_job_log(settings):
get_publisher_class().cronjobs = []
clear_log_files()
call_command('cron', job_name='job1', domain='example.net')
assert get_logs('example.net') == ['start', "running jobs: ['job1']", 'hello']
assert get_logs('example.net') == [
'start',
"running jobs: ['job1']",
'hello',
'resource usage summary',
]
pub.load_site_options()
pub.site_options.set('options', 'cron-log-level', 'debug')

View File

@@ -710,4 +710,5 @@ def test_timeout_cron_debug_log(pub):
assert formdef.data_class().get(formdata_id).status == 'wf-st2'
assert get_logs('example.net')[:2] == ['start', "running jobs: ['evaluate_jumps']"]
assert 'applying timeouts on baz' in get_logs('example.net')[2]
assert 'event: timeout-jump' in get_logs('example.net')[3]
assert 'SELECT' in get_logs('example.net')[3]
assert 'event: timeout-jump' in [x for x in get_logs('example.net') if 'SQL' not in x][3]

View File

@@ -75,7 +75,7 @@ class CronJob:
log_dir = os.path.join(base_dir, 'cron-logs', now.strftime('%Y'))
os.makedirs(log_dir, exist_ok=True)
with open(os.path.join(log_dir, 'cron.log-%s' % now.strftime('%Y%m%d')), 'a+') as fd:
fd.write('%s [%s] %s\n' % (now.isoformat(), os.getpid(), message))
fd.write('%s [%s] %s\n' % (now.isoformat(), os.getpid(), message.replace('\n', ' ')))
def log_debug(self, message, in_tenant=True):
if get_publisher().get_site_option('cron-log-level') != 'debug':
@@ -83,6 +83,10 @@ class CronJob:
memory = psutil.Process().memory_info().rss / (1024 * 1024)
self.log(f'(mem: {memory:.1f}M) {message}', in_tenant=in_tenant)
@classmethod
def log_sql(cls, message, in_tenant=True):
    """Write an SQL query to the cron log, prefixed so it can be filtered out."""
    entry = f'SQL: {message}'
    cls.log(entry, in_tenant=in_tenant)
def is_time(self, timetuple):
minutes = self.minutes
if minutes:
@@ -127,6 +131,13 @@ def cron_worker(publisher, since, job_name=None):
if jobs:
CronJob.log('running jobs: %r' % sorted([x.name or x for x in jobs]))
import wcs.sql
wcs.sql.LoggingCursor.queries_count = 0
if get_publisher().get_site_option('cron-log-level') == 'debug':
wcs.sql.LoggingCursor.queries_log_function = CronJob.log_sql
process_start = time.process_time()
memory_start = psutil.Process().memory_info().rss / (1024 * 1024)
for job in jobs:
publisher.current_cron_job = job
publisher.install_lang()
@@ -139,3 +150,15 @@ def cron_worker(publisher, since, job_name=None):
except Exception as e:
job.log(f'exception running job {job.name}: {e}')
publisher.capture_exception(sys.exc_info())
if jobs:
wcs.sql.LoggingCursor.queries_log_function = None
process_end = time.process_time()
memory_end = psutil.Process().memory_info().rss / (1024 * 1024)
CronJob.log(
'resource usage summary: CPU time: %.2fs / Memory: %.2fM / SQL queries: %s'
% (
process_end - process_start,
memory_end - memory_start,
wcs.sql.LoggingCursor.queries_count,
)
)

View File

@@ -96,9 +96,25 @@ SQL_TYPE_MAPPING = {
}
class LoggingCursor(psycopg2.extensions.cursor):
    """Cursor that counts and optionally records/logs every executed query.

    Used by tests (via ``queries``) and by cron logging / the resource
    usage summary (via ``queries_count`` and ``queries_log_function``).
    """

    # list collecting query strings when a test fixture installs one
    queries = None
    # running total of executed queries, reset by cron_worker
    queries_count = 0
    # optional callable invoked with each query string (debug cron logging)
    queries_log_function = None

    def execute(self, query, vars=None):
        LoggingCursor.queries_count += 1
        log_fn = self.queries_log_function
        if log_fn is not None:
            log_fn(query)
        recorder = self.queries
        if recorder is not None:
            recorder.append(query)
        return super().execute(query, vars)
class WcsPgConnection(psycopg2.extensions.connection):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cursor_factory = LoggingCursor
self._wcs_in_transaction = False
self._wcs_savepoints = []