general: add basic asynchronous job infrastructure (#12469)

This commit is contained in:
Frédéric Péters 2019-02-13 12:24:06 +01:00
parent fa76fbc8d7
commit 5257471818
5 changed files with 148 additions and 1 deletions

View File

@ -1,6 +1,7 @@
MAILTO=root
*/5 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants availability
*/5 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants jobs
17 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants hourly
25 1 * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants daily
47 2 * * 7 passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants weekly

View File

@ -35,7 +35,7 @@ class Command(BaseCommand):
def handle(self, frequency, **options):
if frequency not in ('hourly', 'daily', 'weekly', 'monthly', 'availability'):
if frequency not in ('hourly', 'daily', 'weekly', 'monthly', 'availability', 'jobs'):
raise CommandError('unknown frequency')
errors = []
for app in get_all_apps():

View File

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.12 on 2019-02-20 09:00
from __future__ import unicode_literals
from django.db import migrations, models
import django.db.models.deletion
import jsonfield.fields
class Migration(migrations.Migration):
dependencies = [
('contenttypes', '0002_remove_content_type_name'),
('base', '0011_auto_20190205_1126'),
]
operations = [
migrations.CreateModel(
name='Job',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('resource_pk', models.PositiveIntegerField()),
('method_name', models.CharField(max_length=50)),
('natural_id', models.CharField(blank=True, max_length=256, null=True)),
('parameters', jsonfield.fields.JSONField(default={})),
('creation_timestamp', models.DateTimeField(auto_now_add=True)),
('update_timestamp', models.DateTimeField(auto_now=True)),
('done_timestamp', models.DateTimeField(null=True)),
('status', models.CharField(choices=[(b'registered', 'Registered'), (b'running', 'Running'), (b'failed', 'Failed'), (b'completed', 'Completed')], default=b'registered', max_length=20)),
('status_details', jsonfield.fields.JSONField(default={})),
('resource_type', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='contenttypes.ContentType')),
],
),
]

View File

@ -9,6 +9,7 @@ import sys
import traceback
import base64
import django
from django.apps import apps
from django.conf import settings
from django.core.exceptions import ValidationError, ObjectDoesNotExist, PermissionDenied
@ -449,6 +450,55 @@ class BaseResource(models.Model):
def monthly(self):
pass
def jobs(self):
# "jobs" cron job to run asynchronous tasks
resource_type = ContentType.objects.get_for_model(self)
skip_locked = {'skip_locked': True}
if django.VERSION < (1, 11, 0):
skip_locked = {}
skipped_jobs = []
while True:
with transaction.atomic():
# lock a job
job = Job.objects.exclude(
pk__in=skipped_jobs
).filter(
resource_type=resource_type,
resource_pk=self.pk,
status='registered'
).select_for_update(**skip_locked).first()
if not job:
break
job.status = 'running'
job.save()
# release lock
try:
getattr(self, job.method_name)(**job.parameters)
except SkipJob:
job.status = 'registered'
skipped_jobs.append(job.id)
except Exception as e:
(exc_type, exc_value, tb) = sys.exc_info()
job.status = 'failed'
job.done_timestamp = timezone.now()
job.status_details = {
'error_summary': '\n'.join(traceback.format_exception_only(exc_type, exc_value)).strip(),
}
else:
job.status = 'completed'
job.done_timestamp = timezone.now()
job.save()
def add_job(self, method_name, natural_id=None, **kwargs):
resource_type = ContentType.objects.get_for_model(self)
job = Job(resource_type=resource_type,
resource_pk=self.pk,
method_name=method_name,
natural_id=natural_id,
parameters=kwargs)
job.save()
return job
class AccessRight(models.Model):
codename = models.CharField(max_length=100, verbose_name='codename')
@ -507,6 +557,31 @@ class AvailabilityParameters(models.Model):
unique_together = (('resource_type', 'resource_pk'))
class SkipJob(Exception):
pass
class Job(models.Model):
resource_type = models.ForeignKey(ContentType)
resource_pk = models.PositiveIntegerField()
resource = fields.GenericForeignKey('resource_type', 'resource_pk')
method_name = models.CharField(max_length=50)
natural_id = models.CharField(max_length=256, blank=True, null=True)
parameters = jsonfield.JSONField(default={})
creation_timestamp = models.DateTimeField(auto_now_add=True)
update_timestamp = models.DateTimeField(auto_now=True)
done_timestamp = models.DateTimeField(null=True)
status = models.CharField(
max_length=20,
default='registered',
choices=(('registered', _('Registered')),
('running', _('Running')),
('failed', _('Failed')),
('completed', _('Completed'))
),
)
status_details = jsonfield.JSONField(default={})
class ResourceLog(models.Model):
timestamp = models.DateTimeField(auto_now_add=True)

37
tests/test_jobs.py Normal file
View File

@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
import os
import mock
import pytest
import utils
from passerelle.base.models import Job, SkipJob
from .test_base_adresse import base_adresse, StreetModel
@mock.patch('passerelle.utils.Request.get')
def test_jobs(mocked_get, app, base_adresse):
filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.bz2')
mocked_get.return_value = utils.FakedResponse(content=open(filepath).read(), status_code=200)
job = base_adresse.add_job('update_streets_data')
assert job.status == 'registered'
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'completed'
assert StreetModel.objects.count() == 3
StreetModel.objects.all().delete()
job = base_adresse.add_job('update_streets_data')
mocked_get.side_effect = Exception('hello')
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'failed'
assert Job.objects.get(id=job.id).status_details == {'error_summary': 'Exception: hello'}
job = base_adresse.add_job('update_streets_data')
mocked_get.side_effect = SkipJob()
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'registered'