misc: add after_timestamp to run Job later (#36215)

The after_timestamp can be set:
- when adding the job with:

    self.add_job(..., after_timestamp=datetime(...))

- when skipping a job with:

    raise SkipJob(after_timestamp=datetime(...))
This commit is contained in:
Benjamin Dauvergne 2019-09-21 11:06:39 +02:00
parent 95904dbaae
commit d629c50e2e
3 changed files with 111 additions and 5 deletions

View File

@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.20 on 2019-09-21 08:47
from __future__ import unicode_literals
from django.db import migrations, models
import passerelle.base.models
class Migration(migrations.Migration):
dependencies = [
('base', '0014_auto_20190820_0914'),
]
operations = [
migrations.AddField(
model_name='job',
name='after_timestamp',
field=models.DateTimeField(null=True),
),
]

View File

@ -501,10 +501,11 @@ class BaseResource(models.Model):
skipped_jobs = []
while True:
with transaction.atomic():
# lock a runnable job
# lock an immediately runnable job
job = self.jobs_set().exclude(
pk__in=skipped_jobs
).filter(
Q(after_timestamp__isnull=True) | Q(after_timestamp__lt=timezone.now()),
status='registered'
).select_for_update(**skip_locked
).order_by('pk')[:1].first()
@ -515,8 +516,9 @@ class BaseResource(models.Model):
# release lock
try:
getattr(self, job.method_name)(**job.parameters)
except SkipJob:
except SkipJob as e:
job.status = 'registered'
job.set_after_timestamp(e.after_timestamp)
skipped_jobs.append(job.id)
except Exception as e:
self.handle_job_error(job, sys.exc_info())
@ -525,13 +527,14 @@ class BaseResource(models.Model):
job.done_timestamp = timezone.now()
job.save()
def add_job(self, method_name, natural_id=None, **kwargs):
def add_job(self, method_name, natural_id=None, after_timestamp=None, **kwargs):
resource_type = ContentType.objects.get_for_model(self)
job = Job(resource_type=resource_type,
resource_pk=self.pk,
method_name=method_name,
natural_id=natural_id,
parameters=kwargs)
job.set_after_timestamp(after_timestamp)
job.save()
return job
@ -644,7 +647,9 @@ class AvailabilityParameters(models.Model):
class SkipJob(Exception):
pass
def __init__(self, after_timestamp=None):
self.after_timestamp = after_timestamp
super(SkipJob, self).__init__()
class Job(models.Model):
@ -657,6 +662,7 @@ class Job(models.Model):
creation_timestamp = models.DateTimeField(auto_now_add=True)
update_timestamp = models.DateTimeField(auto_now=True)
done_timestamp = models.DateTimeField(null=True)
after_timestamp = models.DateTimeField(null=True)
status = models.CharField(
max_length=20,
default='registered',
@ -668,6 +674,16 @@ class Job(models.Model):
)
status_details = jsonfield.JSONField(default={})
def set_after_timestamp(self, value):
if isinstance(value, datetime.datetime):
self.after_timestamp = value
elif isinstance(value, six.integer_types + (float,)):
self.after_timestamp = timezone.now() + datetime.timedelta(seconds=value)
elif isinstance(value, datetime.timedelta):
self.after_timestamp = timezone.now() + value
else:
self.after_timestamp = value
class ResourceLog(models.Model):
timestamp = models.DateTimeField(auto_now_add=True)

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import datetime
import os
import mock
@ -12,7 +13,7 @@ from .test_base_adresse import base_adresse, StreetModel
@mock.patch('passerelle.utils.Request.get')
def test_jobs(mocked_get, app, base_adresse):
def test_jobs(mocked_get, app, base_adresse, freezer):
filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.bz2')
mocked_get.return_value = utils.FakedResponse(content=open(filepath).read(), status_code=200)
@ -39,6 +40,74 @@ def test_jobs(mocked_get, app, base_adresse):
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'registered'
# use after_timestamp with SkipJob
freezer.move_to('2019-01-01 00:00:00')
mocked_get.side_effect = SkipJob(after_timestamp='2019-01-02 00:00:00')
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'registered'
mocked_get.side_effect = None
freezer.move_to('2019-01-01 12:00:00')
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'registered'
freezer.move_to('2019-01-02 01:00:00')
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'completed'
# use after_timestamp with SkipJob and seconds
job = base_adresse.add_job('update_streets_data')
freezer.move_to('2019-01-01 00:00:00')
mocked_get.side_effect = SkipJob(after_timestamp=3600)
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'registered'
mocked_get.side_effect = None
freezer.move_to('2019-01-01 00:30:00')
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'registered'
freezer.move_to('2019-01-01 01:01:00')
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'completed'
# use after_timestamp with SkipJob and timedelta
job = base_adresse.add_job('update_streets_data')
freezer.move_to('2019-01-01 00:00:00')
mocked_get.side_effect = SkipJob(after_timestamp=datetime.timedelta(seconds=3600))
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'registered'
mocked_get.side_effect = None
freezer.move_to('2019-01-01 00:30:00')
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'registered'
freezer.move_to('2019-01-01 01:01:00')
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'completed'
# use after_timestamp with add_job
freezer.move_to('2019-01-01 00:00:00')
job = base_adresse.add_job('update_streets_data', after_timestamp='2019-01-02 00:00:00')
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'registered'
freezer.move_to('2019-01-02 01:00:00')
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'completed'
# use after_timestamp with add_job and seconds
freezer.move_to('2019-01-01 00:00:00')
job = base_adresse.add_job('update_streets_data', after_timestamp=3600)
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'registered'
freezer.move_to('2019-01-01 01:01:00')
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'completed'
# use after_timestamp with add_job and seconds
freezer.move_to('2019-01-01 00:00:00')
job = base_adresse.add_job('update_streets_data', after_timestamp=datetime.timedelta(seconds=3600))
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'registered'
freezer.move_to('2019-01-01 01:01:00')
base_adresse.jobs()
assert Job.objects.get(id=job.id).status == 'completed'
# don't run jobs if connector is down
StreetModel.objects.all().delete()
with mock.patch('passerelle.apps.base_adresse.models.BaseAdresse.down') as down: