general: add basic asynchronous job infrastructure (#12469)
This commit is contained in:
parent
fa76fbc8d7
commit
5257471818
|
@ -1,6 +1,7 @@
|
|||
MAILTO=root
|
||||
|
||||
*/5 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants availability
|
||||
*/5 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants jobs
|
||||
17 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants hourly
|
||||
25 1 * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants daily
|
||||
47 2 * * 7 passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants weekly
|
||||
|
|
|
@ -35,7 +35,7 @@ class Command(BaseCommand):
|
|||
|
||||
|
||||
def handle(self, frequency, **options):
|
||||
if frequency not in ('hourly', 'daily', 'weekly', 'monthly', 'availability'):
|
||||
if frequency not in ('hourly', 'daily', 'weekly', 'monthly', 'availability', 'jobs'):
|
||||
raise CommandError('unknown frequency')
|
||||
errors = []
|
||||
for app in get_all_apps():
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Generated by Django 1.11.12 on 2019-02-20 09:00
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.db import migrations, models
|
||||
import django.db.models.deletion
|
||||
import jsonfield.fields
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('contenttypes', '0002_remove_content_type_name'),
|
||||
('base', '0011_auto_20190205_1126'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='Job',
|
||||
fields=[
|
||||
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('resource_pk', models.PositiveIntegerField()),
|
||||
('method_name', models.CharField(max_length=50)),
|
||||
('natural_id', models.CharField(blank=True, max_length=256, null=True)),
|
||||
('parameters', jsonfield.fields.JSONField(default={})),
|
||||
('creation_timestamp', models.DateTimeField(auto_now_add=True)),
|
||||
('update_timestamp', models.DateTimeField(auto_now=True)),
|
||||
('done_timestamp', models.DateTimeField(null=True)),
|
||||
('status', models.CharField(choices=[(b'registered', 'Registered'), (b'running', 'Running'), (b'failed', 'Failed'), (b'completed', 'Completed')], default=b'registered', max_length=20)),
|
||||
('status_details', jsonfield.fields.JSONField(default={})),
|
||||
('resource_type', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='contenttypes.ContentType')),
|
||||
],
|
||||
),
|
||||
]
|
|
@ -9,6 +9,7 @@ import sys
|
|||
import traceback
|
||||
import base64
|
||||
|
||||
import django
|
||||
from django.apps import apps
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ValidationError, ObjectDoesNotExist, PermissionDenied
|
||||
|
@ -449,6 +450,55 @@ class BaseResource(models.Model):
|
|||
def monthly(self):
|
||||
pass
|
||||
|
||||
def jobs(self):
|
||||
# "jobs" cron job to run asynchronous tasks
|
||||
resource_type = ContentType.objects.get_for_model(self)
|
||||
skip_locked = {'skip_locked': True}
|
||||
if django.VERSION < (1, 11, 0):
|
||||
skip_locked = {}
|
||||
skipped_jobs = []
|
||||
while True:
|
||||
with transaction.atomic():
|
||||
# lock a job
|
||||
job = Job.objects.exclude(
|
||||
pk__in=skipped_jobs
|
||||
).filter(
|
||||
resource_type=resource_type,
|
||||
resource_pk=self.pk,
|
||||
status='registered'
|
||||
).select_for_update(**skip_locked).first()
|
||||
if not job:
|
||||
break
|
||||
job.status = 'running'
|
||||
job.save()
|
||||
# release lock
|
||||
try:
|
||||
getattr(self, job.method_name)(**job.parameters)
|
||||
except SkipJob:
|
||||
job.status = 'registered'
|
||||
skipped_jobs.append(job.id)
|
||||
except Exception as e:
|
||||
(exc_type, exc_value, tb) = sys.exc_info()
|
||||
job.status = 'failed'
|
||||
job.done_timestamp = timezone.now()
|
||||
job.status_details = {
|
||||
'error_summary': '\n'.join(traceback.format_exception_only(exc_type, exc_value)).strip(),
|
||||
}
|
||||
else:
|
||||
job.status = 'completed'
|
||||
job.done_timestamp = timezone.now()
|
||||
job.save()
|
||||
|
||||
def add_job(self, method_name, natural_id=None, **kwargs):
|
||||
resource_type = ContentType.objects.get_for_model(self)
|
||||
job = Job(resource_type=resource_type,
|
||||
resource_pk=self.pk,
|
||||
method_name=method_name,
|
||||
natural_id=natural_id,
|
||||
parameters=kwargs)
|
||||
job.save()
|
||||
return job
|
||||
|
||||
|
||||
class AccessRight(models.Model):
|
||||
codename = models.CharField(max_length=100, verbose_name='codename')
|
||||
|
@ -507,6 +557,31 @@ class AvailabilityParameters(models.Model):
|
|||
unique_together = (('resource_type', 'resource_pk'))
|
||||
|
||||
|
||||
class SkipJob(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class Job(models.Model):
|
||||
resource_type = models.ForeignKey(ContentType)
|
||||
resource_pk = models.PositiveIntegerField()
|
||||
resource = fields.GenericForeignKey('resource_type', 'resource_pk')
|
||||
method_name = models.CharField(max_length=50)
|
||||
natural_id = models.CharField(max_length=256, blank=True, null=True)
|
||||
parameters = jsonfield.JSONField(default={})
|
||||
creation_timestamp = models.DateTimeField(auto_now_add=True)
|
||||
update_timestamp = models.DateTimeField(auto_now=True)
|
||||
done_timestamp = models.DateTimeField(null=True)
|
||||
status = models.CharField(
|
||||
max_length=20,
|
||||
default='registered',
|
||||
choices=(('registered', _('Registered')),
|
||||
('running', _('Running')),
|
||||
('failed', _('Failed')),
|
||||
('completed', _('Completed'))
|
||||
),
|
||||
)
|
||||
status_details = jsonfield.JSONField(default={})
|
||||
|
||||
|
||||
class ResourceLog(models.Model):
|
||||
timestamp = models.DateTimeField(auto_now_add=True)
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
|
||||
import mock
|
||||
import pytest
|
||||
|
||||
import utils
|
||||
|
||||
from passerelle.base.models import Job, SkipJob
|
||||
from .test_base_adresse import base_adresse, StreetModel
|
||||
|
||||
|
||||
@mock.patch('passerelle.utils.Request.get')
|
||||
def test_jobs(mocked_get, app, base_adresse):
|
||||
filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.bz2')
|
||||
mocked_get.return_value = utils.FakedResponse(content=open(filepath).read(), status_code=200)
|
||||
|
||||
job = base_adresse.add_job('update_streets_data')
|
||||
assert job.status == 'registered'
|
||||
|
||||
base_adresse.jobs()
|
||||
assert Job.objects.get(id=job.id).status == 'completed'
|
||||
assert StreetModel.objects.count() == 3
|
||||
|
||||
StreetModel.objects.all().delete()
|
||||
|
||||
job = base_adresse.add_job('update_streets_data')
|
||||
mocked_get.side_effect = Exception('hello')
|
||||
base_adresse.jobs()
|
||||
assert Job.objects.get(id=job.id).status == 'failed'
|
||||
assert Job.objects.get(id=job.id).status_details == {'error_summary': 'Exception: hello'}
|
||||
|
||||
job = base_adresse.add_job('update_streets_data')
|
||||
mocked_get.side_effect = SkipJob()
|
||||
base_adresse.jobs()
|
||||
assert Job.objects.get(id=job.id).status == 'registered'
|
Loading…
Reference in New Issue