138 lines
5.5 KiB
Python
138 lines
5.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import datetime
|
|
import os
|
|
|
|
import mock
|
|
import pytest
|
|
|
|
import utils
|
|
|
|
from passerelle.base.models import Job, SkipJob, ResourceLog
|
|
from .test_base_adresse import base_adresse, StreetModel
|
|
|
|
|
|
@mock.patch('passerelle.utils.Request.get')
|
|
def test_jobs(mocked_get, app, base_adresse, freezer):
|
|
filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.bz2')
|
|
mocked_get.return_value = utils.FakedResponse(content=open(filepath).read(), status_code=200)
|
|
|
|
freezer.move_to('2019-01-01 00:00:00')
|
|
job = base_adresse.add_job('update_streets_data')
|
|
assert job.status == 'registered'
|
|
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'completed'
|
|
assert StreetModel.objects.count() == 3
|
|
|
|
# don't delete streets if bano file is empty
|
|
mocked_get.return_value = utils.FakedResponse(content='', status_code=200)
|
|
freezer.move_to('2019-01-01 12:00:00')
|
|
job = base_adresse.add_job('update_streets_data')
|
|
assert job.status == 'registered'
|
|
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'failed'
|
|
assert Job.objects.get(id=job.id).status_details == {'error_summary': 'Exception: bano file is empty'}
|
|
assert ResourceLog.objects.all().count() == 1
|
|
assert ResourceLog.objects.all()[0].message == (
|
|
'error running update_streets_data job (bano file is empty)')
|
|
assert StreetModel.objects.count() == 3
|
|
|
|
mocked_get.return_value = utils.FakedResponse(content=open(filepath).read(), status_code=200)
|
|
|
|
StreetModel.objects.all().delete()
|
|
|
|
ResourceLog.objects.all().delete()
|
|
job = base_adresse.add_job('update_streets_data')
|
|
mocked_get.side_effect = Exception('hello')
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'failed'
|
|
assert Job.objects.get(id=job.id).status_details == {'error_summary': 'Exception: hello'}
|
|
assert ResourceLog.objects.all().count() == 1
|
|
assert ResourceLog.objects.all()[0].message == 'error running update_streets_data job (hello)'
|
|
|
|
job = base_adresse.add_job('update_streets_data')
|
|
mocked_get.side_effect = SkipJob()
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'registered'
|
|
|
|
# use after_timestamp with SkipJob
|
|
freezer.move_to('2019-01-01 00:00:00')
|
|
mocked_get.side_effect = SkipJob(after_timestamp='2019-01-02 00:00:00')
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'registered'
|
|
mocked_get.side_effect = None
|
|
freezer.move_to('2019-01-01 12:00:00')
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'registered'
|
|
freezer.move_to('2019-01-02 01:00:00')
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'completed'
|
|
|
|
# use after_timestamp with SkipJob and seconds
|
|
job = base_adresse.add_job('update_streets_data')
|
|
freezer.move_to('2019-01-01 00:00:00')
|
|
mocked_get.side_effect = SkipJob(after_timestamp=3600)
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'registered'
|
|
mocked_get.side_effect = None
|
|
freezer.move_to('2019-01-01 00:30:00')
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'registered'
|
|
freezer.move_to('2019-01-01 01:01:00')
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'completed'
|
|
|
|
# use after_timestamp with SkipJob and timedelta
|
|
job = base_adresse.add_job('update_streets_data')
|
|
freezer.move_to('2019-01-01 00:00:00')
|
|
mocked_get.side_effect = SkipJob(after_timestamp=datetime.timedelta(seconds=3600))
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'registered'
|
|
mocked_get.side_effect = None
|
|
freezer.move_to('2019-01-01 00:30:00')
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'registered'
|
|
freezer.move_to('2019-01-01 01:01:00')
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'completed'
|
|
|
|
# use after_timestamp with add_job
|
|
freezer.move_to('2019-01-01 00:00:00')
|
|
job = base_adresse.add_job('update_streets_data', after_timestamp='2019-01-02 00:00:00')
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'registered'
|
|
freezer.move_to('2019-01-02 01:00:00')
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'completed'
|
|
|
|
# use after_timestamp with add_job and seconds
|
|
freezer.move_to('2019-01-01 00:00:00')
|
|
job = base_adresse.add_job('update_streets_data', after_timestamp=3600)
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'registered'
|
|
freezer.move_to('2019-01-01 01:01:00')
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'completed'
|
|
|
|
# use after_timestamp with add_job and seconds
|
|
freezer.move_to('2019-01-01 00:00:00')
|
|
job = base_adresse.add_job('update_streets_data', after_timestamp=datetime.timedelta(seconds=3600))
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'registered'
|
|
freezer.move_to('2019-01-01 01:01:00')
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'completed'
|
|
|
|
# don't run jobs if connector is down
|
|
StreetModel.objects.all().delete()
|
|
with mock.patch('passerelle.apps.base_adresse.models.BaseAdresse.down') as down:
|
|
down.side_effect = lambda: True
|
|
mocked_get.return_value = utils.FakedResponse(content=open(filepath).read(), status_code=200)
|
|
job = base_adresse.add_job('update_streets_data')
|
|
assert job.status == 'registered'
|
|
|
|
base_adresse.jobs()
|
|
assert Job.objects.get(id=job.id).status == 'registered'
|