general: use uwsgi spooler to run afterjobs (#48407)

This commit is contained in:
Frédéric Péters 2020-12-05 12:06:55 +01:00
parent efe289e64b
commit 8a5ff15656
15 changed files with 197 additions and 63 deletions

1
debian/control vendored
View File

@ -21,6 +21,7 @@ Depends: ${misc:Depends}, ${python3:Depends},
python3-psycopg2,
python3-pyproj,
python3-requests,
python3-uwsgidecorators,
python3-vobject,
python3-xstatic-leaflet,
python3-xstatic-leaflet-gesturehandling,

View File

@ -3,6 +3,7 @@
import os
PROJECT_NAME = 'wcs'
WCS_MANAGE_COMMAND = '/usr/bin/wcs-manage'
#
# hobotization

4
debian/uwsgi.ini vendored
View File

@ -12,6 +12,10 @@ http-socket = /run/wcs/wcs.sock
chmod-socket = 666
vacuum = true
spooler-processes = 3
spooler-python-import = wcs.qommon.spooler
# spooler directory is set using the command line in systemd unit file / init.d startup file.
master = true
enable-threads = true
harakiri = 120

1
debian/wcs.dirs vendored
View File

@ -3,4 +3,5 @@ usr/sbin
usr/lib/wcs
var/lib/wcs
var/lib/wcs/collectstatic
var/lib/wcs/spooler
var/log/wcs

1
debian/wcs.init vendored
View File

@ -44,6 +44,7 @@ GROUP=$NAME
DAEMON_ARGS=${DAEMON_ARGS:-"--pidfile=$PIDFILE
--uid $USER --gid $GROUP
--ini /etc/$NAME/uwsgi.ini
--spooler /var/lib/wcs/spooler/
--daemonize /var/log/uwsgi.$NAME.log"}
# Load the VERBOSE setting and other rcS variables

1
debian/wcs.postinst vendored
View File

@ -35,6 +35,7 @@ case "$1" in
chown $USER:$GROUP /var/log/$NAME
chown $USER:$GROUP /var/lib/$NAME
chown $USER:$GROUP /var/lib/$NAME/collectstatic
chown $USER:$GROUP /var/lib/$NAME/spooler
# create a secret file
SECRET_FILE=$CONFIG_DIR/secret

3
debian/wcs.service vendored
View File

@ -10,7 +10,8 @@ User=%p
Group=%p
ExecStartPre=/usr/bin/wcs-manage migrate
ExecStartPre=/usr/bin/wcs-manage collectstatic
ExecStart=/usr/bin/uwsgi --ini /etc/%p/uwsgi.ini
ExecStartPre=/bin/mkdir -p /var/lib/wcs/spooler/%m/
ExecStart=/usr/bin/uwsgi --ini /etc/%p/uwsgi.ini --spooler /var/lib/wcs/spooler/%m/
ExecReload=/bin/kill -HUP $MAINPID
KillSignal=SIGQUIT
TimeoutStartSec=0

View File

@ -12,6 +12,7 @@ from wcs.workflows import Workflow
from wcs.wf.jump import JumpWorkflowStatusItem
from wcs.fields import StringField, EmailField
import wcs.qommon.ctl
from wcs.qommon.afterjobs import AfterJob
from wcs.qommon.management.commands.collectstatic import Command as CmdCollectStatic
from wcs.qommon.management.commands.migrate import Command as CmdMigrate
from wcs.qommon.management.commands.migrate_schemas import Command as CmdMigrateSchemas
@ -404,3 +405,20 @@ def test_import_site():
def test_shell():
with pytest.raises(CommandError):
call_command('shell') # missing tenant name
class TestAfterJob(AfterJob):
def execute(self):
pass
def test_runjob(pub):
with pytest.raises(CommandError):
call_command('runjob')
with pytest.raises(CommandError):
call_command('runjob', '--domain=example.net', '--job-id=%s' % 'invalid')
job = TestAfterJob(label='test')
job.store()
assert AfterJob.get(job.id).status == 'registered'
call_command('runjob', '--domain=example.net', '--job-id=%s' % job.id)
assert AfterJob.get(job.id).status == 'completed'

View File

@ -272,26 +272,26 @@ def test_cron_command():
def test_clean_afterjobs():
pub = create_temporary_pub()
job = AfterJob(id='a')
job.status = 'completed'
job.creation_time = time.time() - 3 * 3600
job.completion_time = time.time() - 3 * 3600
job.store()
job1 = AfterJob()
job1.status = 'completed'
job1.creation_time = time.time() - 3 * 3600
job1.completion_time = time.time() - 3 * 3600
job1.store()
job = AfterJob(id='b')
job.status = 'completed'
job.creation_time = time.time()
job.completion_time = time.time()
job.store()
job2 = AfterJob()
job2.status = 'completed'
job2.creation_time = time.time()
job2.completion_time = time.time()
job2.store()
job = AfterJob(id='c')
job.status = 'running'
job.creation_time = time.time() - 3 * 86400
job.store()
job3 = AfterJob()
job3.status = 'running'
job3.creation_time = time.time() - 3 * 86400
job3.store()
pub.clean_afterjobs()
assert AfterJob.count() == 1
assert AfterJob.select()[0].id == 'b'
assert AfterJob.select()[0].id == job2.id
def test_clean_tempfiles():

View File

@ -0,0 +1,39 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2020 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import sys
from django.core.management import CommandError
from wcs.qommon.http_response import AfterJob
from . import TenantCommand
class Command(TenantCommand):
'''Run an afterjob (internal command)'''
def add_arguments(self, parser):
parser.add_argument('--domain', action='store', required=True)
parser.add_argument('--job-id', action='store', required=True)
def handle(self, *args, **options):
domain = options.pop('domain')
self.init_tenant_publisher(domain)
try:
job = AfterJob.get(options['job_id'])
except KeyError:
raise CommandError('missing job')
job.run()

View File

@ -14,12 +14,16 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from quixote.directory import Directory
from quixote import get_response
import sys
import time
import traceback
import uuid
from . import errors
from .http_response import AfterJob
from . import _
from quixote import get_publisher, get_response
from quixote.directory import Directory
from . import _, N_, errors
from .storage import StorableObject
class AfterJobStatusDirectory(Directory):
@ -34,3 +38,62 @@ class AfterJobStatusDirectory(Directory):
if not job.completion_status:
return job.status + '|' + _(job.status)
return job.status + '|' + _(job.status) + ' ' + job.completion_status
class AfterJob(StorableObject):
_names = 'afterjobs'
_reset_class = False
label = None
status = None
creation_time = None
completion_time = None
completion_status = None
execute = None
def __init__(self, label=None, cmd=None, **kwargs):
super().__init__(id=str(uuid.uuid4()))
if label:
self.label = label
self.creation_time = time.time()
self.job_cmd = cmd
self.status = N_('registered')
self.kwargs = kwargs
def run(self, spool=False):
if self.completion_time:
return
if spool and self.id and self.execute:
from django.conf import settings
if 'uwsgi' in sys.modules and settings.WCS_MANAGE_COMMAND:
from .spooler import run_after_job
self.store()
run_after_job.spool(tenant_dir=get_publisher().app_dir, job_id=self.id)
return
self.status = N_('running')
if self.id:
self.store()
try:
if self.execute:
self.execute()
else:
self.job_cmd(job=self)
except Exception:
get_publisher().notify_of_exception(sys.exc_info())
self.exception = traceback.format_exc()
self.status = N_('failed')
else:
self.status = N_('completed')
self.completion_time = time.time()
if self.id:
self.store()
def __getstate__(self):
if not isinstance(self.job_cmd, str):
obj_dict = self.__dict__.copy()
obj_dict['job_cmd'] = None
return obj_dict
return self.__dict__

View File

@ -21,22 +21,11 @@ import sys
from django.utils.encoding import force_bytes
from quixote import get_publisher
from quixote.util import randbytes
import quixote.http_response
from quixote import get_publisher, get_request
from . import N_
from .storage import StorableObject
class AfterJob(StorableObject):
_names = 'afterjobs'
label = None
status = None
creation_time = None
completion_time = None
completion_status = None
from .afterjobs import AfterJob
class HTTPResponse(quixote.http_response.HTTPResponse):
@ -150,39 +139,18 @@ class HTTPResponse(quixote.http_response.HTTPResponse):
return '\n'.join(['<link rel="stylesheet" type="text/css" href="%scss/%s?%s" />' % (
root_url, x, version_hash) for x in self.css_includes])
def add_after_job(self, label, cmd, fire_and_forget = False):
def add_after_job(self, label_or_instance, cmd=None, fire_and_forget=False):
if not self.after_jobs:
self.after_jobs = []
job_id = randbytes(8)
job = AfterJob(id = job_id)
job.label = label
job.creation_time = time.time()
job.status = N_('registered')
if isinstance(label_or_instance, AfterJob):
job = label_or_instance
else:
job = AfterJob(label=label_or_instance, cmd=cmd)
if fire_and_forget:
job.id = None
else:
job.store()
self.after_jobs.append((job, cmd))
self.after_jobs.append(job)
return job
def process_after_jobs(self):
if not self.after_jobs:
return
for job, job_function in self.after_jobs:
if job.completion_time:
continue
job.status = N_('running')
if job.id:
job.store()
try:
job_function(job=job)
except:
get_publisher().notify_of_exception(sys.exc_info())
job.exception = traceback.format_exc()
job.status = N_('failed')
else:
job.status = N_('completed')
job.completion_time = time.time()
if job.id:
job.store()
for job in self.after_jobs or []:
job.run(spool=True)

30
wcs/qommon/spooler.py Normal file
View File

@ -0,0 +1,30 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2020 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import subprocess
from uwsgidecorators import spool
@spool
def run_after_job(args):
from django.conf import settings
subprocess.run([
settings.WCS_MANAGE_COMMAND,
'runjob',
'--domain', args['tenant_dir'].strip('/').split('/')[-1],
'--job-id', args['job_id']
])

View File

@ -291,6 +291,7 @@ class StorableObject(object):
_indexes = None
_hashed_indexes = None
_filename = None # None, unless must be saved to a specific location
_reset_class = True # reset loaded object class
def __init__(self, id = None):
self.id = id
@ -506,7 +507,8 @@ class StorableObject(object):
finally:
if fd:
fd.close()
o.__class__ = cls
if cls._reset_class:
o.__class__ = cls
if any((isinstance(k, bytes) for k in o.__dict__)):
pickle_2to3_conversion(o)
if not ignore_migration:

View File

@ -173,6 +173,10 @@ CKEDITOR_CONFIGS = {
WCS_LEGACY_CONFIG_FILE = None
# management command, used to run afterjobs in uwsgi mode,
# usually /usr/bin/wcs-manage.
WCS_MANAGE_COMMAND = None
# proxies=REQUESTS_PROXIES is used in python-requests call
# http://docs.python-requests.org/en/master/user/advanced/?highlight=proxy#proxies
REQUESTS_PROXIES = None