tests: set publisher app dir as temporary directory

This commit is contained in:
Frédéric Péters 2020-06-28 11:25:38 +02:00
parent 8630f05a50
commit 2b562bcb1e
1 changed files with 60 additions and 57 deletions

View File

@ -3,6 +3,7 @@ import re
import sys
import pickle
import shutil
import tempfile
import os
import zipfile
@ -193,66 +194,68 @@ def test_import_config_zip():
def test_cron_command():
pub = create_temporary_pub()
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
call_command('cron')
assert cron_worker.call_count == 1
with mock.patch('tempfile.gettempdir') as gettempdir:
gettempdir.side_effect = lambda: pub.app_dir
# simulate another locked cron
import tempfile
from wcs.qommon.vendor import locket
lockfile = os.path.join(tempfile.gettempdir(), 'wcs-cron-in-progress.lock')
with locket.lock_file(lockfile, timeout=0):
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
call_command('cron') # silent by default (verbosity=0)
assert cron_worker.call_count == 0
call_command('cron', verbosity=2) # same if verbosity>0
assert cron_worker.call_count == 0
with mock.patch('wcs.qommon.management.commands.cron.JUMP_TIMEOUT_INTERVAL', -1):
with pytest.raises(CommandError, match='can not start cron job.*seems old'):
call_command('cron')
assert cron_worker.call_count == 0
# verify that the lock is released
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
call_command('cron')
assert cron_worker.call_count == 1
# simulate a cron crash
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
cron_worker.side_effect = NotImplementedError
with pytest.raises(NotImplementedError):
call_command('cron')
assert cron_worker.call_count == 1
# verify that the lock is released
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
call_command('cron')
assert cron_worker.call_count == 1
# disable cron system
with override_settings(DISABLE_CRON_JOBS=True):
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
call_command('cron')
assert cron_worker.call_count == 0
assert cron_worker.call_count == 1
# run a specific job
jobs = []
def job1(pub):
jobs.append('job1')
def job2(pub):
jobs.append('job2')
def job3(pub):
jobs.append('job3')
# simulate another locked cron
from wcs.qommon.vendor import locket
lockfile = os.path.join(tempfile.gettempdir(), 'wcs-cron-in-progress.lock')
with locket.lock_file(lockfile, timeout=0):
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
call_command('cron') # silent by default (verbosity=0)
assert cron_worker.call_count == 0
call_command('cron', verbosity=2) # same if verbosity>0
assert cron_worker.call_count == 0
with mock.patch('wcs.qommon.management.commands.cron.JUMP_TIMEOUT_INTERVAL', -1):
with pytest.raises(CommandError, match='can not start cron job.*seems old'):
call_command('cron')
assert cron_worker.call_count == 0
@classmethod
def register_test_cronjobs(cls):
cls.register_cronjob(CronJob(job1, days=[10]))
cls.register_cronjob(CronJob(job2, name='job2', days=[10]))
cls.register_cronjob(CronJob(job3, name='job3', days=[10]))
# verify that the lock is released
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
call_command('cron')
assert cron_worker.call_count == 1
with mock.patch('wcs.publisher.WcsPublisher.register_cronjobs', register_test_cronjobs):
get_publisher_class().cronjobs = []
call_command('cron', job_name='job1')
assert jobs == []
get_publisher_class().cronjobs = []
call_command('cron', job_name='job2')
assert jobs == ['job2']
# simulate a cron crash
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
cron_worker.side_effect = NotImplementedError
with pytest.raises(NotImplementedError):
call_command('cron')
assert cron_worker.call_count == 1
# verify that the lock is released
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
call_command('cron')
assert cron_worker.call_count == 1
# disable cron system
with override_settings(DISABLE_CRON_JOBS=True):
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
call_command('cron')
assert cron_worker.call_count == 0
# run a specific job
jobs = []
def job1(pub):
jobs.append('job1')
def job2(pub):
jobs.append('job2')
def job3(pub):
jobs.append('job3')
@classmethod
def register_test_cronjobs(cls):
cls.register_cronjob(CronJob(job1, days=[10]))
cls.register_cronjob(CronJob(job2, name='job2', days=[10]))
cls.register_cronjob(CronJob(job3, name='job3', days=[10]))
with mock.patch('wcs.publisher.WcsPublisher.register_cronjobs', register_test_cronjobs):
get_publisher_class().cronjobs = []
call_command('cron', job_name='job1')
assert jobs == []
get_publisher_class().cronjobs = []
call_command('cron', job_name='job2')
assert jobs == ['job2']