199 lines
8.2 KiB
Python
199 lines
8.2 KiB
Python
import datetime
|
|
import os
|
|
from unittest import mock
|
|
|
|
import isodate
|
|
import pytest
|
|
from django.core.management import call_command
|
|
from django.core.management.base import CommandError
|
|
|
|
import tests.utils
|
|
from passerelle.base.models import Job, ResourceLog, SkipJob
|
|
from passerelle.utils.jsonresponse import APIError
|
|
from tests.test_base_adresse import BaseAdresse, StreetModel
|
|
|
|
|
|
@pytest.fixture
def base_adresse(db):
    """Provide a ``BaseAdresse`` connector ready to be used by the tests.

    The connector is created with slug ``base-adresse`` and zipcode ``73``,
    then handed to ``tests.utils.setup_access_rights``; the value returned by
    that helper is what the fixture provides.

    ``db`` is requested only for its side effect (presumably the pytest-django
    database access fixture -- confirm against the project's conftest).
    """
    connector = BaseAdresse.objects.create(slug='base-adresse', zipcode='73')
    return tests.utils.setup_access_rights(connector)
|
|
|
|
|
|
@mock.patch('passerelle.utils.Request.get')
def test_jobs(mocked_get, app, base_adresse, freezer):
    """Walk ``update_streets_data`` jobs through their possible outcomes.

    ``passerelle.utils.Request.get`` is mocked and the clock is driven with
    ``freezer``. The test checks, in order, that:

    * a registered job ends up ``completed`` and ``StreetModel`` holds 3 rows;
    * an empty download marks the job ``failed``, logs one message and leaves
      the 3 streets in place;
    * an exception raised by the request marks the job ``failed`` and is logged;
    * ``SkipJob`` keeps the job ``registered``, optionally until an
      ``after_timestamp`` given as a datetime, a number of seconds or a timedelta;
    * ``add_job(..., after_timestamp=...)`` postpones the run in the same three ways;
    * a job stays ``registered`` while ``BaseAdresse.down`` is patched to return True.
    """
    job_name = 'update_streets_data'
    archive_path = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.gz')

    def ban_response():
        # Build a fresh 200 response carrying the content of the test archive.
        with open(archive_path, 'rb') as archive:
            return tests.utils.FakedResponse(content=archive.read(), status_code=200)

    def stored(registered_job):
        # Reload the job from the database to observe what jobs() did to it.
        return Job.objects.get(id=registered_job.id)

    # remove jobs automatically added at connector creation
    Job.objects.all().delete()
    mocked_get.return_value = ban_response()

    freezer.move_to('2019-01-01 00:00:00')
    job = base_adresse.add_job(job_name)
    assert job.status == 'registered'

    base_adresse.jobs()
    assert stored(job).status == 'completed'
    assert StreetModel.objects.count() == 3

    # don't delete streets if bano file is empty
    mocked_get.return_value = tests.utils.FakedResponse(content=b'', status_code=200)
    freezer.move_to('2019-01-01 12:00:00')
    job = base_adresse.add_job(job_name)
    assert job.status == 'registered'

    base_adresse.jobs()
    assert stored(job).status == 'failed'
    assert stored(job).status_details == {'error_summary': 'Exception: bano file is empty'}
    logs = ResourceLog.objects.all()
    assert logs.count() == 1
    assert logs[0].message == 'error running update_streets_data job (bano file is empty)'
    assert StreetModel.objects.count() == 3

    # an exception raised by the request fails the job and is logged
    mocked_get.return_value = ban_response()

    StreetModel.objects.all().delete()

    ResourceLog.objects.all().delete()
    job = base_adresse.add_job(job_name)
    mocked_get.side_effect = Exception('hello')
    base_adresse.jobs()
    assert stored(job).status == 'failed'
    assert stored(job).status_details == {'error_summary': 'Exception: hello'}
    logs = ResourceLog.objects.all()
    assert logs.count() == 1
    assert logs[0].message == 'error running update_streets_data job (hello)'

    # a bare SkipJob leaves the job registered
    job = base_adresse.add_job(job_name)
    mocked_get.side_effect = SkipJob()
    base_adresse.jobs()
    assert stored(job).status == 'registered'

    # use after_timestamp with SkipJob
    freezer.move_to('2019-01-01 00:00:00')
    retry_at = isodate.parse_datetime('2019-01-02T00:00:00+00:00')
    mocked_get.side_effect = SkipJob(after_timestamp=retry_at)
    base_adresse.jobs()
    assert stored(job).status == 'registered'
    mocked_get.side_effect = None
    freezer.move_to('2019-01-01 12:00:00')
    base_adresse.jobs()
    assert stored(job).status == 'registered'
    freezer.move_to('2019-01-02 01:00:00')
    base_adresse.jobs()
    assert stored(job).status == 'completed'

    # use after_timestamp with SkipJob and seconds
    job = base_adresse.add_job(job_name)
    freezer.move_to('2019-01-01 00:00:00')
    mocked_get.side_effect = SkipJob(after_timestamp=3600)
    base_adresse.jobs()
    assert stored(job).status == 'registered'
    mocked_get.side_effect = None
    freezer.move_to('2019-01-01 00:30:00')
    base_adresse.jobs()
    assert stored(job).status == 'registered'
    freezer.move_to('2019-01-01 01:01:00')
    base_adresse.jobs()
    assert stored(job).status == 'completed'

    # use after_timestamp with SkipJob and timedelta
    job = base_adresse.add_job(job_name)
    freezer.move_to('2019-01-01 00:00:00')
    one_hour = datetime.timedelta(seconds=3600)
    mocked_get.side_effect = SkipJob(after_timestamp=one_hour)
    base_adresse.jobs()
    assert stored(job).status == 'registered'
    mocked_get.side_effect = None
    freezer.move_to('2019-01-01 00:30:00')
    base_adresse.jobs()
    assert stored(job).status == 'registered'
    freezer.move_to('2019-01-01 01:01:00')
    base_adresse.jobs()
    assert stored(job).status == 'completed'

    # use after_timestamp with add_job
    freezer.move_to('2019-01-01 00:00:00')
    run_at = isodate.parse_datetime('2019-01-02T00:00:00+00:00')
    job = base_adresse.add_job(job_name, after_timestamp=run_at)
    base_adresse.jobs()
    assert stored(job).status == 'registered'
    freezer.move_to('2019-01-02 01:00:00')
    base_adresse.jobs()
    assert stored(job).status == 'completed'

    # use after_timestamp with add_job and seconds
    freezer.move_to('2019-01-01 00:00:00')
    job = base_adresse.add_job(job_name, after_timestamp=3600)
    base_adresse.jobs()
    assert stored(job).status == 'registered'
    freezer.move_to('2019-01-01 01:01:00')
    base_adresse.jobs()
    assert stored(job).status == 'completed'

    # use after_timestamp with add_job and timedelta
    freezer.move_to('2019-01-01 00:00:00')
    delay = datetime.timedelta(seconds=3600)
    job = base_adresse.add_job(job_name, after_timestamp=delay)
    base_adresse.jobs()
    assert stored(job).status == 'registered'
    freezer.move_to('2019-01-01 01:01:00')
    base_adresse.jobs()
    assert stored(job).status == 'completed'

    # don't run jobs if connector is down
    StreetModel.objects.all().delete()
    with mock.patch('passerelle.apps.base_adresse.models.BaseAdresse.down') as mocked_down:
        mocked_down.side_effect = lambda: True
        mocked_get.return_value = ban_response()
        job = base_adresse.add_job(job_name)
        assert job.status == 'registered'

        base_adresse.jobs()
        assert stored(job).status == 'registered'
|
|
|
|
|
|
@mock.patch('passerelle.utils.Request.get')
def test_runjob(mocked_get, app, base_adresse, freezer):
    """Check the ``runjob`` management command on a single job.

    A registered ``update_streets_data`` job run through the command must end
    up ``completed`` with 3 ``StreetModel`` rows. Running the same job again,
    or running a job whose ``after_timestamp`` is 2019-01-02 while the clock is
    frozen at 2019-01-01, must raise ``CommandError`` with an explicit message.
    """

    def runjob(target):
        # Invoke the management command on the given job.
        call_command('runjob', '--job-id=%s' % target.pk)

    archive_path = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.gz')
    with open(archive_path, 'rb') as archive:
        content = archive.read()
    mocked_get.return_value = tests.utils.FakedResponse(content=content, status_code=200)

    freezer.move_to('2019-01-01 00:00:00')
    job = base_adresse.add_job('update_streets_data')
    assert job.status == 'registered'

    runjob(job)
    assert Job.objects.get(id=job.id).status == 'completed'
    assert StreetModel.objects.count() == 3

    # a completed job is refused
    with pytest.raises(CommandError) as excinfo:
        runjob(job)
    assert 'cannot run job, status is completed' in str(excinfo.value)

    # a job scheduled for the next day is refused as well
    next_day = datetime.date(year=2019, month=1, day=2)
    job = base_adresse.add_job('update_streets_data', after_timestamp=next_day)

    with pytest.raises(CommandError) as excinfo:
        runjob(job)
    assert 'cannot run job, should be run after 2019-01-02 00:00:00+00:00' in str(excinfo.value)
|
|
|
|
|
|
@mock.patch('passerelle.utils.Request.get')
def test_jobs_api_error_log_level(mocked_get, app, base_adresse, freezer):
    """Check the log level used when a job fails.

    A job failing on a plain ``Exception`` must produce a ``ResourceLog`` entry
    whose level is ``error``; a job failing on ``APIError`` must produce one
    whose level is ``warning``. Both jobs end up ``failed``.
    """
    # remove jobs automatically added at connector creation
    Job.objects.all().delete()
    archive_path = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.gz')
    with open(archive_path, 'rb') as archive:
        mocked_get.return_value = tests.utils.FakedResponse(content=archive.read(), status_code=200)

    # Each scenario adds exactly one log entry, so the entry to inspect sits at
    # the scenario's own position in the queryset.
    scenarios = ((Exception, 'error'), (APIError, 'warning'))
    for position, (error_class, expected_level) in enumerate(scenarios):
        job = base_adresse.add_job('update_streets_data')
        mocked_get.side_effect = error_class('hello')
        base_adresse.jobs()
        assert Job.objects.get(id=job.id).status == 'failed'
        assert ResourceLog.objects.all().count() == position + 1
        assert ResourceLog.objects.all()[position].level == expected_level
|