import datetime import os from unittest import mock import isodate import pytest from django.core.management import call_command from django.core.management.base import CommandError import tests.utils from passerelle.base.models import Job, ResourceLog, SkipJob from passerelle.utils.jsonresponse import APIError from tests.test_base_adresse import BaseAdresse, StreetModel @pytest.fixture def base_adresse(db): return tests.utils.setup_access_rights(BaseAdresse.objects.create(slug='base-adresse', zipcode='73')) @mock.patch('passerelle.utils.Request.get') def test_jobs(mocked_get, app, base_adresse, freezer): Job.objects.all().delete() # remove jobs automatically added at connector creation filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.gz') with open(filepath, 'rb') as ban_file: mocked_get.return_value = tests.utils.FakedResponse(content=ban_file.read(), status_code=200) freezer.move_to('2019-01-01 00:00:00') job = base_adresse.add_job('update_streets_data') assert job.status == 'registered' base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'completed' assert StreetModel.objects.count() == 3 # don't delete streets if bano file is empty mocked_get.return_value = tests.utils.FakedResponse(content=b'', status_code=200) freezer.move_to('2019-01-01 12:00:00') job = base_adresse.add_job('update_streets_data') assert job.status == 'registered' base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'failed' assert Job.objects.get(id=job.id).status_details == {'error_summary': 'Exception: bano file is empty'} assert ResourceLog.objects.all().count() == 1 assert ResourceLog.objects.all()[0].message == ( 'error running update_streets_data job (bano file is empty)' ) assert StreetModel.objects.count() == 3 with open(filepath, 'rb') as ban_file: mocked_get.return_value = tests.utils.FakedResponse(content=ban_file.read(), status_code=200) StreetModel.objects.all().delete() ResourceLog.objects.all().delete() job = base_adresse.add_job('update_streets_data') mocked_get.side_effect = Exception('hello') base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'failed' assert Job.objects.get(id=job.id).status_details == {'error_summary': 'Exception: hello'} assert ResourceLog.objects.all().count() == 1 assert ResourceLog.objects.all()[0].message == 'error running update_streets_data job (hello)' job = base_adresse.add_job('update_streets_data') mocked_get.side_effect = SkipJob() base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'registered' # use after_timestamp with SkipJob freezer.move_to('2019-01-01 00:00:00') mocked_get.side_effect = SkipJob(after_timestamp=isodate.parse_datetime('2019-01-02T00:00:00+00:00')) base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'registered' mocked_get.side_effect = None freezer.move_to('2019-01-01 12:00:00') base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'registered' freezer.move_to('2019-01-02 01:00:00') base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'completed' # use after_timestamp with SkipJob and seconds job = base_adresse.add_job('update_streets_data') freezer.move_to('2019-01-01 00:00:00') mocked_get.side_effect = SkipJob(after_timestamp=3600) base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'registered' mocked_get.side_effect = None freezer.move_to('2019-01-01 00:30:00') base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'registered' freezer.move_to('2019-01-01 01:01:00') base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'completed' # use after_timestamp with SkipJob and timedelta job = base_adresse.add_job('update_streets_data') freezer.move_to('2019-01-01 00:00:00') mocked_get.side_effect = SkipJob(after_timestamp=datetime.timedelta(seconds=3600)) base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'registered' mocked_get.side_effect = None freezer.move_to('2019-01-01 00:30:00') base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'registered' freezer.move_to('2019-01-01 01:01:00') base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'completed' # use after_timestamp with add_job freezer.move_to('2019-01-01 00:00:00') job = base_adresse.add_job( 'update_streets_data', after_timestamp=isodate.parse_datetime('2019-01-02T00:00:00+00:00') ) base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'registered' freezer.move_to('2019-01-02 01:00:00') base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'completed' # use after_timestamp with add_job and seconds freezer.move_to('2019-01-01 00:00:00') job = base_adresse.add_job('update_streets_data', after_timestamp=3600) base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'registered' freezer.move_to('2019-01-01 01:01:00') base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'completed' # use after_timestamp with add_job and seconds freezer.move_to('2019-01-01 00:00:00') job = base_adresse.add_job('update_streets_data', after_timestamp=datetime.timedelta(seconds=3600)) base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'registered' freezer.move_to('2019-01-01 01:01:00') base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'completed' # don't run jobs if connector is down StreetModel.objects.all().delete() with mock.patch('passerelle.apps.base_adresse.models.BaseAdresse.down') as down: down.side_effect = lambda: True with open(filepath, 'rb') as ban_file: mocked_get.return_value = tests.utils.FakedResponse(content=ban_file.read(), status_code=200) job = base_adresse.add_job('update_streets_data') assert job.status == 'registered' base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'registered' @mock.patch('passerelle.utils.Request.get') def test_runjob(mocked_get, app, base_adresse, freezer): filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.gz') with open(filepath, 'rb') as ban_file: mocked_get.return_value = tests.utils.FakedResponse(content=ban_file.read(), status_code=200) freezer.move_to('2019-01-01 00:00:00') job = base_adresse.add_job('update_streets_data') assert job.status == 'registered' call_command('runjob', '--job-id=%s' % job.pk) assert Job.objects.get(id=job.id).status == 'completed' assert StreetModel.objects.count() == 3 with pytest.raises(CommandError) as e: call_command('runjob', '--job-id=%s' % job.pk) assert 'cannot run job, status is completed' in str(e.value) date = datetime.date(year=2019, month=1, day=2) job = base_adresse.add_job('update_streets_data', after_timestamp=date) with pytest.raises(CommandError) as e: call_command('runjob', '--job-id=%s' % job.pk) assert 'cannot run job, should be run after 2019-01-02 00:00:00+00:00' in str(e.value) @mock.patch('passerelle.utils.Request.get') def test_jobs_api_error_log_level(mocked_get, app, base_adresse, freezer): Job.objects.all().delete() # remove jobs automatically added at connector creation filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.gz') with open(filepath, 'rb') as ban_file: mocked_get.return_value = tests.utils.FakedResponse(content=ban_file.read(), status_code=200) job = base_adresse.add_job('update_streets_data') mocked_get.side_effect = Exception('hello') base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'failed' assert ResourceLog.objects.all().count() == 1 assert ResourceLog.objects.all()[0].level == 'error' job = base_adresse.add_job('update_streets_data') mocked_get.side_effect = APIError('hello') base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'failed' assert ResourceLog.objects.all().count() == 2 assert ResourceLog.objects.all()[1].level == 'warning'