wcs/tests/test_publisher.py

250 lines
8.4 KiB
Python

import json
import re
import sys
import pickle
import shutil
import os
import zipfile
import mock
import pytest
from django.conf import settings
from django.core.management import call_command
from django.core.management.base import CommandError
from django.http import Http404
from django.test import override_settings
from django.utils import six
from django.utils.six import BytesIO, StringIO
from quixote import cleanup
from wcs.qommon import get_publisher_class
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.cron import CronJob
from utilities import create_temporary_pub
def setup_module(module):
    """Create the temporary publisher shared by the tests of this module.

    ``cleanup()`` is called first, then the publisher returned by
    ``create_temporary_pub()`` is stored in the module-level ``pub`` global,
    its language configuration is set to English and saved with
    ``write_cfg()``.
    """
    global pub

    cleanup()
    pub = create_temporary_pub()
    pub.cfg['language'] = {'language': 'en'}
    pub.write_cfg()
def teardown_module(module):
    """Remove the application directory of the module-level publisher."""
    app_dir = pub.APP_DIR
    shutil.rmtree(app_dir)
def get_request():
    """Return an ``HTTPRequest`` built with ``None`` and a minimal environ.

    The environ only carries ``SERVER_NAME`` (``www.example.net``) and an
    empty ``SCRIPT_NAME``.
    """
    environ = {
        'SERVER_NAME': 'www.example.net',
        'SCRIPT_NAME': '',
    }
    return HTTPRequest(None, environ)
def test_plaintext_error():
    """Check the plain text error report with long and short traces.

    In both modes the report must contain the request form value, the
    ``SERVER_NAME`` environ entry and a traceback frame naming this test.
    The ``>``-prefixed line is only expected while ``pub.USE_LONG_TRACES``
    is enabled.
    """
    req = get_request()
    pub._set_request(req)
    try:
        raise Exception('foo')
    except Exception:
        exc_type, exc_value, tb = sys.exc_info()
    req.form = {'foo': 'bar'}

    assert pub.USE_LONG_TRACES == True  # use long traces by default
    # NOTE(review): the '^>' pattern below appears to match the next source
    # line as quoted in the long trace, so the assignment has to keep its
    # exact text ("s = pub._generate_plaintext_error") -- confirm.
    s = pub._generate_plaintext_error(req, None, exc_type, exc_value, tb)
    assert re.findall('^foo.*bar', s, re.MULTILINE)
    assert re.findall('^SERVER_NAME.*www.example.net', s, re.MULTILINE)
    assert re.findall('File.*?line.*?in test_plaintext_error', s)
    # raw string: "\d" is an invalid escape sequence in a plain literal
    assert re.findall(r'^>.*\d+.*s = pub._generate_plaintext_error', s, re.MULTILINE)

    pub.USE_LONG_TRACES = False
    s = pub._generate_plaintext_error(req, None, exc_type, exc_value, tb)
    assert re.findall('^foo.*bar', s, re.MULTILINE)
    assert re.findall('^SERVER_NAME.*www.example.net', s, re.MULTILINE)
    assert re.findall('File.*?line.*?in test_plaintext_error', s)
    assert not re.findall(r'^>.*\d+.*s = pub._generate_plaintext_error', s, re.MULTILINE)
def test_finish_failed_request():
    """Check the bodies returned by ``pub.finish_failed_request()``.

    Covers the default page, the JSON answer when the request form asks for
    ``format`` ``json``, and the ``text`` and ``text-in-html`` values of
    ``pub.config.display_exceptions``.
    """
    pub.USE_LONG_TRACES = False
    try:
        raise Exception('foo')
    except Exception:
        # NOTE(review): nothing is read from this exception afterwards;
        # presumably the block only primes the interpreter's exception
        # state for the calls below -- confirm before removing it.
        pass

    # default rendering
    req = get_request()
    pub._set_request(req)
    body = pub.finish_failed_request()
    assert '<h1>Internal Server Error</h1>' in str(body)

    # JSON rendering, selected by the request form
    req = get_request()
    pub._set_request(req)
    req.form = {'format': 'json'}
    body = pub.finish_failed_request()
    assert body == '{"err": 1}'

    # plain text traceback
    req = get_request()
    pub.config.display_exceptions = 'text'
    pub._set_request(req)
    try:
        raise Exception()
    except Exception:
        # called while the exception is being handled
        body = pub.finish_failed_request()
    assert 'Traceback (most recent call last)' in str(body)
    assert '<div class="error-page">' not in str(body)

    # traceback embedded in the HTML error page
    req = get_request()
    pub.config.display_exceptions = 'text-in-html'
    pub._set_request(req)
    try:
        raise Exception()
    except Exception:
        body = pub.finish_failed_request()
    assert 'Traceback (most recent call last)' in str(body)
    assert '<div class="error-page">' in str(body)
def test_finish_interrupted_request():
    """Check how ``pub.process_request()`` answers malformed requests.

    Three requests with a bad content length or content type must produce a
    response carrying the matching error message; a request for an unknown
    path must raise ``Http404``.
    """

    def process(**extra_environ):
        # every request has an empty body and the same base environ
        environ = {
            'SERVER_NAME': 'example.net',
            'SCRIPT_NAME': '',
        }
        environ.update(extra_environ)
        return pub.process_request(HTTPRequest(StringIO(''), environ))

    response = process(CONTENT_LENGTH='aaa')
    assert 'invalid content-length header' in str(response)

    response = process(
        CONTENT_TYPE='application/x-www-form-urlencoded',
        CONTENT_LENGTH='1',
    )
    assert 'Invalid request: unexpected end of request body' in str(response)

    response = process(
        CONTENT_TYPE='multipart/form-data',
        CONTENT_LENGTH='1',
    )
    assert 'Invalid request: multipart/form-data missing boundary' in str(response)

    with pytest.raises(Http404):
        process(PATH_INFO='/gloubiboulga')
def test_get_tenants():
    """Check which entries of the application directory count as tenants.

    An empty file ``xxx`` and a directory ``plop.invalid`` are added to the
    application directory; neither may be listed, while ``example.net``
    must be.
    """
    publisher = create_temporary_pub()
    app_dir = publisher.APP_DIR

    # create an empty regular file
    with open(os.path.join(app_dir, 'xxx'), 'w'):
        pass
    os.mkdir(os.path.join(app_dir, 'plop.invalid'))

    tenants = list(publisher.__class__.get_tenants())
    assert 'example.net' in tenants
    assert 'xxx' not in tenants
    assert 'plop.invalid' not in tenants
def test_register_cronjobs():
    """Check that ``register_cronjobs()`` fills the initially empty job list.

    After registration the function names of the jobs must include
    ``apply_global_action_timeouts`` and ``clean_sessions``.
    """
    assert not pub.cronjobs
    pub.register_cronjobs()
    registered = [job.function.__name__ for job in pub.cronjobs]
    for expected in ('apply_global_action_timeouts', 'clean_sessions'):
        assert expected in registered
def test_get_default_position():
    """Check the value returned by ``pub.get_default_position()``."""
    position = pub.get_default_position()
    assert position == '50.84;4.36'
def test_import_config_zip():
    """Check ``import_zip()`` with a pickled and then a JSON configuration.

    Both imports must update the keys found in the archive and leave the
    pre-existing ``sp`` section alone.  On Python 2 the values loaded from
    JSON must not be ``unicode`` objects.
    """

    def build_archive(member_name, payload):
        # in-memory zip holding a single member, rewound for reading
        buf = BytesIO()
        with zipfile.ZipFile(buf, 'w') as archive:
            archive.writestr(member_name, payload)
        buf.seek(0)
        return buf

    publisher = create_temporary_pub()
    publisher.cfg['sp'] = {'what': 'ever'}
    publisher.write_cfg()

    # pickled configuration
    pickled_cfg = pickle.dumps({
        'language': {'language': 'fr'},
        'whatever': ['a', 'b', 'c'],
    })
    publisher.import_zip(build_archive('config.pck', pickled_cfg))
    assert publisher.cfg['language'] == {'language': 'fr'}
    assert publisher.cfg['whatever'] == ['a', 'b', 'c']
    assert publisher.cfg['sp'] == {'what': 'ever'}

    # JSON configuration
    json_cfg = json.dumps({
        'language': {'language': 'en'},
        'whatever2': ['a', 'b', {'c': 'd'}],
    })
    publisher.import_zip(build_archive('config.json', json_cfg))
    assert publisher.cfg['language'] == {'language': 'en'}
    assert publisher.cfg['sp'] == {'what': 'ever'}
    if six.PY2:
        # six.text_type is ``unicode`` on Python 2
        assert not isinstance(publisher.cfg['language'], six.text_type)
        assert not isinstance(publisher.cfg['whatever2'][-1]['c'], six.text_type)
def test_cron_command():
    """Exercise the ``cron`` management command.

    Scenarios, in order: a plain run, runs while the lock file is already
    held, a run after the lock is released, a crashing worker followed by
    another run, the ``DISABLE_CRON_JOBS`` setting, and the selection of a
    single job with ``job_name``.
    """
    import tempfile
    from wcs.qommon.vendor import locket

    pub = create_temporary_pub()

    cron_module = 'wcs.qommon.management.commands.cron'
    worker_target = cron_module + '.cron_worker'

    def assert_single_run():
        # a plain "cron" call must reach the (mocked) worker exactly once
        with mock.patch(worker_target) as cron_worker:
            call_command('cron')
            assert cron_worker.call_count == 1

    assert_single_run()

    # simulate another locked cron
    lockfile = os.path.join(tempfile.gettempdir(), 'wcs-cron-in-progress.lock')
    with locket.lock_file(lockfile, timeout=0), \
            mock.patch(worker_target) as cron_worker:
        call_command('cron')  # silent by default (verbosity=0)
        assert cron_worker.call_count == 0
        call_command('cron', verbosity=2)  # same if verbosity>0
        assert cron_worker.call_count == 0
        with mock.patch(cron_module + '.JUMP_TIMEOUT_INTERVAL', -1), \
                pytest.raises(CommandError, match='can not start cron job.*seems old'):
            call_command('cron')
        assert cron_worker.call_count == 0

    # verify that the lock is released
    assert_single_run()

    # simulate a cron crash
    with mock.patch(worker_target) as cron_worker:
        cron_worker.side_effect = NotImplementedError
        with pytest.raises(NotImplementedError):
            call_command('cron')
        assert cron_worker.call_count == 1

    # verify that the lock is released
    assert_single_run()

    # disable cron system
    with override_settings(DISABLE_CRON_JOBS=True), \
            mock.patch(worker_target) as cron_worker:
        call_command('cron')
        assert cron_worker.call_count == 0

    # run a specific job
    jobs = []

    def job1(pub):
        jobs.append('job1')

    def job2(pub):
        jobs.append('job2')

    def job3(pub):
        jobs.append('job3')

    @classmethod
    def register_test_cronjobs(cls):
        # job1 is registered without a name, job2 and job3 with one
        cls.register_cronjob(CronJob(job1, days=[10]))
        cls.register_cronjob(CronJob(job2, name='job2', days=[10]))
        cls.register_cronjob(CronJob(job3, name='job3', days=[10]))

    with mock.patch('wcs.publisher.WcsPublisher.register_cronjobs', register_test_cronjobs):
        # the job list is reset before each run
        get_publisher_class().cronjobs = []
        call_command('cron', job_name='job1')
        assert jobs == []

        get_publisher_class().cronjobs = []
        call_command('cron', job_name='job2')
        assert jobs == ['job2']