196 lines
8.0 KiB
Python
196 lines
8.0 KiB
Python
from unittest import mock
|
|
|
|
from dns.exception import DNSException
|
|
from dns.resolver import NXDOMAIN, NoAnswer
|
|
|
|
from hobo.environment.models import Variable
|
|
from hobo.environment.utils import get_setting_variable
|
|
from hobo.maintenance.management.commands.disable_maintenance_page import Command
|
|
from hobo.maintenance.utils import check_dnswl
|
|
|
|
from .test_manager import login
|
|
|
|
|
|
def test_maintenance_middleware(app, admin_user, db, monkeypatch, settings):
    """Exercise the maintenance page and each of its pass-through settings.

    Every step changes one ``MAINTENANCE_*`` setting, requests ``/`` and
    expects either a regular response (200) or the maintenance page (503).
    """
    app = login(app)
    resp = app.get('/')
    assert resp.status_code == 200

    settings.MAINTENANCE_PAGE = True
    resp = app.get('/', status=503)
    assert 'This site is currently unavailable.' in resp.text

    # check custom maintenance message
    settings.MAINTENANCE_PAGE_MESSAGE = 'foobar'
    resp = app.get('/', status=503)
    assert 'foobar' in resp.text

    # pass through by client IP: exact address
    settings.MAINTENANCE_PASS_THROUGH_IPS = ['127.0.0.1']
    resp = app.get('/')
    assert resp.status_code == 200

    # pass through by client IP: one non-matching address and one network
    settings.MAINTENANCE_PASS_THROUGH_IPS = ['127.0.0.100', '127.0.0.0/16']
    resp = app.get('/')
    assert resp.status_code == 200

    settings.MAINTENANCE_PASS_THROUGH_IPS = ['127.0.0.1/4']  # lenient ipaddress.ip_network parsing
    # keep the fresh response, otherwise the assertion below would only
    # re-check the response of the previous step
    resp = app.get('/')
    assert resp.status_code == 200

    # a network that does not match leaves the maintenance page in place
    settings.MAINTENANCE_PASS_THROUGH_IPS = ['128.0.0.0/24']
    resp = app.get('/', status=503)
    assert 'foobar' in resp.text

    settings.MAINTENANCE_PASS_THROUGH_IPS = []
    resp = app.get('/', status=503)
    assert 'foobar' in resp.text

    # pass through by HTTP header
    settings.MAINTENANCE_PASS_THROUGH_HEADER = 'X-Entrouvert'
    resp = app.get('/', headers={'X-Entrouvert': 'yes'})
    assert resp.status_code == 200

    # pass through by DNS whitelist, with the lookup itself mocked
    settings.MAINTENANCE_PASS_THROUGH_HEADER = ''
    settings.MAINTENANCE_PASS_THROUGH_DNSWL = 'dnswl.example.com'
    with mock.patch('hobo.maintenance.utils.check_dnswl', return_value=True):
        resp = app.get('/')
        assert resp.status_code == 200
    with mock.patch('hobo.maintenance.utils.check_dnswl', return_value=False):
        resp = app.get('/', status=503)
        assert 'foobar' in resp.text
|
|
|
|
|
|
def test_check_dnswl():
    """Check ``check_dnswl`` against a mocked resolver.

    Covers IPv4 and IPv6 lookups, with and without answers, and verifies that
    resolver exceptions make the check report a falsy result.
    """
    resolver_path = 'hobo.maintenance.utils._resolver_resolve'

    # existing dnswl answers, ipv4
    with mock.patch(resolver_path) as mock_resolve:
        mock_resolve.return_value = [mock.Mock(address='1.2.3.4'), mock.Mock(address='4.5.6.7')]
        assert check_dnswl('dnswl.example.com', '127.0.0.1')

    # no known dnswl, ipv4
    with mock.patch(resolver_path) as mock_resolve:
        mock_resolve.return_value = []
        assert not check_dnswl('dnswl.example.com', '127.0.0.1')

    # existing dnswl answers, ipv6
    with mock.patch(resolver_path) as mock_resolve:
        mock_resolve.return_value = [
            mock.Mock(address='2001:db8:3333:4444:CCCC:DDDD:EEEE:FFFF'),
            mock.Mock(address='2001:db8:3333:4444:CCCC:DDDD:FFFF:FFFF'),
        ]
        assert check_dnswl('dnswl.example.com', '::1')

    # no known dnswl, ipv6
    with mock.patch(resolver_path) as mock_resolve:
        mock_resolve.return_value = []
        assert not check_dnswl('dnswl.example.com', '::1')

    # misc exceptions
    # An exception given as side_effect is raised on every call, so no
    # return value needs to be configured on the mock for these cases.
    resolver_errors = (
        NXDOMAIN('DNS query name does not exist'),
        NoAnswer('DNS response does not contain the answer'),
        DNSException('Error while retrieving DNSWL'),
    )
    for error in resolver_errors:
        with mock.patch(resolver_path, side_effect=error):
            # the assertion message names the exception that was not handled
            assert not check_dnswl('dnswl.example.com', '127.0.0.1'), type(error).__name__
|
|
|
|
|
|
def test_manage(app, admin_user, settings):
    """Drive the ``/maintenance/`` form and check the variables it stores.

    The form is submitted four times: everything enabled, everything
    disabled, maintenance enabled without any pass through (expects a
    confirmation request), then the same submission with the confirmation
    box checked.
    """
    # Preconditions: none of the variables asserted on below exists yet.
    # The names match the ones checked after the form submissions.
    assert Variable.objects.filter(name='SETTING_MAINTENANCE_PAGE').count() == 0
    assert Variable.objects.filter(name='SETTING_MAINTENANCE_PAGE_MESSAGE').count() == 0
    assert Variable.objects.filter(name='SETTING_MAINTENANCE_PASS_THROUGH_HEADER').count() == 0
    assert Variable.objects.filter(name='SETTING_TENANT_DISABLE_CRON_JOBS').count() == 0
    assert not getattr(settings, 'MAINTENANCE_PASS_THROUGH_IPS', [])

    # 'Maintenance' only shows up on the home page once pass-through IPs are set
    login(app)
    resp = app.get('/')
    assert 'Maintenance' not in resp.text
    settings.MAINTENANCE_PASS_THROUGH_IPS = ['127.0.0.1']
    resp = app.get('/')
    assert 'Maintenance' in resp.text

    # enable everything
    resp = app.get('/maintenance/')
    resp.form.set('maintenance_page', True)
    resp.form.set('maintenance_page_message', 'Foo')
    resp.form.set('maintenance_pass_through_header', 'X-Entrouvert')
    resp.form.set('disable_cron', True)
    resp = resp.form.submit().follow()
    assert Variable.objects.filter(name='SETTING_MAINTENANCE_PAGE').get().value == 'true'
    assert Variable.objects.filter(name='SETTING_MAINTENANCE_PAGE_MESSAGE').get().value == 'Foo'
    assert (
        Variable.objects.filter(name='SETTING_MAINTENANCE_PASS_THROUGH_HEADER').get().value == 'X-Entrouvert'
    )
    assert Variable.objects.filter(name='SETTING_TENANT_DISABLE_CRON_JOBS').get().value == 'true'

    # disable everything
    resp.form.set('maintenance_page', False)
    resp.form.set('maintenance_page_message', '')
    resp.form.set('maintenance_pass_through_header', '')
    resp.form.set('disable_cron', False)
    resp = resp.form.submit().follow()
    assert Variable.objects.filter(name='SETTING_MAINTENANCE_PAGE').get().value == 'false'
    assert Variable.objects.filter(name='SETTING_MAINTENANCE_PAGE_MESSAGE').get().value == ''
    assert Variable.objects.filter(name='SETTING_MAINTENANCE_PASS_THROUGH_HEADER').get().value == ''
    assert Variable.objects.filter(name='SETTING_TENANT_DISABLE_CRON_JOBS').get().value == 'false'

    # try to activate the maintenance page without pass through header
    resp.form.set('maintenance_page', True)
    resp.form.set('maintenance_page_message', '')
    resp.form.set('maintenance_pass_through_header', '')
    resp.form.set('maintenance_pass_through_dnswl', '')
    resp.form.set('disable_cron', False)
    resp = resp.form.submit()  # no redirect expected: the form is shown again
    assert 'No HTTP header pass through or DNSWL is configured' in resp.text
    assert 'Check this box if you are sure to enable the maintenance page.' in resp.text

    # check the confirmation checkbox
    resp.form.set('maintenance_page', True)
    resp.form.set('confirm_maintenance_page', True)
    resp.form.set('maintenance_page_message', '')
    resp.form.set('maintenance_pass_through_header', '')
    resp.form.set('maintenance_pass_through_dnswl', '')
    resp.form.set('disable_cron', False)
    resp = resp.form.submit().follow()
    assert Variable.objects.filter(name='SETTING_MAINTENANCE_PAGE').get().value == 'true'
    assert Variable.objects.filter(name='SETTING_MAINTENANCE_PASS_THROUGH_HEADER').get().value == ''
|
|
|
|
|
|
def test_disable_maintenance_page_command(db):
    """Check that the command leaves the MAINTENANCE_PAGE setting falsy.

    The command is run once while the setting is already falsy and once
    after the setting has been switched on and saved.
    """
    # initial state: maintenance page is off
    variable = get_setting_variable('MAINTENANCE_PAGE')
    assert not variable.json

    # running the command while already off keeps it off
    cmd = Command()
    cmd.handle()
    variable = get_setting_variable('MAINTENANCE_PAGE')
    assert not variable.json

    # switch the page on and persist it
    variable.json = True
    variable.save()
    assert variable.json

    # running the command again switches it back off
    cmd.handle()
    variable = get_setting_variable('MAINTENANCE_PAGE')
    assert not variable.json
|