216 lines
7.4 KiB
Python
216 lines
7.4 KiB
Python
import cPickle
|
|
import json
|
|
import re
|
|
import sys
|
|
import shutil
|
|
import StringIO
|
|
import os
|
|
import zipfile
|
|
|
|
import mock
|
|
import pytest
|
|
|
|
from django.conf import settings
|
|
from django.core.management import call_command
|
|
from django.core.management.base import CommandError
|
|
from django.test import override_settings
|
|
from quixote import cleanup
|
|
from wcs.qommon.http_request import HTTPRequest
|
|
|
|
from utilities import create_temporary_pub
|
|
|
|
def setup_module(module):
|
|
cleanup()
|
|
global pub
|
|
pub = create_temporary_pub()
|
|
pub.cfg['language'] = {'language': 'en'}
|
|
pub.write_cfg()
|
|
|
|
def teardown_module(module):
|
|
shutil.rmtree(pub.APP_DIR)
|
|
|
|
def get_request():
|
|
return HTTPRequest(None, {
|
|
'SERVER_NAME': 'www.example.net',
|
|
'SCRIPT_NAME': '',
|
|
})
|
|
|
|
def test_plaintext_error():
|
|
req = get_request()
|
|
pub._set_request(req)
|
|
try:
|
|
raise Exception('foo')
|
|
except:
|
|
exc_type, exc_value, tb = sys.exc_info()
|
|
req.form = {'foo': 'bar'}
|
|
assert pub.USE_LONG_TRACES == True # use long traces by default
|
|
s = pub._generate_plaintext_error(req, None, exc_type, exc_value, tb)
|
|
assert re.findall('^foo.*bar', s, re.MULTILINE)
|
|
assert re.findall('^SERVER_NAME.*www.example.net', s, re.MULTILINE)
|
|
assert re.findall('File.*?line.*?in test_plaintext_error', s)
|
|
assert re.findall('^>.*\d+.*s = pub._generate_plaintext_error', s, re.MULTILINE)
|
|
|
|
pub.USE_LONG_TRACES = False
|
|
s = pub._generate_plaintext_error(req, None, exc_type, exc_value, tb)
|
|
assert re.findall('^foo.*bar', s, re.MULTILINE)
|
|
assert re.findall('^SERVER_NAME.*www.example.net', s, re.MULTILINE)
|
|
assert re.findall('File.*?line.*?in test_plaintext_error', s)
|
|
assert not re.findall('^>.*\d+.*s = pub._generate_plaintext_error', s, re.MULTILINE)
|
|
|
|
def test_finish_failed_request():
|
|
req = get_request()
|
|
pub._set_request(req)
|
|
body = pub.finish_failed_request()
|
|
assert '<h1>Internal Server Error</h1>' in body
|
|
|
|
req = get_request()
|
|
pub._set_request(req)
|
|
req.form = {'format': 'json'}
|
|
body = pub.finish_failed_request()
|
|
assert body == '{"err": 1}'
|
|
|
|
req = get_request()
|
|
pub.config.display_exceptions = 'text'
|
|
pub._set_request(req)
|
|
try:
|
|
raise Exception()
|
|
except:
|
|
body = pub.finish_failed_request()
|
|
assert 'Traceback (most recent call last)' in str(body)
|
|
assert '<div class="error-page">' not in str(body)
|
|
|
|
req = get_request()
|
|
pub.config.display_exceptions = 'text-in-html'
|
|
pub._set_request(req)
|
|
try:
|
|
raise Exception()
|
|
except:
|
|
body = pub.finish_failed_request()
|
|
assert 'Traceback (most recent call last)' in str(body)
|
|
assert '<div class="error-page">' in str(body)
|
|
|
|
def test_finish_interrupted_request():
|
|
req = HTTPRequest(StringIO.StringIO(''), {
|
|
'SERVER_NAME': 'example.net',
|
|
'SCRIPT_NAME': '',
|
|
'CONTENT_LENGTH': 'aaa',
|
|
})
|
|
response = pub.process_request(req)
|
|
assert 'invalid content-length header' in str(response)
|
|
req = HTTPRequest(StringIO.StringIO(''), {
|
|
'SERVER_NAME': 'example.net',
|
|
'SCRIPT_NAME': '',
|
|
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
|
|
'CONTENT_LENGTH': '1',
|
|
})
|
|
response = pub.process_request(req)
|
|
assert 'Invalid request: unexpected end of request body' in str(response)
|
|
req = HTTPRequest(StringIO.StringIO(''), {
|
|
'SERVER_NAME': 'example.net',
|
|
'SCRIPT_NAME': '',
|
|
'CONTENT_TYPE': 'multipart/form-data',
|
|
'CONTENT_LENGTH': '1',
|
|
})
|
|
response = pub.process_request(req)
|
|
assert 'Invalid request: multipart/form-data missing boundary' in str(response)
|
|
req = HTTPRequest(StringIO.StringIO(''), {
|
|
'SERVER_NAME': 'example.net',
|
|
'SCRIPT_NAME': '',
|
|
'PATH_INFO': '/gloubiboulga',
|
|
})
|
|
response = pub.process_request(req)
|
|
assert '<p>The requested link' in str(response)
|
|
|
|
def test_get_tenants():
|
|
pub = create_temporary_pub()
|
|
open(os.path.join(pub.APP_DIR, 'xxx'), 'w').close()
|
|
os.mkdir(os.path.join(pub.APP_DIR, 'plop.invalid'))
|
|
tenants = list(pub.__class__.get_tenants())
|
|
assert 'example.net' in tenants
|
|
assert not 'xxx' in tenants
|
|
assert not 'plop.invalid' in tenants
|
|
|
|
def test_register_cronjobs():
|
|
assert not pub.cronjobs
|
|
pub.register_cronjobs()
|
|
assert 'apply_global_action_timeouts' in [x.function.func_name for x in pub.cronjobs]
|
|
assert 'clean_sessions' in [x.function.func_name for x in pub.cronjobs]
|
|
|
|
def test_get_default_position():
|
|
assert pub.get_default_position() == '50.84;4.36'
|
|
|
|
def test_import_config_zip():
|
|
pub = create_temporary_pub()
|
|
pub.cfg['sp'] = {'what': 'ever'}
|
|
pub.write_cfg()
|
|
|
|
c = StringIO.StringIO()
|
|
z = zipfile.ZipFile(c, 'w')
|
|
z.writestr('config.pck', cPickle.dumps(
|
|
{'language': {'language': 'fr'},
|
|
'whatever': ['a', 'b', 'c']}))
|
|
z.close()
|
|
c.seek(0)
|
|
|
|
pub.import_zip(c)
|
|
assert pub.cfg['language'] == {'language': 'fr'}
|
|
assert pub.cfg['whatever'] == ['a', 'b', 'c']
|
|
assert pub.cfg['sp'] == {'what': 'ever'}
|
|
|
|
c = StringIO.StringIO()
|
|
z = zipfile.ZipFile(c, 'w')
|
|
z.writestr('config.json', json.dumps(
|
|
{'language': {'language': 'en'},
|
|
'whatever2': ['a', 'b', {'c': 'd'}]}))
|
|
z.close()
|
|
c.seek(0)
|
|
|
|
pub.import_zip(c)
|
|
assert pub.cfg['language'] == {'language': 'en'}
|
|
assert pub.cfg['sp'] == {'what': 'ever'}
|
|
assert not isinstance(pub.cfg['language'], unicode)
|
|
assert not isinstance(pub.cfg['whatever2'][-1]['c'], unicode)
|
|
|
|
def test_cron_command():
|
|
pub = create_temporary_pub()
|
|
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
|
|
call_command('cron')
|
|
assert cron_worker.call_count == 1
|
|
|
|
# simulate another locked cron
|
|
import tempfile
|
|
from qommon.vendor import locket
|
|
lockfile = os.path.join(tempfile.gettempdir(), 'wcs-cron-in-progress.lock')
|
|
with locket.lock_file(lockfile, timeout=0):
|
|
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
|
|
call_command('cron') # silent by default (verbosity=0)
|
|
assert cron_worker.call_count == 0
|
|
call_command('cron', verbosity=2) # same if verbosity>0
|
|
assert cron_worker.call_count == 0
|
|
with mock.patch('wcs.qommon.management.commands.cron.JUMP_TIMEOUT_INTERVAL', -1):
|
|
with pytest.raises(CommandError, match='can not start cron job.*seems old'):
|
|
call_command('cron')
|
|
assert cron_worker.call_count == 0
|
|
|
|
# verify that the lock is released
|
|
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
|
|
call_command('cron')
|
|
assert cron_worker.call_count == 1
|
|
|
|
# simulate a cron crash
|
|
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
|
|
cron_worker.side_effect = NotImplementedError
|
|
with pytest.raises(NotImplementedError):
|
|
call_command('cron')
|
|
assert cron_worker.call_count == 1
|
|
# verify that the lock is released
|
|
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
|
|
call_command('cron')
|
|
assert cron_worker.call_count == 1
|
|
|
|
# disable cron system
|
|
with override_settings(DISABLE_CRON_JOBS=True):
|
|
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
|
|
call_command('cron')
|
|
assert cron_worker.call_count == 0
|