maintenance: allow dns passlists (#71526)
This commit is contained in:
parent
f7c6b0a4d3
commit
a38d884c0b
|
@ -29,13 +29,16 @@ class MaintenanceForm(forms.Form):
|
|||
maintenance_pass_trough_header = forms.CharField(
|
||||
required=False, label=_('Maintenance HTTP header pass through')
|
||||
)
|
||||
maintenance_pass_through_dnswl = forms.CharField(
|
||||
required=False, label=_('Maintenance DNSWL pass through')
|
||||
)
|
||||
disable_cron = forms.BooleanField(required=False, label=_('Disable cron jobs'))
|
||||
|
||||
def add_confirmation_checkbox(self):
|
||||
self.add_error(
|
||||
None,
|
||||
_(
|
||||
"No HTTP header pass through is configured, you won't be able to disable the maintenance page."
|
||||
"No HTTP header pass through or DNSWL is configured, you won't be able to disable the maintenance page."
|
||||
),
|
||||
)
|
||||
self.add_error(
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
# hobo - portal to configure and deploy applications
|
||||
# Copyright (C) 2015-2023 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import logging
|
||||
import re
|
||||
|
||||
import dns.exception
|
||||
import dns.resolver
|
||||
from django.core.validators import validate_ipv46_address
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _resolver_resolve(domain):
|
||||
try:
|
||||
method = dns.resolver.resolve
|
||||
except AttributeError:
|
||||
# support for dnspython 2.0.0 on bullseye, prevent deprecation warning on later versions
|
||||
method = dns.resolver.query
|
||||
return method(domain, 'A', lifetime=1)
|
||||
|
||||
|
||||
def is_valid_hostname(hostname):
|
||||
if hostname[-1] == '.':
|
||||
# strip exactly one dot from the right, if present
|
||||
hostname = hostname[:-1]
|
||||
if len(hostname) > 253:
|
||||
return False
|
||||
|
||||
labels = hostname.split('.')
|
||||
|
||||
# the TLD must be not all-numeric
|
||||
if re.match(r'[0-9]+$', labels[-1]):
|
||||
return False
|
||||
|
||||
allowed = re.compile(r'(?!-)[a-z0-9-]{1,63}(?<!-)$', re.IGNORECASE)
|
||||
return all(allowed.match(label) for label in labels)
|
||||
|
||||
|
||||
def check_dnswl(dnswl, remote_addr):
|
||||
domain = '.'.join(reversed(remote_addr.split('.'))) + '.' + dnswl
|
||||
exception = None
|
||||
log = logger.debug
|
||||
try:
|
||||
answers = _resolver_resolve(domain)
|
||||
result = any(answer.address for answer in answers)
|
||||
except dns.resolver.NXDOMAIN as e:
|
||||
exception = e
|
||||
result = False
|
||||
except dns.resolver.NoAnswer as e:
|
||||
exception = e
|
||||
result = False
|
||||
except dns.exception.DNSException as e:
|
||||
exception = e
|
||||
log = logger.warning
|
||||
result = False
|
||||
log('utils: dnswl lookup of "%s", result=%s exception=%s', domain, result, exception)
|
||||
return result
|
||||
|
||||
|
||||
class DNSWL:
|
||||
def __init__(self, domain):
|
||||
if not is_valid_hostname(domain):
|
||||
raise ValueError('%s is not a valid domain name' % domain)
|
||||
self.domain = domain
|
||||
|
||||
def __contains__(self, remote_addr):
|
||||
if not remote_addr or not isinstance(remote_addr, str):
|
||||
return False
|
||||
validate_ipv46_address(remote_addr)
|
||||
return check_dnswl(self.domain, remote_addr)
|
||||
|
||||
|
||||
def dnswl(domain):
|
||||
return DNSWL(domain)
|
|
@ -41,6 +41,10 @@ class HomeView(FormView):
|
|||
def maintenance_pass_trough_header_variable(self):
|
||||
return get_setting_variable('MAINTENANCE_PASS_THROUGH_HEADER')
|
||||
|
||||
@cached_property
|
||||
def maintenance_pass_through_dnswl_variable(self):
|
||||
return get_setting_variable('MAINTENANCE_PASS_THROUGH_DNSWL')
|
||||
|
||||
@cached_property
|
||||
def tenant_disable_cron_jobs_variable(self):
|
||||
return get_setting_variable('TENANT_DISABLE_CRON_JOBS')
|
||||
|
@ -50,6 +54,7 @@ class HomeView(FormView):
|
|||
initial['maintenance_page'] = bool(self.maintenance_page_variable.json)
|
||||
initial['maintenance_page_message'] = self.maintenance_page_message_variable.value
|
||||
initial['maintenance_pass_trough_header'] = self.maintenance_pass_trough_header_variable.value
|
||||
initial['maintenance_pass_through_dnswl'] = self.maintenance_pass_through_dnswl_variable.value
|
||||
initial['disable_cron'] = bool(self.tenant_disable_cron_jobs_variable.json)
|
||||
return initial
|
||||
|
||||
|
@ -57,6 +62,7 @@ class HomeView(FormView):
|
|||
if (
|
||||
form.cleaned_data['maintenance_page']
|
||||
and not form.cleaned_data['maintenance_pass_trough_header']
|
||||
and not form.cleaned_data['maintenance_pass_through_dnswl']
|
||||
and not form.cleaned_data['confirm_maintenance_page']
|
||||
):
|
||||
form.add_confirmation_checkbox()
|
||||
|
@ -71,6 +77,10 @@ class HomeView(FormView):
|
|||
'maintenance_pass_trough_header'
|
||||
]
|
||||
self.maintenance_pass_trough_header_variable.save()
|
||||
self.maintenance_pass_through_dnswl_variable.value = form.cleaned_data[
|
||||
'maintenance_pass_through_dnswl'
|
||||
]
|
||||
self.maintenance_pass_through_dnswl_variable.save()
|
||||
self.tenant_disable_cron_jobs_variable.json = form.cleaned_data['disable_cron']
|
||||
self.tenant_disable_cron_jobs_variable.save()
|
||||
return super().form_valid(form)
|
||||
|
|
|
@ -20,6 +20,8 @@ from django.conf import settings
|
|||
from django.template.response import TemplateResponse
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
from hobo.maintenance.utils import dnswl
|
||||
|
||||
|
||||
def pass_through(request):
|
||||
remote_addr = request.META.get('REMOTE_ADDR')
|
||||
|
@ -34,6 +36,9 @@ def pass_through(request):
|
|||
return True
|
||||
except ValueError: # bad remote_addr or network syntax
|
||||
pass
|
||||
pass_through_dnswl = getattr(settings, 'MAINTENANCE_PASS_THROUGH_DNSWL', '')
|
||||
if pass_through_dnswl and remote_addr in dnswl(pass_through_dnswl):
|
||||
return True
|
||||
pass_through_header = getattr(settings, 'MAINTENANCE_PASS_THROUGH_HEADER', '')
|
||||
if pass_through_header and pass_through_header in request.headers:
|
||||
return True
|
||||
|
|
|
@ -36,7 +36,10 @@ class Home(TemplateView):
|
|||
context['has_authentic'] = bool(Authentic.objects.filter(secondary=False))
|
||||
context['has_global_title'] = Variable.objects.filter(name='global_title').exists()
|
||||
context['has_default_from_email'] = Variable.objects.filter(name='default_from_email').exists()
|
||||
context['show_maintenance_menu'] = bool(getattr(settings, 'MAINTENANCE_PASS_THROUGH_IPS', []))
|
||||
context['show_maintenance_menu'] = bool(
|
||||
getattr(settings, 'MAINTENANCE_PASS_THROUGH_IPS', [])
|
||||
or getattr(settings, 'MAINTENANCE_PASS_THROUGH_DNSWL', [])
|
||||
)
|
||||
return context
|
||||
|
||||
|
||||
|
|
|
@ -1,8 +1,13 @@
|
|||
from unittest import mock
|
||||
|
||||
from dns.exception import DNSException
|
||||
from dns.resolver import NXDOMAIN, NoAnswer
|
||||
from test_manager import login
|
||||
|
||||
from hobo.environment.models import Variable
|
||||
from hobo.environment.utils import get_setting_variable
|
||||
from hobo.maintenance.management.commands.disable_maintenance_page import Command
|
||||
from hobo.maintenance.utils import check_dnswl
|
||||
|
||||
|
||||
def test_maintenance_middleware(app, admin_user, db, monkeypatch, settings):
|
||||
|
@ -43,6 +48,77 @@ def test_maintenance_middleware(app, admin_user, db, monkeypatch, settings):
|
|||
resp = app.get('/', headers={'X-Entrouvert': 'yes'})
|
||||
assert resp.status_code == 200
|
||||
|
||||
settings.MAINTENANCE_PASS_THROUGH_HEADER = ''
|
||||
settings.MAINTENANCE_PASS_THROUGH_DNSWL = 'dnswl.example.com'
|
||||
with mock.patch('hobo.maintenance.utils.check_dnswl', return_value=True):
|
||||
resp = app.get('/')
|
||||
assert resp.status_code == 200
|
||||
with mock.patch('hobo.maintenance.utils.check_dnswl', return_value=False):
|
||||
resp = app.get('/', status=503)
|
||||
assert 'foobar' in resp.text
|
||||
|
||||
|
||||
def test_check_dnswl():
|
||||
# existing dnswl answers, ipv4
|
||||
with mock.patch('hobo.maintenance.utils._resolver_resolve') as mock_resolve:
|
||||
result = mock.Mock()
|
||||
result.address = '1.2.3.4'
|
||||
result2 = mock.Mock()
|
||||
result2.address = '4.5.6.7'
|
||||
mock_resolve.return_value = [result, result2]
|
||||
assert check_dnswl('dnswl.example.com', '127.0.0.1')
|
||||
|
||||
# no known dnswl, ipv4
|
||||
with mock.patch('hobo.maintenance.utils._resolver_resolve') as mock_resolve:
|
||||
mock_resolve.return_value = []
|
||||
assert not check_dnswl('dnswl.example.com', '127.0.0.1')
|
||||
|
||||
# existing dnswl answers, ipv6
|
||||
with mock.patch('hobo.maintenance.utils._resolver_resolve') as mock_resolve:
|
||||
result = mock.Mock()
|
||||
result.address = '2001:db8:3333:4444:CCCC:DDDD:EEEE:FFFF'
|
||||
result2 = mock.Mock()
|
||||
result2.address = '2001:db8:3333:4444:CCCC:DDDD:FFFF:FFFF'
|
||||
mock_resolve.return_value = [result, result2]
|
||||
assert check_dnswl('dnswl.example.com', '::1')
|
||||
|
||||
# no known dnswl, ipv6
|
||||
with mock.patch('hobo.maintenance.utils._resolver_resolve') as mock_resolve:
|
||||
mock_resolve.return_value = []
|
||||
assert not check_dnswl('dnswl.example.com', '::1')
|
||||
|
||||
# misc exceptions
|
||||
with mock.patch(
|
||||
'hobo.maintenance.utils._resolver_resolve', side_effect=NXDOMAIN('DNS query name does not exist')
|
||||
) as mock_resolve:
|
||||
result = mock.Mock()
|
||||
result.address = '1.2.3.4'
|
||||
result2 = mock.Mock()
|
||||
result2.address = '4.5.6.7'
|
||||
mock_resolve.return_value = [result, result2]
|
||||
assert not check_dnswl('dnswl.example.com', '127.0.0.1')
|
||||
|
||||
with mock.patch(
|
||||
'hobo.maintenance.utils._resolver_resolve',
|
||||
side_effect=NoAnswer('DNS response does not contain the answer'),
|
||||
) as mock_resolve:
|
||||
result = mock.Mock()
|
||||
result.address = '1.2.3.4'
|
||||
result2 = mock.Mock()
|
||||
result2.address = '4.5.6.7'
|
||||
mock_resolve.return_value = [result, result2]
|
||||
assert not check_dnswl('dnswl.example.com', '127.0.0.1')
|
||||
|
||||
with mock.patch(
|
||||
'hobo.maintenance.utils._resolver_resolve', side_effect=DNSException('Error while retrieving DNSWL')
|
||||
) as mock_resolve:
|
||||
result = mock.Mock()
|
||||
result.address = '1.2.3.4'
|
||||
result2 = mock.Mock()
|
||||
result2.address = '4.5.6.7'
|
||||
mock_resolve.return_value = [result, result2]
|
||||
assert not check_dnswl('dnswl.example.com', '127.0.0.1')
|
||||
|
||||
|
||||
def test_manage(app, admin_user, settings):
|
||||
assert Variable.objects.filter(name='SETTING_MAINTENANCE_PAGE').count() == 0
|
||||
|
@ -85,9 +161,10 @@ def test_manage(app, admin_user, settings):
|
|||
resp.form.set('maintenance_page', True)
|
||||
resp.form.set('maintenance_page_message', '')
|
||||
resp.form.set('maintenance_pass_trough_header', '')
|
||||
resp.form.set('maintenance_pass_through_dnswl', '')
|
||||
resp.form.set('disable_cron', False)
|
||||
resp = resp.form.submit()
|
||||
assert 'No HTTP header pass through is configured' in resp.text
|
||||
assert 'No HTTP header pass through or DNSWL is configured' in resp.text
|
||||
assert 'Check this box if you are sure to enable the maintenance page.' in resp.text
|
||||
|
||||
# check the confirmation checkbox
|
||||
|
@ -95,6 +172,7 @@ def test_manage(app, admin_user, settings):
|
|||
resp.form.set('confirm_maintenance_page', True)
|
||||
resp.form.set('maintenance_page_message', '')
|
||||
resp.form.set('maintenance_pass_trough_header', '')
|
||||
resp.form.set('maintenance_pass_through_dnswl', '')
|
||||
resp.form.set('disable_cron', False)
|
||||
resp = resp.form.submit().follow()
|
||||
assert Variable.objects.filter(name='SETTING_MAINTENANCE_PAGE').get().value == 'true'
|
||||
|
|
Loading…
Reference in New Issue