From 5257471818eedc214dd16dd9bc56f444dfeecae9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20P=C3=A9ters?= Date: Wed, 13 Feb 2019 12:24:06 +0100 Subject: [PATCH] general: add basic asynchronous job infrastructure (#12469) --- debian/passerelle.cron.d | 1 + passerelle/base/management/commands/cron.py | 2 +- passerelle/base/migrations/0012_job.py | 34 ++++++++++ passerelle/base/models.py | 75 +++++++++++++++++++++ tests/test_jobs.py | 37 ++++++++++ 5 files changed, 148 insertions(+), 1 deletion(-) create mode 100644 passerelle/base/migrations/0012_job.py create mode 100644 tests/test_jobs.py diff --git a/debian/passerelle.cron.d b/debian/passerelle.cron.d index d72cd8c3..fd698cdc 100644 --- a/debian/passerelle.cron.d +++ b/debian/passerelle.cron.d @@ -1,6 +1,7 @@ MAILTO=root */5 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants availability +*/5 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants jobs 17 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants hourly 25 1 * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants daily 47 2 * * 7 passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants weekly diff --git a/passerelle/base/management/commands/cron.py b/passerelle/base/management/commands/cron.py index de4bff66..302f5a1c 100644 --- a/passerelle/base/management/commands/cron.py +++ b/passerelle/base/management/commands/cron.py @@ -35,7 +35,7 @@ class Command(BaseCommand): def handle(self, frequency, **options): - if frequency not in ('hourly', 'daily', 'weekly', 'monthly', 'availability'): + if frequency not in ('hourly', 'daily', 'weekly', 'monthly', 'availability', 'jobs'): raise CommandError('unknown frequency') errors = [] for app in get_all_apps(): diff --git a/passerelle/base/migrations/0012_job.py b/passerelle/base/migrations/0012_job.py new file mode 100644 index 00000000..fe85b6a9 --- /dev/null +++ b/passerelle/base/migrations/0012_job.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.12 on 2019-02-20 09:00 +from __future__ import unicode_literals + +from django.db import migrations, models +import django.db.models.deletion +import jsonfield.fields + + +class Migration(migrations.Migration): + + dependencies = [ + ('contenttypes', '0002_remove_content_type_name'), + ('base', '0011_auto_20190205_1126'), + ] + + operations = [ + migrations.CreateModel( + name='Job', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('resource_pk', models.PositiveIntegerField()), + ('method_name', models.CharField(max_length=50)), + ('natural_id', models.CharField(blank=True, max_length=256, null=True)), + ('parameters', jsonfield.fields.JSONField(default={})), + ('creation_timestamp', models.DateTimeField(auto_now_add=True)), + ('update_timestamp', models.DateTimeField(auto_now=True)), + ('done_timestamp', models.DateTimeField(null=True)), + ('status', models.CharField(choices=[(b'registered', 'Registered'), (b'running', 'Running'), (b'failed', 'Failed'), (b'completed', 'Completed')], default=b'registered', max_length=20)), + ('status_details', jsonfield.fields.JSONField(default={})), + ('resource_type', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='contenttypes.ContentType')), + ], + ), + ] diff --git a/passerelle/base/models.py b/passerelle/base/models.py index 6904dcbb..d92f0f78 100644 --- a/passerelle/base/models.py +++ b/passerelle/base/models.py @@ -9,6 +9,7 @@ import sys import traceback import base64 +import django from django.apps import apps from django.conf import settings from django.core.exceptions import ValidationError, ObjectDoesNotExist, PermissionDenied @@ -449,6 +450,55 @@ class BaseResource(models.Model): def monthly(self): pass + def jobs(self): + # "jobs" cron job to run asynchronous tasks + resource_type = ContentType.objects.get_for_model(self) + skip_locked = {'skip_locked': True} + if django.VERSION < (1, 11, 0): + skip_locked = {} + skipped_jobs = [] + while True: + with transaction.atomic(): + # lock a job + job = Job.objects.exclude( + pk__in=skipped_jobs + ).filter( + resource_type=resource_type, + resource_pk=self.pk, + status='registered' + ).select_for_update(**skip_locked).first() + if not job: + break + job.status = 'running' + job.save() + # release lock + try: + getattr(self, job.method_name)(**job.parameters) + except SkipJob: + job.status = 'registered' + skipped_jobs.append(job.id) + except Exception as e: + (exc_type, exc_value, tb) = sys.exc_info() + job.status = 'failed' + job.done_timestamp = timezone.now() + job.status_details = { + 'error_summary': '\n'.join(traceback.format_exception_only(exc_type, exc_value)).strip(), + } + else: + job.status = 'completed' + job.done_timestamp = timezone.now() + job.save() + + def add_job(self, method_name, natural_id=None, **kwargs): + resource_type = ContentType.objects.get_for_model(self) + job = Job(resource_type=resource_type, + resource_pk=self.pk, + method_name=method_name, + natural_id=natural_id, + parameters=kwargs) + job.save() + return job + class AccessRight(models.Model): codename = models.CharField(max_length=100, verbose_name='codename') @@ -507,6 +557,31 @@ class AvailabilityParameters(models.Model): unique_together = (('resource_type', 'resource_pk')) +class SkipJob(Exception): + pass + + +class Job(models.Model): + resource_type = models.ForeignKey(ContentType) + resource_pk = models.PositiveIntegerField() + resource = fields.GenericForeignKey('resource_type', 'resource_pk') + method_name = models.CharField(max_length=50) + natural_id = models.CharField(max_length=256, blank=True, null=True) + parameters = jsonfield.JSONField(default={}) + creation_timestamp = models.DateTimeField(auto_now_add=True) + update_timestamp = models.DateTimeField(auto_now=True) + done_timestamp = models.DateTimeField(null=True) + status = models.CharField( + max_length=20, + default='registered', + choices=(('registered', _('Registered')), + ('running', _('Running')), + ('failed', _('Failed')), + ('completed', _('Completed')) + ), + ) + status_details = jsonfield.JSONField(default={}) + class ResourceLog(models.Model): timestamp = models.DateTimeField(auto_now_add=True) diff --git a/tests/test_jobs.py b/tests/test_jobs.py new file mode 100644 index 00000000..6042657a --- /dev/null +++ b/tests/test_jobs.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- + +import os + +import mock +import pytest + +import utils + +from passerelle.base.models import Job, SkipJob +from .test_base_adresse import base_adresse, StreetModel + + +@mock.patch('passerelle.utils.Request.get') +def test_jobs(mocked_get, app, base_adresse): + filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.bz2') + mocked_get.return_value = utils.FakedResponse(content=open(filepath).read(), status_code=200) + + job = base_adresse.add_job('update_streets_data') + assert job.status == 'registered' + + base_adresse.jobs() + assert Job.objects.get(id=job.id).status == 'completed' + assert StreetModel.objects.count() == 3 + + StreetModel.objects.all().delete() + + job = base_adresse.add_job('update_streets_data') + mocked_get.side_effect = Exception('hello') + base_adresse.jobs() + assert Job.objects.get(id=job.id).status == 'failed' + assert Job.objects.get(id=job.id).status_details == {'error_summary': 'Exception: hello'} + + job = base_adresse.add_job('update_streets_data') + mocked_get.side_effect = SkipJob() + base_adresse.jobs() + assert Job.objects.get(id=job.id).status == 'registered'