diff --git a/debian/control b/debian/control index 1e9aa405..eee9f983 100644 --- a/debian/control +++ b/debian/control @@ -19,6 +19,7 @@ Depends: ${python3:Depends}, python3-gadjo, python3-django-model-utils, python3-requests, + python3-uwsgidecorators, python3-setuptools, python3-suds, python3-cmislib, diff --git a/debian/debian_config.py b/debian/debian_config.py index 1d31c12f..7e6bd329 100644 --- a/debian/debian_config.py +++ b/debian/debian_config.py @@ -6,6 +6,7 @@ import os DEBUG = False PROJECT_NAME = 'passerelle' +PASSERELLE_MANAGE_COMMAND = '/usr/bin/passerelle-manage' # # hobotization (multitenant) diff --git a/debian/passerelle.cron.d b/debian/passerelle.cron.d index fd698cdc..e69de29b 100644 --- a/debian/passerelle.cron.d +++ b/debian/passerelle.cron.d @@ -1,8 +0,0 @@ -MAILTO=root - -*/5 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants availability -*/5 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants jobs -17 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants hourly -25 1 * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants daily -47 2 * * 7 passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants weekly -52 3 1 * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants monthly diff --git a/debian/passerelle.cron.hourly b/debian/passerelle.cron.hourly index 7e8ef87d..e69de29b 100644 --- a/debian/passerelle.cron.hourly +++ b/debian/passerelle.cron.hourly @@ -1,3 +0,0 @@ -#! /bin/sh - -/sbin/runuser -u passerelle /usr/bin/passerelle-manage -- tenant_command clearsessions --all-tenants diff --git a/debian/passerelle.dirs b/debian/passerelle.dirs index 728a2da0..b11f881d 100644 --- a/debian/passerelle.dirs +++ b/debian/passerelle.dirs @@ -1,5 +1,6 @@ /etc/passerelle /usr/lib/passerelle /var/lib/passerelle/collectstatic +/var/lib/passerelle/spooler /var/lib/passerelle/tenants /var/log/passerelle diff --git a/debian/passerelle.init b/debian/passerelle.init index 53b0800d..d938f9de 100755 --- a/debian/passerelle.init +++ b/debian/passerelle.init @@ -38,6 +38,7 @@ GROUP=$NAME DAEMON_ARGS=${DAEMON_ARGS:-"--pidfile=$PIDFILE --uid $USER --gid $GROUP --ini /etc/$NAME/uwsgi.ini +--spooler /var/lib/$NAME/spooler/ --daemonize /var/log/uwsgi.$NAME.log"} # Load the VERBOSE setting and other rcS variables diff --git a/debian/passerelle.postinst b/debian/passerelle.postinst index 9f144eb7..9eb42b11 100644 --- a/debian/passerelle.postinst +++ b/debian/passerelle.postinst @@ -20,6 +20,7 @@ case "$1" in # ensure dirs ownership chown $USER:$GROUP /var/log/$NAME chown $USER:$GROUP /var/lib/$NAME/collectstatic + chown $USER:$GROUP /var/lib/$NAME/spooler chown $USER:$GROUP /var/lib/$NAME/tenants # create a secret file SECRET_FILE=$CONFIG_DIR/secret diff --git a/debian/passerelle.service b/debian/passerelle.service index 9931b8a7..06cc31b9 100644 --- a/debian/passerelle.service +++ b/debian/passerelle.service @@ -12,7 +12,8 @@ User=%p Group=%p ExecStartPre=/usr/bin/passerelle-manage migrate_schemas --noinput --verbosity 1 ExecStartPre=/usr/bin/passerelle-manage collectstatic --noinput -ExecStart=/usr/bin/uwsgi --ini /etc/%p/uwsgi.ini +ExecStartPre=/bin/mkdir -p /var/lib/passerelle/spooler/%m/ +ExecStart=/usr/bin/uwsgi --ini /etc/%p/uwsgi.ini --spooler /var/lib/passerelle/spooler/%m/ ExecReload=/bin/kill -HUP $MAINPID KillSignal=SIGQUIT TimeoutStartSec=0 diff --git a/debian/uwsgi.ini b/debian/uwsgi.ini index 3e9f325b..ff8f090b 100644 --- a/debian/uwsgi.ini +++ b/debian/uwsgi.ini @@ -12,6 +12,21 @@ http-socket = /run/passerelle/passerelle.sock chmod-socket = 666 vacuum = true +spooler-processes = 3 +spooler-python-import = passerelle.utils.spooler +# every five minutes +cron = -5 -1 -1 -1 -1 /usr/bin/passerelle-manage tenant_command cron --all-tenants availability +cron = -5 -1 -1 -1 -1 /usr/bin/passerelle-manage tenant_command cron --all-tenants jobs +# hourly +cron = 1 -1 -1 -1 -1 /usr/bin/passerelle-manage tenant_command clearsessions --all-tenants +cron = 17 -1 -1 -1 -1 /usr/bin/passerelle-manage tenant_command cron --all-tenants hourly +# daily +cron = 25 1 -1 -1 -1 /usr/bin/passerelle-manage tenant_command cron --all-tenants daily +# weekly +cron = 47 2 -1 -1 7 /usr/bin/passerelle-manage tenant_command cron --all-tenants weekly +# monthly +cron = 52 3 1 -1 -1 /usr/bin/passerelle-manage tenant_command cron --all-tenants monthly + master = true enable-threads = true harakiri = 120 diff --git a/passerelle/base/management/commands/runjob.py b/passerelle/base/management/commands/runjob.py new file mode 100644 index 00000000..61d26b76 --- /dev/null +++ b/passerelle/base/management/commands/runjob.py @@ -0,0 +1,41 @@ +# passerelle - uniform access to multiple data sources and services +# Copyright (C) 2021 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from django.core.management.base import BaseCommand, CommandError +from django.db import connection, transaction + +from passerelle.base.models import Job + + +class Command(BaseCommand): + '''Run a job (internal command)''' + + def add_arguments(self, parser): + parser.add_argument('--job-id', action='store', required=True) + + def handle(self, *args, **options): + skip_locked = {'skip_locked': True} + if not connection.features.has_select_for_update_skip_locked: + skip_locked = {} + with transaction.atomic(): + try: + job = Job.objects.select_for_update(**skip_locked).get(pk=options['job_id']) + except Job.DoesNotExist: + raise CommandError('missing job') + job.status = 'running' + job.save() + # release lock + job.run() diff --git a/passerelle/base/models.py b/passerelle/base/models.py index fbc9465f..5e49687c 100644 --- a/passerelle/base/models.py +++ b/passerelle/base/models.py @@ -574,7 +574,6 @@ class BaseResource(models.Model): skipped_jobs = [] while True: with transaction.atomic(): - # lock an immediately runnable job job = self.jobs_set().exclude( pk__in=skipped_jobs ).filter( @@ -587,18 +586,9 @@ class BaseResource(models.Model): job.status = 'running' job.save() # release lock - try: - getattr(self, job.method_name)(**job.parameters) - except SkipJob as e: - job.status = 'registered' - job.set_after_timestamp(e.after_timestamp) + result = job.run() + if result == 'skipped': skipped_jobs.append(job.id) - except Exception as e: - self.handle_job_error(job, sys.exc_info()) - else: - job.status = 'completed' - job.done_timestamp = timezone.now() - job.save() def add_job(self, method_name, natural_id=None, after_timestamp=None, **kwargs): resource_type = ContentType.objects.get_for_model(self) @@ -609,6 +599,7 @@ class BaseResource(models.Model): parameters=kwargs) job.set_after_timestamp(after_timestamp) job.save() + transaction.on_commit(lambda: job.run(spool=True)) return job def handle_job_error(self, job, exc_info): @@ -799,6 +790,30 @@ class Job(models.Model): self.status_details.update({'new_job_pk': new_job.pk}) self.save() + def run(self, spool=False): + if spool and self.pk: + if 'uwsgi' in sys.modules and settings.PASSERELLE_MANAGE_COMMAND: + from passerelle.utils.spooler import run_job + tenant = getattr(connection, 'tenant', None) + run_job.spool(job_id=str(self.pk), domain=getattr(tenant, 'domain_url', None)) + return + + self.status = 'running' + self.save() + try: + getattr(self.resource, self.method_name)(**self.parameters) + except SkipJob as e: + self.status = 'registered' + self.set_after_timestamp(e.after_timestamp) + self.save() + return 'skipped' + except Exception: + self.resource.handle_job_error(self, sys.exc_info()) + else: + self.status = 'completed' + self.done_timestamp = timezone.now() + self.save() + @six.python_2_unicode_compatible class ResourceLog(models.Model): diff --git a/passerelle/settings.py b/passerelle/settings.py index 111bf8ee..72bebf43 100644 --- a/passerelle/settings.py +++ b/passerelle/settings.py @@ -202,6 +202,10 @@ MELLON_USERNAME_TEMPLATE = '{attributes[name_id_content]}' MELLON_IDENTITY_PROVIDERS = [] +# management command, used to run afterjobs in uwsgi mode, +# usually /usr/bin/passerelle-manage. +PASSERELLE_MANAGE_COMMAND = None + # REQUESTS_PROXIES that can be used by requests methods # see http://docs.python-requests.org/en/latest/user/advanced/#proxies REQUESTS_PROXIES = None diff --git a/passerelle/utils/spooler.py b/passerelle/utils/spooler.py new file mode 100644 index 00000000..fb2596c3 --- /dev/null +++ b/passerelle/utils/spooler.py @@ -0,0 +1,44 @@ +# passerelle - uniform access to multiple data sources and services +# Copyright (C) 2021 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import subprocess + +from uwsgidecorators import spool + + +@spool +def run_job(args): + from django.conf import settings + + cmd_args = [ + settings.PASSERELLE_MANAGE_COMMAND, + ] + + if args.get('domain'): + # multitenant installation + cmd_args.append('tenant_command') + + cmd_args += [ + 'runjob', + '--job-id', args['job_id'] + ] + + if args.get('domain'): + # multitenant installation + cmd_args.append('--domain') + cmd_args.append(args['domain']) + + subprocess.run(cmd_args) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 805ad9fc..76796009 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -3,6 +3,8 @@ import datetime import os +from django.core.management import call_command + import mock import pytest @@ -143,3 +145,18 @@ def test_jobs(mocked_get, app, base_adresse, freezer): base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'registered' + + +@mock.patch('passerelle.utils.Request.get') +def test_runjob(mocked_get, app, base_adresse, freezer): + filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.gz') + with open(filepath, 'rb') as ban_file: + mocked_get.return_value = utils.FakedResponse(content=ban_file.read(), status_code=200) + + freezer.move_to('2019-01-01 00:00:00') + job = base_adresse.add_job('update_streets_data') + assert job.status == 'registered' + + call_command('runjob', '--job-id=%s' % job.pk) + assert Job.objects.get(id=job.id).status == 'completed' + assert StreetModel.objects.count() == 3 diff --git a/tests/test_mdel_ddpacs.py b/tests/test_mdel_ddpacs.py index 508b2bc5..58ca7c1e 100644 --- a/tests/test_mdel_ddpacs.py +++ b/tests/test_mdel_ddpacs.py @@ -97,6 +97,7 @@ def test_create_demand(app, resource, ddpacs_payload, freezer, sftpserver, caplo resource.outgoing_sftp = sftp.SFTP( 'sftp://john:doe@{server.host}:{server.port}/output/'.format( server=sftpserver)) + resource.save() resource.jobs() assert not content['output'] # Jump over the 6 hour wait time for retry @@ -123,6 +124,7 @@ def test_create_demand(app, resource, ddpacs_payload, freezer, sftpserver, caplo resource.incoming_sftp = sftp.SFTP( 'sftp://john:doe@{server.host}:{server.port}/input/'.format( server=sftpserver)) + resource.save() response_name, response_content = build_response_zip( reference='A-1-1', diff --git a/tests/test_sms.py b/tests/test_sms.py index 59063f04..03f80648 100644 --- a/tests/test_sms.py +++ b/tests/test_sms.py @@ -182,7 +182,10 @@ def test_sms_nostop_parameter(app, connector): 'to': ['+33688888888'], } for path in (base_path, base_path + '?nostop=1', base_path + '?nostop=foo', base_path + '?nostop'): - with mock.patch.object(connector, 'send_msg') as send_function: + send_patch = mock.patch( + 'passerelle.apps.%s.models.%s.send_msg' + % (connector.__class__._meta.app_label, connector.__class__.__name__)) + with send_patch as send_function: send_function.return_value = {} result = app.post_json(base_path, params=payload) connector.jobs() @@ -204,7 +207,10 @@ def test_send_schema(app, connector, to, destination): 'from': '+33699999999', 'to': [to], } - with mock.patch.object(connector, 'send_msg') as send_function: + send_patch = mock.patch( + 'passerelle.apps.%s.models.%s.send_msg' + % (connector.__class__._meta.app_label, connector.__class__.__name__)) + with send_patch as send_function: app.post_json(base_path, params=payload) connector.jobs() assert send_function.call_args[1]['destinations'] == [destination] @@ -304,6 +310,7 @@ def test_ovh_new_api_credit(app, freezer, admin_user): ovh_url = connector.API_URL % {'serviceName': 'sms-test42'} with utils.mock_url(ovh_url, resp, 200) as mocked: connector.jobs() + connector.refresh_from_db() assert connector.credit_left == 123 resp = app.get(manager_url)