# passerelle/tests/test_jobs.py
#
# 199 lines
# 8.2 KiB
# Python

import datetime
import os
from unittest import mock
import isodate
import pytest
from django.core.management import call_command
from django.core.management.base import CommandError
import tests.utils
from passerelle.base.models import Job, ResourceLog, SkipJob
from passerelle.utils.jsonresponse import APIError
from tests.test_base_adresse import BaseAdresse, StreetModel
@pytest.fixture
def base_adresse(db):
    """Create a BaseAdresse connector and pass it through setup_access_rights."""
    connector = BaseAdresse.objects.create(slug='base-adresse', zipcode='73')
    return tests.utils.setup_access_rights(connector)
@mock.patch('passerelle.utils.Request.get')
def test_jobs(mocked_get, app, base_adresse, freezer):
    """Run update_streets_data jobs through base_adresse.jobs() and check their status.

    Covers a successful run, failures (empty file, exception), SkipJob with and
    without after_timestamp, add_job with after_timestamp, and a connector
    reported as down.
    """
    Job.objects.all().delete()  # remove jobs automatically added at connector creation
    job_name = 'update_streets_data'
    ban_path = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.gz')

    def ban_response():
        # fresh 200 response carrying the content of the street fixture file
        with open(ban_path, 'rb') as fd:
            return tests.utils.FakedResponse(content=fd.read(), status_code=200)

    def stored(registered_job):
        # read the job back from the database instead of trusting the local copy
        return Job.objects.get(id=registered_job.id)

    # nominal run: the job completes and three streets are loaded
    mocked_get.return_value = ban_response()
    freezer.move_to('2019-01-01 00:00:00')
    job = base_adresse.add_job(job_name)
    assert job.status == 'registered'
    base_adresse.jobs()
    assert stored(job).status == 'completed'
    assert StreetModel.objects.count() == 3

    # an empty file fails the job and must not delete the existing streets
    mocked_get.return_value = tests.utils.FakedResponse(content=b'', status_code=200)
    freezer.move_to('2019-01-01 12:00:00')
    job = base_adresse.add_job(job_name)
    assert job.status == 'registered'
    base_adresse.jobs()
    assert stored(job).status == 'failed'
    assert stored(job).status_details == {'error_summary': 'Exception: bano file is empty'}
    assert ResourceLog.objects.all().count() == 1
    first_log = ResourceLog.objects.all()[0]
    assert first_log.message == 'error running update_streets_data job (bano file is empty)'
    assert StreetModel.objects.count() == 3

    # an exception raised by the request fails the job and is logged
    mocked_get.return_value = ban_response()
    StreetModel.objects.all().delete()
    ResourceLog.objects.all().delete()
    job = base_adresse.add_job(job_name)
    mocked_get.side_effect = Exception('hello')
    base_adresse.jobs()
    assert stored(job).status == 'failed'
    assert stored(job).status_details == {'error_summary': 'Exception: hello'}
    assert ResourceLog.objects.all().count() == 1
    first_log = ResourceLog.objects.all()[0]
    assert first_log.message == 'error running update_streets_data job (hello)'

    # a bare SkipJob leaves the job registered
    job = base_adresse.add_job(job_name)
    mocked_get.side_effect = SkipJob()
    base_adresse.jobs()
    assert stored(job).status == 'registered'

    # SkipJob with after_timestamp given as a datetime (same job as above)
    freezer.move_to('2019-01-01 00:00:00')
    mocked_get.side_effect = SkipJob(after_timestamp=isodate.parse_datetime('2019-01-02T00:00:00+00:00'))
    base_adresse.jobs()
    assert stored(job).status == 'registered'
    mocked_get.side_effect = None
    freezer.move_to('2019-01-01 12:00:00')
    base_adresse.jobs()
    assert stored(job).status == 'registered'
    freezer.move_to('2019-01-02 01:00:00')
    base_adresse.jobs()
    assert stored(job).status == 'completed'

    # SkipJob with after_timestamp given as seconds, then as a timedelta
    for delay in (3600, datetime.timedelta(seconds=3600)):
        job = base_adresse.add_job(job_name)
        freezer.move_to('2019-01-01 00:00:00')
        mocked_get.side_effect = SkipJob(after_timestamp=delay)
        base_adresse.jobs()
        assert stored(job).status == 'registered'
        mocked_get.side_effect = None
        freezer.move_to('2019-01-01 00:30:00')
        base_adresse.jobs()
        assert stored(job).status == 'registered'
        freezer.move_to('2019-01-01 01:01:00')
        base_adresse.jobs()
        assert stored(job).status == 'completed'

    # add_job with after_timestamp given as a datetime
    freezer.move_to('2019-01-01 00:00:00')
    job = base_adresse.add_job(
        job_name, after_timestamp=isodate.parse_datetime('2019-01-02T00:00:00+00:00')
    )
    base_adresse.jobs()
    assert stored(job).status == 'registered'
    freezer.move_to('2019-01-02 01:00:00')
    base_adresse.jobs()
    assert stored(job).status == 'completed'

    # add_job with after_timestamp given as seconds, then as a timedelta
    for delay in (3600, datetime.timedelta(seconds=3600)):
        freezer.move_to('2019-01-01 00:00:00')
        job = base_adresse.add_job(job_name, after_timestamp=delay)
        base_adresse.jobs()
        assert stored(job).status == 'registered'
        freezer.move_to('2019-01-01 01:01:00')
        base_adresse.jobs()
        assert stored(job).status == 'completed'

    # don't run jobs if connector is down
    StreetModel.objects.all().delete()
    with mock.patch('passerelle.apps.base_adresse.models.BaseAdresse.down') as down_mock:
        down_mock.side_effect = lambda: True
        mocked_get.return_value = ban_response()
        job = base_adresse.add_job(job_name)
        assert job.status == 'registered'
        base_adresse.jobs()
        assert stored(job).status == 'registered'
@mock.patch('passerelle.utils.Request.get')
def test_runjob(mocked_get, app, base_adresse, freezer):
    """Check the runjob management command on a runnable, a completed and a postponed job."""
    fixture_path = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.gz')
    with open(fixture_path, 'rb') as fd:
        mocked_get.return_value = tests.utils.FakedResponse(content=fd.read(), status_code=200)

    def run(target):
        # invoke the management command for one specific job
        call_command('runjob', '--job-id=%s' % target.pk)

    freezer.move_to('2019-01-01 00:00:00')
    job = base_adresse.add_job('update_streets_data')
    assert job.status == 'registered'

    # first run executes the job
    run(job)
    assert Job.objects.get(id=job.id).status == 'completed'
    assert StreetModel.objects.count() == 3

    # a completed job cannot be run a second time
    with pytest.raises(CommandError) as excinfo:
        run(job)
    assert 'cannot run job, status is completed' in str(excinfo.value)

    # a job scheduled after a later date is refused
    not_before = datetime.date(2019, 1, 2)
    job = base_adresse.add_job('update_streets_data', after_timestamp=not_before)
    with pytest.raises(CommandError) as excinfo:
        run(job)
    assert 'cannot run job, should be run after 2019-01-02 00:00:00+00:00' in str(excinfo.value)
@mock.patch('passerelle.utils.Request.get')
def test_jobs_api_error_log_level(mocked_get, app, base_adresse, freezer):
    """Check the ResourceLog level of a failed job: error for Exception, warning for APIError."""
    Job.objects.all().delete()  # remove jobs automatically added at connector creation
    fixture_path = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.gz')
    with open(fixture_path, 'rb') as fd:
        mocked_get.return_value = tests.utils.FakedResponse(content=fd.read(), status_code=200)

    # each failing run adds one log entry; its level depends on the exception type
    scenarios = [
        (Exception('hello'), 'error'),
        (APIError('hello'), 'warning'),
    ]
    for position, (failure, expected_level) in enumerate(scenarios):
        job = base_adresse.add_job('update_streets_data')
        mocked_get.side_effect = failure
        base_adresse.jobs()
        assert Job.objects.get(id=job.id).status == 'failed'
        assert ResourceLog.objects.all().count() == position + 1
        assert ResourceLog.objects.all()[position].level == expected_level