maintenance: allow dns passlists (#71526)

This commit is contained in:
Paul Marillonnet 2023-08-22 15:28:59 +02:00
parent 7c77f4d569
commit 30503058cc
6 changed files with 190 additions and 3 deletions

View File

@ -29,13 +29,16 @@ class MaintenanceForm(forms.Form):
maintenance_pass_trough_header = forms.CharField(
required=False, label=_('Maintenance HTTP header pass through')
)
maintenance_pass_through_dnswl = forms.CharField(
required=False, label=_('Maintenance DNSWL pass through')
)
disable_cron = forms.BooleanField(required=False, label=_('Disable cron jobs'))
def add_confirmation_checkbox(self):
self.add_error(
None,
_(
"No HTTP header pass through is configured, you won't be able to disable the maintenance page."
"No HTTP header pass through or DNSWL is configured, you won't be able to disable the maintenance page."
),
)
self.add_error(

88
hobo/maintenance/utils.py Normal file
View File

@ -0,0 +1,88 @@
# hobo - portal to configure and deploy applications
# Copyright (C) 2015-2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import re
import dns.exception
import dns.resolver
from django.core.validators import validate_ipv46_address
logger = logging.getLogger(__name__)
def _resolver_resolve(domain):
try:
method = dns.resolver.resolve
except AttributeError:
# support for dnspython 2.0.0 on bullseye, prevent deprecation warning on later versions
method = dns.resolver.query
return method(domain, 'A', lifetime=1)
def is_valid_hostname(hostname):
if hostname[-1] == '.':
# strip exactly one dot from the right, if present
hostname = hostname[:-1]
if len(hostname) > 253:
return False
labels = hostname.split('.')
# the TLD must be not all-numeric
if re.match(r'[0-9]+$', labels[-1]):
return False
allowed = re.compile(r'(?!-)[a-z0-9-]{1,63}(?<!-)$', re.IGNORECASE)
return all(allowed.match(label) for label in labels)
def check_dnswl(dnswl, remote_addr):
domain = '.'.join(reversed(remote_addr.split('.'))) + '.' + dnswl
exception = None
log = logger.debug
try:
answers = _resolver_resolve(domain)
result = any(answer.address for answer in answers)
except dns.resolver.NXDOMAIN as e:
exception = e
result = False
except dns.resolver.NoAnswer as e:
exception = e
result = False
except dns.exception.DNSException as e:
exception = e
log = logger.warning
result = False
log('utils: dnswl lookup of "%s", result=%s exception=%s', domain, result, exception)
return result
class DNSWL:
def __init__(self, domain):
if not is_valid_hostname(domain):
raise ValueError('%s is not a valid domain name' % domain)
self.domain = domain
def __contains__(self, remote_addr):
if not remote_addr or not isinstance(remote_addr, str):
return False
validate_ipv46_address(remote_addr)
return check_dnswl(self.domain, remote_addr)
def dnswl(domain):
return DNSWL(domain)

View File

@ -41,6 +41,10 @@ class HomeView(FormView):
def maintenance_pass_trough_header_variable(self):
return get_setting_variable('MAINTENANCE_PASS_THROUGH_HEADER')
@cached_property
def maintenance_pass_through_dnswl_variable(self):
return get_setting_variable('MAINTENANCE_PASS_THROUGH_DNSWL')
@cached_property
def tenant_disable_cron_jobs_variable(self):
return get_setting_variable('TENANT_DISABLE_CRON_JOBS')
@ -50,6 +54,7 @@ class HomeView(FormView):
initial['maintenance_page'] = bool(self.maintenance_page_variable.json)
initial['maintenance_page_message'] = self.maintenance_page_message_variable.value
initial['maintenance_pass_trough_header'] = self.maintenance_pass_trough_header_variable.value
initial['maintenance_pass_through_dnswl'] = self.maintenance_pass_through_dnswl_variable.value
initial['disable_cron'] = bool(self.tenant_disable_cron_jobs_variable.json)
return initial
@ -57,6 +62,7 @@ class HomeView(FormView):
if (
form.cleaned_data['maintenance_page']
and not form.cleaned_data['maintenance_pass_trough_header']
and not form.cleaned_data['maintenance_pass_through_dnswl']
and not form.cleaned_data['confirm_maintenance_page']
):
form.add_confirmation_checkbox()
@ -71,6 +77,10 @@ class HomeView(FormView):
'maintenance_pass_trough_header'
]
self.maintenance_pass_trough_header_variable.save()
self.maintenance_pass_through_dnswl_variable.value = form.cleaned_data[
'maintenance_pass_through_dnswl'
]
self.maintenance_pass_through_dnswl_variable.save()
self.tenant_disable_cron_jobs_variable.json = form.cleaned_data['disable_cron']
self.tenant_disable_cron_jobs_variable.save()
return super().form_valid(form)

View File

@ -20,6 +20,8 @@ from django.conf import settings
from django.template.response import TemplateResponse
from django.utils.translation import gettext as _
from hobo.maintenance.utils import dnswl
def pass_through(request):
remote_addr = request.META.get('REMOTE_ADDR')
@ -34,6 +36,9 @@ def pass_through(request):
return True
except ValueError: # bad remote_addr or network syntax
pass
pass_through_dnswl = getattr(settings, 'MAINTENANCE_PASS_THROUGH_DNSWL', '')
if pass_through_dnswl and remote_addr in dnswl(pass_through_dnswl):
return True
pass_through_header = getattr(settings, 'MAINTENANCE_PASS_THROUGH_HEADER', '')
if pass_through_header and pass_through_header in request.headers:
return True

View File

@ -36,7 +36,10 @@ class Home(TemplateView):
context['has_authentic'] = bool(Authentic.objects.filter(secondary=False))
context['has_global_title'] = Variable.objects.filter(name='global_title').exists()
context['has_default_from_email'] = Variable.objects.filter(name='default_from_email').exists()
context['show_maintenance_menu'] = bool(getattr(settings, 'MAINTENANCE_PASS_THROUGH_IPS', []))
context['show_maintenance_menu'] = bool(
getattr(settings, 'MAINTENANCE_PASS_THROUGH_IPS', [])
or getattr(settings, 'MAINTENANCE_PASS_THROUGH_DNSWL', [])
)
return context

View File

@ -1,8 +1,13 @@
from unittest import mock
from dns.exception import DNSException
from dns.resolver import NXDOMAIN, NoAnswer
from test_manager import login
from hobo.environment.models import Variable
from hobo.environment.utils import get_setting_variable
from hobo.maintenance.management.commands.disable_maintenance_page import Command
from hobo.maintenance.utils import check_dnswl
def test_maintenance_middleware(app, admin_user, db, monkeypatch, settings):
@ -43,6 +48,77 @@ def test_maintenance_middleware(app, admin_user, db, monkeypatch, settings):
resp = app.get('/', headers={'X-Entrouvert': 'yes'})
assert resp.status_code == 200
settings.MAINTENANCE_PASS_THROUGH_HEADER = ''
settings.MAINTENANCE_PASS_THROUGH_DNSWL = 'dnswl.example.com'
with mock.patch('hobo.maintenance.utils.check_dnswl', return_value=True):
resp = app.get('/')
assert resp.status_code == 200
with mock.patch('hobo.maintenance.utils.check_dnswl', return_value=False):
resp = app.get('/', status=503)
assert 'foobar' in resp.text
def test_check_dnswl():
# existing dnswl answers, ipv4
with mock.patch('hobo.maintenance.utils._resolver_resolve') as mock_resolve:
result = mock.Mock()
result.address = '1.2.3.4'
result2 = mock.Mock()
result2.address = '4.5.6.7'
mock_resolve.return_value = [result, result2]
assert check_dnswl('dnswl.example.com', '127.0.0.1')
# no known dnswl, ipv4
with mock.patch('hobo.maintenance.utils._resolver_resolve') as mock_resolve:
mock_resolve.return_value = []
assert not check_dnswl('dnswl.example.com', '127.0.0.1')
# existing dnswl answers, ipv6
with mock.patch('hobo.maintenance.utils._resolver_resolve') as mock_resolve:
result = mock.Mock()
result.address = '2001:db8:3333:4444:CCCC:DDDD:EEEE:FFFF'
result2 = mock.Mock()
result2.address = '2001:db8:3333:4444:CCCC:DDDD:FFFF:FFFF'
mock_resolve.return_value = [result, result2]
assert check_dnswl('dnswl.example.com', '::1')
# no known dnswl, ipv6
with mock.patch('hobo.maintenance.utils._resolver_resolve') as mock_resolve:
mock_resolve.return_value = []
assert not check_dnswl('dnswl.example.com', '::1')
# misc exceptions
with mock.patch(
'hobo.maintenance.utils._resolver_resolve', side_effect=NXDOMAIN('DNS query name does not exist')
) as mock_resolve:
result = mock.Mock()
result.address = '1.2.3.4'
result2 = mock.Mock()
result2.address = '4.5.6.7'
mock_resolve.return_value = [result, result2]
assert not check_dnswl('dnswl.example.com', '127.0.0.1')
with mock.patch(
'hobo.maintenance.utils._resolver_resolve',
side_effect=NoAnswer('DNS response does not contain the answer'),
) as mock_resolve:
result = mock.Mock()
result.address = '1.2.3.4'
result2 = mock.Mock()
result2.address = '4.5.6.7'
mock_resolve.return_value = [result, result2]
assert not check_dnswl('dnswl.example.com', '127.0.0.1')
with mock.patch(
'hobo.maintenance.utils._resolver_resolve', side_effect=DNSException('Error while retrieving DNSWL')
) as mock_resolve:
result = mock.Mock()
result.address = '1.2.3.4'
result2 = mock.Mock()
result2.address = '4.5.6.7'
mock_resolve.return_value = [result, result2]
assert not check_dnswl('dnswl.example.com', '127.0.0.1')
def test_manage(app, admin_user, settings):
assert Variable.objects.filter(name='SETTING_MAINTENANCE_PAGE').count() == 0
@ -85,9 +161,10 @@ def test_manage(app, admin_user, settings):
resp.form.set('maintenance_page', True)
resp.form.set('maintenance_page_message', '')
resp.form.set('maintenance_pass_trough_header', '')
resp.form.set('maintenance_pass_through_dnswl', '')
resp.form.set('disable_cron', False)
resp = resp.form.submit()
assert 'No HTTP header pass through is configured' in resp.text
assert 'No HTTP header pass through or DNSWL is configured' in resp.text
assert 'Check this box if you are sure to enable the maintenance page.' in resp.text
# check the confirmation checkbox
@ -95,6 +172,7 @@ def test_manage(app, admin_user, settings):
resp.form.set('confirm_maintenance_page', True)
resp.form.set('maintenance_page_message', '')
resp.form.set('maintenance_pass_trough_header', '')
resp.form.set('maintenance_pass_through_dnswl', '')
resp.form.set('disable_cron', False)
resp = resp.form.submit().follow()
assert Variable.objects.filter(name='SETTING_MAINTENANCE_PAGE').get().value == 'true'