wcs/tests/test_publisher.py

346 lines
11 KiB
Python

import io
import json
import os
import pickle
import re
import shutil
import sys
import tempfile
import time
import zipfile
import mock
import pytest
from django.core.management import call_command
from django.core.management.base import CommandError
from django.http import Http404
from django.test import override_settings
from quixote import cleanup
from wcs.qommon import get_publisher_class
from wcs.qommon.afterjobs import AfterJob
from wcs.qommon.cron import CronJob
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.publisher import Tenant
from .utilities import create_temporary_pub
def setup_module(module):
cleanup()
global pub
pub = create_temporary_pub()
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
def teardown_module(module):
shutil.rmtree(pub.APP_DIR)
def get_request():
return HTTPRequest(
None,
{
'SERVER_NAME': 'www.example.net',
'SCRIPT_NAME': '',
},
)
def test_plaintext_error():
req = get_request()
pub._set_request(req)
try:
raise Exception('foo')
except Exception:
exc_type, exc_value, tb = sys.exc_info()
req.form = {'foo': 'bar'}
assert pub.USE_LONG_TRACES is True # use long traces by default
s = pub._generate_plaintext_error(req, None, exc_type, exc_value, tb)
assert re.findall('^foo.*bar', s, re.MULTILINE)
assert re.findall('^SERVER_NAME.*www.example.net', s, re.MULTILINE)
assert re.findall('File.*?line.*?in test_plaintext_error', s)
assert re.findall(r'^>.*\d+.*s = pub._generate_plaintext_error', s, re.MULTILINE)
pub.USE_LONG_TRACES = False
s = pub._generate_plaintext_error(req, None, exc_type, exc_value, tb)
assert re.findall('^foo.*bar', s, re.MULTILINE)
assert re.findall('^SERVER_NAME.*www.example.net', s, re.MULTILINE)
assert re.findall('File.*?line.*?in test_plaintext_error', s)
assert not re.findall(r'^>.*\d+.*s = pub._generate_plaintext_error', s, re.MULTILINE)
def test_finish_failed_request():
pub.USE_LONG_TRACES = False
req = get_request()
pub._set_request(req)
body = pub.finish_failed_request()
assert '<h1>Internal Server Error</h1>' in str(body)
req = get_request()
pub._set_request(req)
req.form = {'format': 'json'}
body = pub.finish_failed_request()
assert body == '{"err": 1}'
req = get_request()
pub.config.display_exceptions = 'text'
pub._set_request(req)
try:
raise Exception()
except Exception:
body = pub.finish_failed_request()
assert 'Traceback (most recent call last)' in str(body)
assert '<div class="error-page">' not in str(body)
req = get_request()
pub.config.display_exceptions = 'text-in-html'
pub._set_request(req)
try:
raise Exception()
except Exception:
body = pub.finish_failed_request()
assert 'Traceback (most recent call last)' in str(body)
assert '<div class="error-page">' in str(body)
def test_finish_interrupted_request():
req = HTTPRequest(
io.StringIO(''),
{
'SERVER_NAME': 'example.net',
'SCRIPT_NAME': '',
'CONTENT_LENGTH': 'aaa',
},
)
response = pub.process_request(req)
assert b'invalid content-length header' in response.getvalue()
req = HTTPRequest(
io.StringIO(''),
{
'SERVER_NAME': 'example.net',
'SCRIPT_NAME': '',
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'CONTENT_LENGTH': '1',
},
)
response = pub.process_request(req)
assert b'Invalid request: unexpected end of request body' in response.getvalue()
req = HTTPRequest(
io.StringIO(''),
{
'SERVER_NAME': 'example.net',
'SCRIPT_NAME': '',
'CONTENT_TYPE': 'multipart/form-data',
'CONTENT_LENGTH': '1',
},
)
response = pub.process_request(req)
assert b'Invalid request: multipart/form-data missing boundary' in response.getvalue()
with pytest.raises(Http404):
req = HTTPRequest(
io.StringIO(''),
{
'SERVER_NAME': 'example.net',
'SCRIPT_NAME': '',
'PATH_INFO': '/gloubiboulga',
},
)
response = pub.process_request(req)
def test_get_tenants():
pub = create_temporary_pub()
open(os.path.join(pub.APP_DIR, 'xxx'), 'w').close()
os.mkdir(os.path.join(pub.APP_DIR, 'plop.invalid'))
hostnames = [x.hostname for x in pub.__class__.get_tenants()]
assert 'example.net' in hostnames
assert 'xxx' not in hostnames
assert 'plop.invalid' not in hostnames
def test_register_cronjobs():
pub.register_cronjobs()
assert 'apply_global_action_timeouts' in [x.function.__name__ for x in pub.cronjobs]
assert 'clean_sessions' in [x.function.__name__ for x in pub.cronjobs]
assert 'evaluate_jumps' in [x.name for x in pub.cronjobs]
def test_get_default_position():
assert pub.get_default_position() == '50.84;4.36'
def test_import_config_zip():
pub = create_temporary_pub()
pub.cfg['sp'] = {'what': 'ever'}
pub.write_cfg()
c = io.BytesIO()
with zipfile.ZipFile(c, 'w') as z:
z.writestr('config.pck', pickle.dumps({'language': {'language': 'fr'}, 'whatever': ['a', 'b', 'c']}))
c.seek(0)
pub.import_zip(c)
assert pub.cfg['language'] == {'language': 'fr'}
assert pub.cfg['whatever'] == ['a', 'b', 'c']
assert pub.cfg['sp'] == {'what': 'ever'}
c = io.BytesIO()
with zipfile.ZipFile(c, 'w') as z:
z.writestr(
'config.json', json.dumps({'language': {'language': 'en'}, 'whatever2': ['a', 'b', {'c': 'd'}]})
)
c.seek(0)
pub.import_zip(c)
assert pub.cfg['language'] == {'language': 'en'}
assert pub.cfg['sp'] == {'what': 'ever'}
def test_cron_command():
pub = create_temporary_pub()
with mock.patch('tempfile.gettempdir') as gettempdir:
gettempdir.side_effect = lambda: pub.app_dir
hostnames = ['example.net', 'foo.bar', 'something.com']
for hostname in hostnames:
if not os.path.exists(os.path.join(pub.APP_DIR, hostname)):
os.mkdir(os.path.join(pub.APP_DIR, hostname))
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
with mock.patch('wcs.qommon.publisher.QommonPublisher.get_tenants') as mock_tenants:
mock_tenants.return_value = [
Tenant(os.path.join(pub.app_dir, x)) for x in ('example.net', 'foo.bar', 'something.com')
]
call_command('cron')
assert cron_worker.call_count == 3
cron_worker.reset_mock()
call_command('cron', domain='example.net')
assert cron_worker.call_count == 1
# simulate another locked cron
from wcs.qommon.vendor import locket
lockfile = os.path.join(tempfile.gettempdir(), 'wcs-cron-in-progress.lock')
with locket.lock_file(lockfile, timeout=0):
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
call_command('cron') # silent by default (verbosity=0)
assert cron_worker.call_count == 0
call_command('cron', verbosity=2) # same if verbosity>0
assert cron_worker.call_count == 0
with mock.patch('wcs.qommon.management.commands.cron.JUMP_TIMEOUT_INTERVAL', -1):
with pytest.raises(CommandError, match='can not start cron job.*seems old'):
call_command('cron')
assert cron_worker.call_count == 0
# verify that the lock is released
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
call_command('cron', domain='example.net')
assert cron_worker.call_count == 1
# simulate a cron crash
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
cron_worker.side_effect = NotImplementedError
with pytest.raises(NotImplementedError):
call_command('cron', domain='example.net')
assert cron_worker.call_count == 1
# verify that the lock is released
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
call_command('cron', domain='example.net')
assert cron_worker.call_count == 1
# disable cron system
with override_settings(DISABLE_CRON_JOBS=True):
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
call_command('cron', domain='example.net')
assert cron_worker.call_count == 0
# run a specific job
jobs = []
def job1(pub):
jobs.append('job1')
def job2(pub):
jobs.append('job2')
def job3(pub):
jobs.append('job3')
@classmethod
def register_test_cronjobs(cls):
cls.register_cronjob(CronJob(job1, days=[10]))
cls.register_cronjob(CronJob(job2, name='job2', days=[10]))
cls.register_cronjob(CronJob(job3, name='job3', days=[10]))
with mock.patch('wcs.publisher.WcsPublisher.register_cronjobs', register_test_cronjobs):
get_publisher_class().cronjobs = []
call_command('cron', job_name='job1', domain='example.net')
assert jobs == []
get_publisher_class().cronjobs = []
call_command('cron', job_name='job2', domain='example.net')
assert jobs == ['job2']
def test_clean_afterjobs():
pub = create_temporary_pub()
job1 = AfterJob()
job1.status = 'completed'
job1.creation_time = time.time() - 3 * 3600
job1.completion_time = time.time() - 3 * 3600
job1.store()
job2 = AfterJob()
job2.status = 'completed'
job2.creation_time = time.time()
job2.completion_time = time.time()
job2.store()
job3 = AfterJob()
job3.status = 'running'
job3.creation_time = time.time() - 3 * 86400
job3.store()
pub.clean_afterjobs()
assert AfterJob.count() == 1
assert AfterJob.select()[0].id == job2.id
def test_clean_tempfiles():
pub = create_temporary_pub()
pub.clean_tempfiles()
dirname = os.path.join(pub.app_dir, 'tempfiles')
if not os.path.exists(dirname):
os.mkdir(dirname)
with open(os.path.join(dirname, 'a'), 'w') as fd:
fd.write('a')
with open(os.path.join(dirname, 'b'), 'w') as fd:
os.utime(fd.fileno(), times=(time.time() - 40 * 86400, time.time() - 40 * 86400))
pub.clean_tempfiles()
assert os.listdir(dirname) == ['a']
def test_clean_thumbnails():
pub = create_temporary_pub()
pub.clean_thumbnails()
dirname = os.path.join(pub.app_dir, 'thumbs')
if not os.path.exists(dirname):
os.mkdir(dirname)
with open(os.path.join(dirname, 'a'), 'w') as fd:
fd.write('a')
with open(os.path.join(dirname, 'b'), 'w') as fd:
os.utime(fd.fileno(), times=(time.time() - 40 * 86400, time.time() - 40 * 86400))
pub.clean_thumbnails()
assert os.listdir(dirname) == ['a']