jobs: use uwsgi spooler to run jobs (#50017)

This commit is contained in:
Lauréline Guérin 2021-01-14 15:00:15 +01:00
parent e3d8d26281
commit d53ce49abb
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
16 changed files with 166 additions and 26 deletions

1
debian/control vendored
View File

@ -19,6 +19,7 @@ Depends: ${python3:Depends},
python3-gadjo,
python3-django-model-utils,
python3-requests,
python3-uwsgidecorators,
python3-setuptools,
python3-suds,
python3-cmislib,

View File

@ -6,6 +6,7 @@ import os
DEBUG = False
PROJECT_NAME = 'passerelle'
PASSERELLE_MANAGE_COMMAND = '/usr/bin/passerelle-manage'
#
# hobotization (multitenant)

View File

@ -1,8 +0,0 @@
MAILTO=root
*/5 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants availability
*/5 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants jobs
17 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants hourly
25 1 * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants daily
47 2 * * 7 passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants weekly
52 3 1 * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants monthly

View File

@ -1,3 +0,0 @@
#! /bin/sh
/sbin/runuser -u passerelle /usr/bin/passerelle-manage -- tenant_command clearsessions --all-tenants

View File

@ -1,5 +1,6 @@
/etc/passerelle
/usr/lib/passerelle
/var/lib/passerelle/collectstatic
/var/lib/passerelle/spooler
/var/lib/passerelle/tenants
/var/log/passerelle

View File

@ -38,6 +38,7 @@ GROUP=$NAME
DAEMON_ARGS=${DAEMON_ARGS:-"--pidfile=$PIDFILE
--uid $USER --gid $GROUP
--ini /etc/$NAME/uwsgi.ini
--spooler /var/lib/$NAME/spooler/
--daemonize /var/log/uwsgi.$NAME.log"}
# Load the VERBOSE setting and other rcS variables

View File

@ -20,6 +20,7 @@ case "$1" in
# ensure dirs ownership
chown $USER:$GROUP /var/log/$NAME
chown $USER:$GROUP /var/lib/$NAME/collectstatic
chown $USER:$GROUP /var/lib/$NAME/spooler
chown $USER:$GROUP /var/lib/$NAME/tenants
# create a secret file
SECRET_FILE=$CONFIG_DIR/secret

View File

@ -12,7 +12,8 @@ User=%p
Group=%p
ExecStartPre=/usr/bin/passerelle-manage migrate_schemas --noinput --verbosity 1
ExecStartPre=/usr/bin/passerelle-manage collectstatic --noinput
ExecStart=/usr/bin/uwsgi --ini /etc/%p/uwsgi.ini
ExecStartPre=/bin/mkdir -p /var/lib/passerelle/spooler/%m/
ExecStart=/usr/bin/uwsgi --ini /etc/%p/uwsgi.ini --spooler /var/lib/passerelle/spooler/%m/
ExecReload=/bin/kill -HUP $MAINPID
KillSignal=SIGQUIT
TimeoutStartSec=0

15
debian/uwsgi.ini vendored
View File

@ -12,6 +12,21 @@ http-socket = /run/passerelle/passerelle.sock
chmod-socket = 666
vacuum = true
spooler-processes = 3
spooler-python-import = passerelle.utils.spooler
# every five minutes
cron = -5 -1 -1 -1 -1 /usr/bin/passerelle-manage tenant_command cron --all-tenants availability
cron = -5 -1 -1 -1 -1 /usr/bin/passerelle-manage tenant_command cron --all-tenants jobs
# hourly
cron = 1 -1 -1 -1 -1 /usr/bin/passerelle-manage tenant_command clearsessions --all-tenants
cron = 17 -1 -1 -1 -1 /usr/bin/passerelle-manage tenant_command cron --all-tenants hourly
# daily
cron = 25 1 -1 -1 -1 /usr/bin/passerelle-manage tenant_command cron --all-tenants daily
# weekly
cron = 47 2 -1 -1 7 /usr/bin/passerelle-manage tenant_command cron --all-tenants weekly
# monthly
cron = 52 3 1 -1 -1 /usr/bin/passerelle-manage tenant_command cron --all-tenants monthly
master = true
enable-threads = true
harakiri = 120

View File

@ -0,0 +1,41 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2021 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.core.management.base import BaseCommand, CommandError
from django.db import connection, transaction
from passerelle.base.models import Job
class Command(BaseCommand):
'''Run a job (internal command)'''
def add_arguments(self, parser):
parser.add_argument('--job-id', action='store', required=True)
def handle(self, *args, **options):
skip_locked = {'skip_locked': True}
if not connection.features.has_select_for_update_skip_locked:
skip_locked = {}
with transaction.atomic():
try:
job = Job.objects.select_for_update(**skip_locked).get(pk=options['job_id'])
except Job.DoesNotExist:
raise CommandError('missing job')
job.status = 'running'
job.save()
# release lock
job.run()

View File

@ -574,7 +574,6 @@ class BaseResource(models.Model):
skipped_jobs = []
while True:
with transaction.atomic():
# lock an immediately runnable job
job = self.jobs_set().exclude(
pk__in=skipped_jobs
).filter(
@ -587,18 +586,9 @@ class BaseResource(models.Model):
job.status = 'running'
job.save()
# release lock
try:
getattr(self, job.method_name)(**job.parameters)
except SkipJob as e:
job.status = 'registered'
job.set_after_timestamp(e.after_timestamp)
result = job.run()
if result == 'skipped':
skipped_jobs.append(job.id)
except Exception as e:
self.handle_job_error(job, sys.exc_info())
else:
job.status = 'completed'
job.done_timestamp = timezone.now()
job.save()
def add_job(self, method_name, natural_id=None, after_timestamp=None, **kwargs):
resource_type = ContentType.objects.get_for_model(self)
@ -609,6 +599,7 @@ class BaseResource(models.Model):
parameters=kwargs)
job.set_after_timestamp(after_timestamp)
job.save()
transaction.on_commit(lambda: job.run(spool=True))
return job
def handle_job_error(self, job, exc_info):
@ -799,6 +790,30 @@ class Job(models.Model):
self.status_details.update({'new_job_pk': new_job.pk})
self.save()
def run(self, spool=False):
if spool and self.pk:
if 'uwsgi' in sys.modules and settings.PASSERELLE_MANAGE_COMMAND:
from passerelle.utils.spooler import run_job
tenant = getattr(connection, 'tenant', None)
run_job.spool(job_id=str(self.pk), domain=getattr(tenant, 'domain_url', None))
return
self.status = 'running'
self.save()
try:
getattr(self.resource, self.method_name)(**self.parameters)
except SkipJob as e:
self.status = 'registered'
self.set_after_timestamp(e.after_timestamp)
self.save()
return 'skipped'
except Exception:
self.resource.handle_job_error(self, sys.exc_info())
else:
self.status = 'completed'
self.done_timestamp = timezone.now()
self.save()
@six.python_2_unicode_compatible
class ResourceLog(models.Model):

View File

@ -202,6 +202,10 @@ MELLON_USERNAME_TEMPLATE = '{attributes[name_id_content]}'
MELLON_IDENTITY_PROVIDERS = []
# management command, used to run afterjobs in uwsgi mode,
# usually /usr/bin/passerelle-manage.
PASSERELLE_MANAGE_COMMAND = None
# REQUESTS_PROXIES that can be used by requests methods
# see http://docs.python-requests.org/en/latest/user/advanced/#proxies
REQUESTS_PROXIES = None

View File

@ -0,0 +1,44 @@
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2021 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import subprocess
from uwsgidecorators import spool
@spool
def run_job(args):
from django.conf import settings
cmd_args = [
settings.PASSERELLE_MANAGE_COMMAND,
]
if args.get('domain'):
# multitenant installation
cmd_args.append('tenant_command')
cmd_args += [
'runjob',
'--job-id', args['job_id']
]
if args.get('domain'):
# multitenant installation
cmd_args.append('--domain')
cmd_args.append(args['domain'])
subprocess.run(cmd_args)

View File

@ -3,6 +3,8 @@
import datetime
import os
from django.core.management import call_command
import mock
import pytest
@ -143,3 +145,18 @@ def test_jobs(mocked_get, app, base_adresse, freezer):
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'registered'
@mock.patch('passerelle.utils.Request.get')
def test_runjob(mocked_get, app, base_adresse, freezer):
filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.gz')
with open(filepath, 'rb') as ban_file:
mocked_get.return_value = utils.FakedResponse(content=ban_file.read(), status_code=200)
freezer.move_to('2019-01-01 00:00:00')
job = base_adresse.add_job('update_streets_data')
assert job.status == 'registered'
call_command('runjob', '--job-id=%s' % job.pk)
assert Job.objects.get(id=job.id).status == 'completed'
assert StreetModel.objects.count() == 3

View File

@ -97,6 +97,7 @@ def test_create_demand(app, resource, ddpacs_payload, freezer, sftpserver, caplo
resource.outgoing_sftp = sftp.SFTP(
'sftp://john:doe@{server.host}:{server.port}/output/'.format(
server=sftpserver))
resource.save()
resource.jobs()
assert not content['output']
# Jump over the 6 hour wait time for retry
@ -123,6 +124,7 @@ def test_create_demand(app, resource, ddpacs_payload, freezer, sftpserver, caplo
resource.incoming_sftp = sftp.SFTP(
'sftp://john:doe@{server.host}:{server.port}/input/'.format(
server=sftpserver))
resource.save()
response_name, response_content = build_response_zip(
reference='A-1-1',

View File

@ -182,7 +182,10 @@ def test_sms_nostop_parameter(app, connector):
'to': ['+33688888888'],
}
for path in (base_path, base_path + '?nostop=1', base_path + '?nostop=foo', base_path + '?nostop'):
with mock.patch.object(connector, 'send_msg') as send_function:
send_patch = mock.patch(
'passerelle.apps.%s.models.%s.send_msg'
% (connector.__class__._meta.app_label, connector.__class__.__name__))
with send_patch as send_function:
send_function.return_value = {}
result = app.post_json(base_path, params=payload)
connector.jobs()
@ -204,7 +207,10 @@ def test_send_schema(app, connector, to, destination):
'from': '+33699999999',
'to': [to],
}
with mock.patch.object(connector, 'send_msg') as send_function:
send_patch = mock.patch(
'passerelle.apps.%s.models.%s.send_msg'
% (connector.__class__._meta.app_label, connector.__class__.__name__))
with send_patch as send_function:
app.post_json(base_path, params=payload)
connector.jobs()
assert send_function.call_args[1]['destinations'] == [destination]
@ -304,6 +310,7 @@ def test_ovh_new_api_credit(app, freezer, admin_user):
ovh_url = connector.API_URL % {'serviceName': 'sms-test42'}
with utils.mock_url(ovh_url, resp, 200) as mocked:
connector.jobs()
connector.refresh_from_db()
assert connector.credit_left == 123
resp = app.get(manager_url)