misc: more logs for some cron jobs (#57604)
gitea-wip/wcs/pipeline/head: There was a failure building this commit
parent fd12e0cb51
commit b7ca09ad9d
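Every cron entry point touched by this commit follows the same pattern: the function gains a **kwargs catch-all, pops an optional job argument, and wraps each long-running step in job.log_long_job(label) when a job object is provided, falling back to a no-op contextlib.ExitStack() otherwise. Below is a minimal sketch of that pattern, assuming only that log_long_job() can be used as a context manager that records how long the wrapped block took; the DummyJob class and run_cleanup function are illustrative stand-ins, not part of wcs.

    import contextlib
    import time


    class DummyJob:
        # Hypothetical stand-in for the job object wcs would pass in; only
        # the context-manager behaviour of log_long_job() is assumed here.
        @contextlib.contextmanager
        def log_long_job(self, label):
            start = time.time()
            try:
                yield
            finally:
                print('%s: done in %.1fs' % (label, time.time() - start))


    def run_cleanup(publisher, **kwargs):
        # Same shape as the patched cron functions: 'job' is optional.
        job = kwargs.pop('job', None)
        for name in ('tempfiles', 'thumbs'):
            # ExitStack() is an empty context manager, so the body runs
            # unchanged when no job object is available.
            with job.log_long_job('cleaning %s' % name) if job else contextlib.ExitStack():
                pass  # actual cleanup work would go here


    run_cleanup(publisher=None)                  # silent, as before
    run_cleanup(publisher=None, job=DummyJob())  # logs the duration of each step

Because ExitStack() with no registered callbacks does nothing on enter or exit, existing callers that do not pass a job keep working unchanged.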
@@ -15,6 +15,7 @@
 # along with this program; if not, see <http://www.gnu.org/licenses/>.

 import base64
+import contextlib
 import copy
 import datetime
 import glob
@@ -1767,19 +1768,23 @@ Status of the form just changed (from "{{ form_previous_status }}" to "{{ form_s
 Substitutions.register('form_name', category=_('Form'), comment=_('Form Name'))


-def clean_drafts(publisher):
+def clean_drafts(publisher, **kwargs):
     import wcs.qommon.storage as st
     from wcs.carddef import CardDef

+    job = kwargs.pop('job', None)
     for formdef in FormDef.select() + CardDef.select():
-        removal_date = datetime.date.today() - datetime.timedelta(days=formdef.get_drafts_lifespan())
-        for formdata in formdef.data_class().select(
-            [st.Equal('status', 'draft'), st.Less('receipt_time', removal_date.timetuple())]
-        ):
-            formdata.remove_self()
+        with job.log_long_job(
+            '%s %s' % (formdef.xml_root_node, formdef.url_name)
+        ) if job else contextlib.ExitStack():
+            removal_date = datetime.date.today() - datetime.timedelta(days=formdef.get_drafts_lifespan())
+            for formdata in formdef.data_class().select(
+                [st.Equal('status', 'draft'), st.Less('receipt_time', removal_date.timetuple())]
+            ):
+                formdata.remove_self()


-def clean_unused_files(publisher):
+def clean_unused_files(publisher, **kwargs):
     unused_files_behaviour = publisher.get_site_option('unused-files-behaviour')
     if unused_files_behaviour not in ('move', 'remove'):
         return
@@ -22,7 +22,7 @@ import re
 import sys
 import traceback
 import zipfile
-from contextlib import contextmanager
+from contextlib import ExitStack, contextmanager

 from django.utils.encoding import force_text

@@ -427,11 +427,13 @@ class WcsPublisher(QommonPublisher):
             # Could also could happen on file descriptor exhaustion.
             pass

-    def apply_global_action_timeouts(self):
+    def apply_global_action_timeouts(self, **kwargs):
         from wcs.workflows import Workflow, WorkflowGlobalActionTimeoutTrigger

+        job = kwargs.pop('job', None)
         for workflow in Workflow.select():
-            WorkflowGlobalActionTimeoutTrigger.apply(workflow)
+            with job.log_long_job('workflow %s' % workflow.url_name) if job else ExitStack():
+                WorkflowGlobalActionTimeoutTrigger.apply(workflow)

     def migrate_sql(self):
         from . import sql
@@ -1373,7 +1373,7 @@ TextsDirectory.register(
 )


-def handle_unused_accounts(publisher):
+def handle_unused_accounts(publisher, **kwargs):
     if 'password' not in get_cfg('identification', {}).get('methods', []):
         return
     identities_cfg = get_cfg('identities', {})
@@ -1425,7 +1425,7 @@ def handle_unused_accounts(publisher):
             # XXX: notify admin too


-def handle_expired_tokens(publisher):
+def handle_expired_tokens(publisher, **kwargs):
     if 'password' not in get_cfg('identification', {}).get('methods', []):
         return
     now = time.time()
@@ -569,7 +569,7 @@ class QommonPublisher(Publisher):
             return
         cls.cronjobs.append(cronjob)

-    def clean_nonces(self, delta=60, now=None):
+    def clean_nonces(self, delta=60, now=None, **kwargs):
         nonce_dir = os.path.join(get_publisher().app_dir, 'nonces')
         if not os.path.exists(nonce_dir):
             return
@@ -585,7 +585,7 @@ class QommonPublisher(Publisher):
         except locket.LockError:
             pass

-    def clean_sessions(self):
+    def clean_sessions(self, **kwargs):
         cleaning_lock_file = os.path.join(self.app_dir, 'cleaning_sessions.lock')
         try:
             with locket.lock_file(cleaning_lock_file, timeout=0):
@@ -619,7 +619,7 @@ class QommonPublisher(Publisher):
         except locket.LockError:
             pass

-    def clean_afterjobs(self):
+    def clean_afterjobs(self, **kwargs):
         now = time.time()
         for job_id in AfterJob.keys():
             job = AfterJob.get(job_id)
@@ -640,17 +640,17 @@ class QommonPublisher(Publisher):
         except OSError:
             pass

-    def clean_tempfiles(self):
+    def clean_tempfiles(self, **kwargs):
         now = time.time()
         one_month_ago = now - 30 * 86400
         self._clean_files(one_month_ago, os.path.join(self.app_dir, 'tempfiles'))

-    def clean_thumbnails(self):
+    def clean_thumbnails(self, **kwargs):
         now = time.time()
         one_month_ago = now - 30 * 86400
         self._clean_files(one_month_ago, os.path.join(self.app_dir, 'thumbs'))

-    def clean_loggederrors(self):
+    def clean_loggederrors(self, **kwargs):
         if not self.loggederror_class:
             return

@@ -14,6 +14,8 @@
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, see <http://www.gnu.org/licenses/>.

+import contextlib
+
 from quixote import get_publisher

 from wcs.workflows import WorkflowStatusItem, register_item_class
@@ -98,69 +100,73 @@ def lax_int(s):
         return 10000


-def send_aggregation_emails(publisher):
+def send_aggregation_emails(publisher, **kwargs):
     from wcs.formdef import FormDef

     publisher.reload_cfg()
     site_name = publisher.cfg.get('misc', {}).get('sitename', None)
+    job = kwargs.pop('job', None)

     cache = {}
     for aggregate_id in AggregationEmail.keys():
-        aggregate = AggregationEmail.get(aggregate_id)
-        aggregate.remove_self()
-
-        try:
-            role = get_publisher().role_class.get(aggregate_id)
-        except KeyError:
-            continue
-        if not role.get_emails():
-            continue
-        if not aggregate.items:
-            continue
-
-        last_formdef = None
-        body = []
-        for item in sorted(aggregate.items, key=lambda x: (lax_int(x['formdef']), lax_int(x['formdata']))):
-            formdef_id = item.get('formdef')
-            if formdef_id in cache:
-                formdef, formdata, workflow = cache[formdef_id]
-            else:
-                try:
-                    formdef = FormDef.get(formdef_id)
-                except KeyError:
-                    # formdef has been deleted after AggregationEmail creation
-                    continue
-                formdata = formdef.data_class()
-                workflow = formdef.workflow
-                cache[formdef_id] = (formdef, formdata, workflow)
-
-            try:
-                data = formdata.get(item.get('formdata'))
-            except KeyError:
-                continue
-            status = data.get_status()
-            url = item.get('formurl')
-
-            if last_formdef != formdef:
-                if last_formdef is not None:
-                    body.append('')  # blank line
-                last_formdef = formdef
-                body.append(formdef.name)
-                body.append('-' * len(formdef.name))
-                body.append('')
-
-            body.append('- %sstatus (%s)' % (url, status.name))
-
-        if not body:
-            continue
-
-        body = '\n'.join(body)
-
-        mail_subject = _('New arrivals')
-        if site_name:
-            mail_subject += ' (%s)' % site_name
-
-        emails.email(mail_subject, body, email_rcpt=role.get_emails())
+        with job.log_long_job('aggregation email %s' % aggregate_id) if job else contextlib.ExitStack():
+            aggregate = AggregationEmail.get(aggregate_id)
+            aggregate.remove_self()
+
+            try:
+                role = get_publisher().role_class.get(aggregate_id)
+            except KeyError:
+                continue
+            if not role.get_emails():
+                continue
+            if not aggregate.items:
+                continue
+
+            last_formdef = None
+            body = []
+            for item in sorted(
+                aggregate.items, key=lambda x: (lax_int(x['formdef']), lax_int(x['formdata']))
+            ):
+                formdef_id = item.get('formdef')
+                if formdef_id in cache:
+                    formdef, formdata, workflow = cache[formdef_id]
+                else:
+                    try:
+                        formdef = FormDef.get(formdef_id)
+                    except KeyError:
+                        # formdef has been deleted after AggregationEmail creation
+                        continue
+                    formdata = formdef.data_class()
+                    workflow = formdef.workflow
+                    cache[formdef_id] = (formdef, formdata, workflow)
+
+                try:
+                    data = formdata.get(item.get('formdata'))
+                except KeyError:
+                    continue
+                status = data.get_status()
+                url = item.get('formurl')
+
+                if last_formdef != formdef:
+                    if last_formdef is not None:
+                        body.append('')  # blank line
+                    last_formdef = formdef
+                    body.append(formdef.name)
+                    body.append('-' * len(formdef.name))
+                    body.append('')
+
+                body.append('- %sstatus (%s)' % (url, status.name))
+
+            if not body:
+                continue
+
+            body = '\n'.join(body)
+
+            mail_subject = _('New arrivals')
+            if site_name:
+                mail_subject += ' (%s)' % site_name
+
+            emails.email(mail_subject, body, email_rcpt=role.get_emails())


 def register_cronjob():
@@ -14,6 +14,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, see <http://www.gnu.org/licenses/>.

+import contextlib
 import datetime
 import itertools
 import json
@@ -313,51 +314,55 @@ def workflows_with_timeout():
     return wfs_status


-def _apply_timeouts(publisher):
+def _apply_timeouts(publisher, **kwargs):
     '''Traverse all filled form and apply expired timeout jumps if needed'''
     from ..carddef import CardDef
     from ..formdef import FormDef

     wfs_status = workflows_with_timeout()
+    job = kwargs.pop('job', None)

     for formdef in itertools.chain(FormDef.select(ignore_errors=True), CardDef.select(ignore_errors=True)):
         status_ids = wfs_status.get(str(formdef.workflow_id))
         if not status_ids:
             continue
-        formdata_class = formdef.data_class()
-        for status_id in status_ids:
-            if publisher.is_using_postgresql():
-                # get minimum delay for jumps in this status
-                delay = math.inf
-                for jump_action in wfs_status[str(formdef.workflow_id)][status_id]:
-                    if Template.is_template_string(jump_action.timeout):
-                        delay = 0
-                        break
-                    delay = min(delay, int(jump_action.timeout))
-                # limit delay to minimal delay
-                if delay < JUMP_TIMEOUT_INTERVAL * 60:
-                    delay = JUMP_TIMEOUT_INTERVAL * 60
-
-                criterias = [
-                    Equal('status', status_id),
-                    LessOrEqual(
-                        'last_update_time',
-                        (datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
-                    ),
-                ]
-                formdatas = formdata_class.select_iterator(criterias, ignore_errors=True)
-            else:
-                formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True)
-
-            for formdata in formdatas:
-                for jump_action in wfs_status[str(formdef.workflow_id)][formdata.status]:
-                    get_publisher().substitutions.reset()
-                    get_publisher().substitutions.feed(get_publisher())
-                    get_publisher().substitutions.feed(formdef)
-                    get_publisher().substitutions.feed(formdata)
-                    if jump_action.must_jump(formdata):
-                        jump_and_perform(formdata, jump_action, event=('timeout-jump', jump_action.id))
-                        break
+        with job.log_long_job(
+            '%s %s' % (formdef.xml_root_node, formdef.url_name)
+        ) if job else contextlib.ExitStack():
+            formdata_class = formdef.data_class()
+            for status_id in status_ids:
+                if publisher.is_using_postgresql():
+                    # get minimum delay for jumps in this status
+                    delay = math.inf
+                    for jump_action in wfs_status[str(formdef.workflow_id)][status_id]:
+                        if Template.is_template_string(jump_action.timeout):
+                            delay = 0
+                            break
+                        delay = min(delay, int(jump_action.timeout))
+                    # limit delay to minimal delay
+                    if delay < JUMP_TIMEOUT_INTERVAL * 60:
+                        delay = JUMP_TIMEOUT_INTERVAL * 60
+
+                    criterias = [
+                        Equal('status', status_id),
+                        LessOrEqual(
+                            'last_update_time',
+                            (datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
+                        ),
+                    ]
+                    formdatas = formdata_class.select_iterator(criterias, ignore_errors=True)
+                else:
+                    formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True)
+
+                for formdata in formdatas:
+                    for jump_action in wfs_status[str(formdef.workflow_id)][formdata.status]:
+                        get_publisher().substitutions.reset()
+                        get_publisher().substitutions.feed(get_publisher())
+                        get_publisher().substitutions.feed(formdef)
+                        get_publisher().substitutions.feed(formdata)
+                        if jump_action.must_jump(formdata):
+                            jump_and_perform(formdata, jump_action, event=('timeout-jump', jump_action.id))
+                            break


 def register_cronjob():