misc: more logs for some cron jobs (#57604)
gitea-wip/wcs/pipeline/head There was a failure building this commit Details

This commit is contained in:
Lauréline Guérin 2021-10-11 17:59:15 +02:00
parent fd12e0cb51
commit b7ca09ad9d
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
6 changed files with 115 additions and 97 deletions

View File

@ -15,6 +15,7 @@
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import base64
import contextlib
import copy
import datetime
import glob
@ -1767,19 +1768,23 @@ Status of the form just changed (from "{{ form_previous_status }}" to "{{ form_s
Substitutions.register('form_name', category=_('Form'), comment=_('Form Name'))
def clean_drafts(publisher):
def clean_drafts(publisher, **kwargs):
import wcs.qommon.storage as st
from wcs.carddef import CardDef
job = kwargs.pop('job', None)
for formdef in FormDef.select() + CardDef.select():
removal_date = datetime.date.today() - datetime.timedelta(days=formdef.get_drafts_lifespan())
for formdata in formdef.data_class().select(
[st.Equal('status', 'draft'), st.Less('receipt_time', removal_date.timetuple())]
):
formdata.remove_self()
with job.log_long_job(
'%s %s' % (formdef.xml_root_node, formdef.url_name)
) if job else contextlib.ExitStack():
removal_date = datetime.date.today() - datetime.timedelta(days=formdef.get_drafts_lifespan())
for formdata in formdef.data_class().select(
[st.Equal('status', 'draft'), st.Less('receipt_time', removal_date.timetuple())]
):
formdata.remove_self()
def clean_unused_files(publisher):
def clean_unused_files(publisher, **kwargs):
unused_files_behaviour = publisher.get_site_option('unused-files-behaviour')
if unused_files_behaviour not in ('move', 'remove'):
return

View File

@ -22,7 +22,7 @@ import re
import sys
import traceback
import zipfile
from contextlib import contextmanager
from contextlib import ExitStack, contextmanager
from django.utils.encoding import force_text
@ -427,11 +427,13 @@ class WcsPublisher(QommonPublisher):
# Could also could happen on file descriptor exhaustion.
pass
def apply_global_action_timeouts(self):
def apply_global_action_timeouts(self, **kwargs):
from wcs.workflows import Workflow, WorkflowGlobalActionTimeoutTrigger
job = kwargs.pop('job', None)
for workflow in Workflow.select():
WorkflowGlobalActionTimeoutTrigger.apply(workflow)
with job.log_long_job('workflow %s' % workflow.url_name) if job else ExitStack():
WorkflowGlobalActionTimeoutTrigger.apply(workflow)
def migrate_sql(self):
from . import sql

View File

@ -1373,7 +1373,7 @@ TextsDirectory.register(
)
def handle_unused_accounts(publisher):
def handle_unused_accounts(publisher, **kwargs):
if 'password' not in get_cfg('identification', {}).get('methods', []):
return
identities_cfg = get_cfg('identities', {})
@ -1425,7 +1425,7 @@ def handle_unused_accounts(publisher):
# XXX: notify admin too
def handle_expired_tokens(publisher):
def handle_expired_tokens(publisher, **kwargs):
if 'password' not in get_cfg('identification', {}).get('methods', []):
return
now = time.time()

View File

@ -569,7 +569,7 @@ class QommonPublisher(Publisher):
return
cls.cronjobs.append(cronjob)
def clean_nonces(self, delta=60, now=None):
def clean_nonces(self, delta=60, now=None, **kwargs):
nonce_dir = os.path.join(get_publisher().app_dir, 'nonces')
if not os.path.exists(nonce_dir):
return
@ -585,7 +585,7 @@ class QommonPublisher(Publisher):
except locket.LockError:
pass
def clean_sessions(self):
def clean_sessions(self, **kwargs):
cleaning_lock_file = os.path.join(self.app_dir, 'cleaning_sessions.lock')
try:
with locket.lock_file(cleaning_lock_file, timeout=0):
@ -619,7 +619,7 @@ class QommonPublisher(Publisher):
except locket.LockError:
pass
def clean_afterjobs(self):
def clean_afterjobs(self, **kwargs):
now = time.time()
for job_id in AfterJob.keys():
job = AfterJob.get(job_id)
@ -640,17 +640,17 @@ class QommonPublisher(Publisher):
except OSError:
pass
def clean_tempfiles(self):
def clean_tempfiles(self, **kwargs):
now = time.time()
one_month_ago = now - 30 * 86400
self._clean_files(one_month_ago, os.path.join(self.app_dir, 'tempfiles'))
def clean_thumbnails(self):
def clean_thumbnails(self, **kwargs):
now = time.time()
one_month_ago = now - 30 * 86400
self._clean_files(one_month_ago, os.path.join(self.app_dir, 'thumbs'))
def clean_loggederrors(self):
def clean_loggederrors(self, **kwargs):
if not self.loggederror_class:
return

View File

@ -14,6 +14,8 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import contextlib
from quixote import get_publisher
from wcs.workflows import WorkflowStatusItem, register_item_class
@ -98,69 +100,73 @@ def lax_int(s):
return 10000
def send_aggregation_emails(publisher):
def send_aggregation_emails(publisher, **kwargs):
from wcs.formdef import FormDef
publisher.reload_cfg()
site_name = publisher.cfg.get('misc', {}).get('sitename', None)
job = kwargs.pop('job', None)
cache = {}
for aggregate_id in AggregationEmail.keys():
aggregate = AggregationEmail.get(aggregate_id)
aggregate.remove_self()
try:
role = get_publisher().role_class.get(aggregate_id)
except KeyError:
continue
if not role.get_emails():
continue
if not aggregate.items:
continue
last_formdef = None
body = []
for item in sorted(aggregate.items, key=lambda x: (lax_int(x['formdef']), lax_int(x['formdata']))):
formdef_id = item.get('formdef')
if formdef_id in cache:
formdef, formdata, workflow = cache[formdef_id]
else:
try:
formdef = FormDef.get(formdef_id)
except KeyError:
# formdef has been deleted after AggregationEmail creation
continue
formdata = formdef.data_class()
workflow = formdef.workflow
cache[formdef_id] = (formdef, formdata, workflow)
with job.log_long_job('aggregation email %s' % aggregate_id) if job else contextlib.ExitStack():
aggregate = AggregationEmail.get(aggregate_id)
aggregate.remove_self()
try:
data = formdata.get(item.get('formdata'))
role = get_publisher().role_class.get(aggregate_id)
except KeyError:
continue
status = data.get_status()
url = item.get('formurl')
if not role.get_emails():
continue
if not aggregate.items:
continue
if last_formdef != formdef:
if last_formdef is not None:
body.append('') # blank line
last_formdef = formdef
body.append(formdef.name)
body.append('-' * len(formdef.name))
body.append('')
last_formdef = None
body = []
for item in sorted(
aggregate.items, key=lambda x: (lax_int(x['formdef']), lax_int(x['formdata']))
):
formdef_id = item.get('formdef')
if formdef_id in cache:
formdef, formdata, workflow = cache[formdef_id]
else:
try:
formdef = FormDef.get(formdef_id)
except KeyError:
# formdef has been deleted after AggregationEmail creation
continue
formdata = formdef.data_class()
workflow = formdef.workflow
cache[formdef_id] = (formdef, formdata, workflow)
body.append('- %sstatus (%s)' % (url, status.name))
try:
data = formdata.get(item.get('formdata'))
except KeyError:
continue
status = data.get_status()
url = item.get('formurl')
if not body:
continue
if last_formdef != formdef:
if last_formdef is not None:
body.append('') # blank line
last_formdef = formdef
body.append(formdef.name)
body.append('-' * len(formdef.name))
body.append('')
body = '\n'.join(body)
body.append('- %sstatus (%s)' % (url, status.name))
mail_subject = _('New arrivals')
if site_name:
mail_subject += ' (%s)' % site_name
if not body:
continue
emails.email(mail_subject, body, email_rcpt=role.get_emails())
body = '\n'.join(body)
mail_subject = _('New arrivals')
if site_name:
mail_subject += ' (%s)' % site_name
emails.email(mail_subject, body, email_rcpt=role.get_emails())
def register_cronjob():

View File

@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import contextlib
import datetime
import itertools
import json
@ -313,51 +314,55 @@ def workflows_with_timeout():
return wfs_status
def _apply_timeouts(publisher):
def _apply_timeouts(publisher, **kwargs):
'''Traverse all filled form and apply expired timeout jumps if needed'''
from ..carddef import CardDef
from ..formdef import FormDef
wfs_status = workflows_with_timeout()
job = kwargs.pop('job', None)
for formdef in itertools.chain(FormDef.select(ignore_errors=True), CardDef.select(ignore_errors=True)):
status_ids = wfs_status.get(str(formdef.workflow_id))
if not status_ids:
continue
formdata_class = formdef.data_class()
for status_id in status_ids:
if publisher.is_using_postgresql():
# get minimum delay for jumps in this status
delay = math.inf
for jump_action in wfs_status[str(formdef.workflow_id)][status_id]:
if Template.is_template_string(jump_action.timeout):
delay = 0
break
delay = min(delay, int(jump_action.timeout))
# limit delay to minimal delay
if delay < JUMP_TIMEOUT_INTERVAL * 60:
delay = JUMP_TIMEOUT_INTERVAL * 60
with job.log_long_job(
'%s %s' % (formdef.xml_root_node, formdef.url_name)
) if job else contextlib.ExitStack():
formdata_class = formdef.data_class()
for status_id in status_ids:
if publisher.is_using_postgresql():
# get minimum delay for jumps in this status
delay = math.inf
for jump_action in wfs_status[str(formdef.workflow_id)][status_id]:
if Template.is_template_string(jump_action.timeout):
delay = 0
break
delay = min(delay, int(jump_action.timeout))
# limit delay to minimal delay
if delay < JUMP_TIMEOUT_INTERVAL * 60:
delay = JUMP_TIMEOUT_INTERVAL * 60
criterias = [
Equal('status', status_id),
LessOrEqual(
'last_update_time',
(datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
),
]
formdatas = formdata_class.select_iterator(criterias, ignore_errors=True)
else:
formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True)
criterias = [
Equal('status', status_id),
LessOrEqual(
'last_update_time',
(datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
),
]
formdatas = formdata_class.select_iterator(criterias, ignore_errors=True)
else:
formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True)
for formdata in formdatas:
for jump_action in wfs_status[str(formdef.workflow_id)][formdata.status]:
get_publisher().substitutions.reset()
get_publisher().substitutions.feed(get_publisher())
get_publisher().substitutions.feed(formdef)
get_publisher().substitutions.feed(formdata)
if jump_action.must_jump(formdata):
jump_and_perform(formdata, jump_action, event=('timeout-jump', jump_action.id))
break
for formdata in formdatas:
for jump_action in wfs_status[str(formdef.workflow_id)][formdata.status]:
get_publisher().substitutions.reset()
get_publisher().substitutions.feed(get_publisher())
get_publisher().substitutions.feed(formdef)
get_publisher().substitutions.feed(formdata)
if jump_action.must_jump(formdata):
jump_and_perform(formdata, jump_action, event=('timeout-jump', jump_action.id))
break
def register_cronjob():