diff --git a/wcs/formdef.py b/wcs/formdef.py index 26170b1c9..e2a0ae9a0 100644 --- a/wcs/formdef.py +++ b/wcs/formdef.py @@ -15,6 +15,7 @@ # along with this program; if not, see . import base64 +import contextlib import copy import datetime import glob @@ -1767,19 +1768,23 @@ Status of the form just changed (from "{{ form_previous_status }}" to "{{ form_s Substitutions.register('form_name', category=_('Form'), comment=_('Form Name')) -def clean_drafts(publisher): +def clean_drafts(publisher, **kwargs): import wcs.qommon.storage as st from wcs.carddef import CardDef + job = kwargs.pop('job', None) for formdef in FormDef.select() + CardDef.select(): - removal_date = datetime.date.today() - datetime.timedelta(days=formdef.get_drafts_lifespan()) - for formdata in formdef.data_class().select( - [st.Equal('status', 'draft'), st.Less('receipt_time', removal_date.timetuple())] - ): - formdata.remove_self() + with job.log_long_job( + '%s %s' % (formdef.xml_root_node, formdef.url_name) + ) if job else contextlib.ExitStack(): + removal_date = datetime.date.today() - datetime.timedelta(days=formdef.get_drafts_lifespan()) + for formdata in formdef.data_class().select( + [st.Equal('status', 'draft'), st.Less('receipt_time', removal_date.timetuple())] + ): + formdata.remove_self() -def clean_unused_files(publisher): +def clean_unused_files(publisher, **kwargs): unused_files_behaviour = publisher.get_site_option('unused-files-behaviour') if unused_files_behaviour not in ('move', 'remove'): return diff --git a/wcs/publisher.py b/wcs/publisher.py index 283d5046d..d0bfdcf38 100644 --- a/wcs/publisher.py +++ b/wcs/publisher.py @@ -22,7 +22,7 @@ import re import sys import traceback import zipfile -from contextlib import contextmanager +from contextlib import ExitStack, contextmanager from django.utils.encoding import force_text @@ -427,11 +427,13 @@ class WcsPublisher(QommonPublisher): # Could also could happen on file descriptor exhaustion. pass - def apply_global_action_timeouts(self): + def apply_global_action_timeouts(self, **kwargs): from wcs.workflows import Workflow, WorkflowGlobalActionTimeoutTrigger + job = kwargs.pop('job', None) for workflow in Workflow.select(): - WorkflowGlobalActionTimeoutTrigger.apply(workflow) + with job.log_long_job('workflow %s' % workflow.url_name) if job else ExitStack(): + WorkflowGlobalActionTimeoutTrigger.apply(workflow) def migrate_sql(self): from . import sql diff --git a/wcs/qommon/ident/password.py b/wcs/qommon/ident/password.py index a640e0aa2..9e1d79ed3 100644 --- a/wcs/qommon/ident/password.py +++ b/wcs/qommon/ident/password.py @@ -1373,7 +1373,7 @@ TextsDirectory.register( ) -def handle_unused_accounts(publisher): +def handle_unused_accounts(publisher, **kwargs): if 'password' not in get_cfg('identification', {}).get('methods', []): return identities_cfg = get_cfg('identities', {}) @@ -1425,7 +1425,7 @@ def handle_unused_accounts(publisher): # XXX: notify admin too -def handle_expired_tokens(publisher): +def handle_expired_tokens(publisher, **kwargs): if 'password' not in get_cfg('identification', {}).get('methods', []): return now = time.time() diff --git a/wcs/qommon/publisher.py b/wcs/qommon/publisher.py index 28d7d3941..a1baaadf2 100644 --- a/wcs/qommon/publisher.py +++ b/wcs/qommon/publisher.py @@ -569,7 +569,7 @@ class QommonPublisher(Publisher): return cls.cronjobs.append(cronjob) - def clean_nonces(self, delta=60, now=None): + def clean_nonces(self, delta=60, now=None, **kwargs): nonce_dir = os.path.join(get_publisher().app_dir, 'nonces') if not os.path.exists(nonce_dir): return @@ -585,7 +585,7 @@ class QommonPublisher(Publisher): except locket.LockError: pass - def clean_sessions(self): + def clean_sessions(self, **kwargs): cleaning_lock_file = os.path.join(self.app_dir, 'cleaning_sessions.lock') try: with locket.lock_file(cleaning_lock_file, timeout=0): @@ -619,7 +619,7 @@ class QommonPublisher(Publisher): except locket.LockError: pass - def clean_afterjobs(self): + def clean_afterjobs(self, **kwargs): now = time.time() for job_id in AfterJob.keys(): job = AfterJob.get(job_id) @@ -640,17 +640,17 @@ class QommonPublisher(Publisher): except OSError: pass - def clean_tempfiles(self): + def clean_tempfiles(self, **kwargs): now = time.time() one_month_ago = now - 30 * 86400 self._clean_files(one_month_ago, os.path.join(self.app_dir, 'tempfiles')) - def clean_thumbnails(self): + def clean_thumbnails(self, **kwargs): now = time.time() one_month_ago = now - 30 * 86400 self._clean_files(one_month_ago, os.path.join(self.app_dir, 'thumbs')) - def clean_loggederrors(self): + def clean_loggederrors(self, **kwargs): if not self.loggederror_class: return diff --git a/wcs/wf/aggregation_email.py b/wcs/wf/aggregation_email.py index bdf5a7bdf..436f1b4c1 100644 --- a/wcs/wf/aggregation_email.py +++ b/wcs/wf/aggregation_email.py @@ -14,6 +14,8 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, see . +import contextlib + from quixote import get_publisher from wcs.workflows import WorkflowStatusItem, register_item_class @@ -98,69 +100,73 @@ def lax_int(s): return 10000 -def send_aggregation_emails(publisher): +def send_aggregation_emails(publisher, **kwargs): from wcs.formdef import FormDef publisher.reload_cfg() site_name = publisher.cfg.get('misc', {}).get('sitename', None) + job = kwargs.pop('job', None) cache = {} for aggregate_id in AggregationEmail.keys(): - aggregate = AggregationEmail.get(aggregate_id) - aggregate.remove_self() - - try: - role = get_publisher().role_class.get(aggregate_id) - except KeyError: - continue - if not role.get_emails(): - continue - if not aggregate.items: - continue - - last_formdef = None - body = [] - for item in sorted(aggregate.items, key=lambda x: (lax_int(x['formdef']), lax_int(x['formdata']))): - formdef_id = item.get('formdef') - if formdef_id in cache: - formdef, formdata, workflow = cache[formdef_id] - else: - try: - formdef = FormDef.get(formdef_id) - except KeyError: - # formdef has been deleted after AggregationEmail creation - continue - formdata = formdef.data_class() - workflow = formdef.workflow - cache[formdef_id] = (formdef, formdata, workflow) + with job.log_long_job('aggregation email %s' % aggregate_id) if job else contextlib.ExitStack(): + aggregate = AggregationEmail.get(aggregate_id) + aggregate.remove_self() try: - data = formdata.get(item.get('formdata')) + role = get_publisher().role_class.get(aggregate_id) except KeyError: continue - status = data.get_status() - url = item.get('formurl') + if not role.get_emails(): + continue + if not aggregate.items: + continue - if last_formdef != formdef: - if last_formdef is not None: - body.append('') # blank line - last_formdef = formdef - body.append(formdef.name) - body.append('-' * len(formdef.name)) - body.append('') + last_formdef = None + body = [] + for item in sorted( + aggregate.items, key=lambda x: (lax_int(x['formdef']), lax_int(x['formdata'])) + ): + formdef_id = item.get('formdef') + if formdef_id in cache: + formdef, formdata, workflow = cache[formdef_id] + else: + try: + formdef = FormDef.get(formdef_id) + except KeyError: + # formdef has been deleted after AggregationEmail creation + continue + formdata = formdef.data_class() + workflow = formdef.workflow + cache[formdef_id] = (formdef, formdata, workflow) - body.append('- %sstatus (%s)' % (url, status.name)) + try: + data = formdata.get(item.get('formdata')) + except KeyError: + continue + status = data.get_status() + url = item.get('formurl') - if not body: - continue + if last_formdef != formdef: + if last_formdef is not None: + body.append('') # blank line + last_formdef = formdef + body.append(formdef.name) + body.append('-' * len(formdef.name)) + body.append('') - body = '\n'.join(body) + body.append('- %sstatus (%s)' % (url, status.name)) - mail_subject = _('New arrivals') - if site_name: - mail_subject += ' (%s)' % site_name + if not body: + continue - emails.email(mail_subject, body, email_rcpt=role.get_emails()) + body = '\n'.join(body) + + mail_subject = _('New arrivals') + if site_name: + mail_subject += ' (%s)' % site_name + + emails.email(mail_subject, body, email_rcpt=role.get_emails()) def register_cronjob(): diff --git a/wcs/wf/jump.py b/wcs/wf/jump.py index dfb5caed6..90bb634da 100644 --- a/wcs/wf/jump.py +++ b/wcs/wf/jump.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, see . +import contextlib import datetime import itertools import json @@ -313,51 +314,55 @@ def workflows_with_timeout(): return wfs_status -def _apply_timeouts(publisher): +def _apply_timeouts(publisher, **kwargs): '''Traverse all filled form and apply expired timeout jumps if needed''' from ..carddef import CardDef from ..formdef import FormDef wfs_status = workflows_with_timeout() + job = kwargs.pop('job', None) for formdef in itertools.chain(FormDef.select(ignore_errors=True), CardDef.select(ignore_errors=True)): status_ids = wfs_status.get(str(formdef.workflow_id)) if not status_ids: continue - formdata_class = formdef.data_class() - for status_id in status_ids: - if publisher.is_using_postgresql(): - # get minimum delay for jumps in this status - delay = math.inf - for jump_action in wfs_status[str(formdef.workflow_id)][status_id]: - if Template.is_template_string(jump_action.timeout): - delay = 0 - break - delay = min(delay, int(jump_action.timeout)) - # limit delay to minimal delay - if delay < JUMP_TIMEOUT_INTERVAL * 60: - delay = JUMP_TIMEOUT_INTERVAL * 60 + with job.log_long_job( + '%s %s' % (formdef.xml_root_node, formdef.url_name) + ) if job else contextlib.ExitStack(): + formdata_class = formdef.data_class() + for status_id in status_ids: + if publisher.is_using_postgresql(): + # get minimum delay for jumps in this status + delay = math.inf + for jump_action in wfs_status[str(formdef.workflow_id)][status_id]: + if Template.is_template_string(jump_action.timeout): + delay = 0 + break + delay = min(delay, int(jump_action.timeout)) + # limit delay to minimal delay + if delay < JUMP_TIMEOUT_INTERVAL * 60: + delay = JUMP_TIMEOUT_INTERVAL * 60 - criterias = [ - Equal('status', status_id), - LessOrEqual( - 'last_update_time', - (datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(), - ), - ] - formdatas = formdata_class.select_iterator(criterias, ignore_errors=True) - else: - formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True) + criterias = [ + Equal('status', status_id), + LessOrEqual( + 'last_update_time', + (datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(), + ), + ] + formdatas = formdata_class.select_iterator(criterias, ignore_errors=True) + else: + formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True) - for formdata in formdatas: - for jump_action in wfs_status[str(formdef.workflow_id)][formdata.status]: - get_publisher().substitutions.reset() - get_publisher().substitutions.feed(get_publisher()) - get_publisher().substitutions.feed(formdef) - get_publisher().substitutions.feed(formdata) - if jump_action.must_jump(formdata): - jump_and_perform(formdata, jump_action, event=('timeout-jump', jump_action.id)) - break + for formdata in formdatas: + for jump_action in wfs_status[str(formdef.workflow_id)][formdata.status]: + get_publisher().substitutions.reset() + get_publisher().substitutions.feed(get_publisher()) + get_publisher().substitutions.feed(formdef) + get_publisher().substitutions.feed(formdata) + if jump_action.must_jump(formdata): + jump_and_perform(formdata, jump_action, event=('timeout-jump', jump_action.id)) + break def register_cronjob():