From 64d973c20e5b53ed9051a1113df976c550fc5346 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20NO=C3=8BL?= Date: Mon, 12 Dec 2022 10:54:57 +0100 Subject: [PATCH] emails: restrict domains for default_from_email (#72173) --- hobo/emails/forms.py | 3 ++- hobo/emails/validators.py | 15 +++++++++++ hobo/settings.py | 4 +++ tests/test_emails.py | 53 ++++++++++++++++++++++++++++++++++++--- 4 files changed, 71 insertions(+), 4 deletions(-) diff --git a/hobo/emails/forms.py b/hobo/emails/forms.py index 4c838a4..6884a91 100644 --- a/hobo/emails/forms.py +++ b/hobo/emails/forms.py @@ -17,12 +17,13 @@ from django import forms from django.core.validators import validate_email from django.utils.translation import ugettext_lazy as _ -from hobo.emails.validators import validate_email_address, validate_email_spf +from hobo.emails.validators import validate_email_address, validate_email_spf, validate_email_domain class ValidEmailField(forms.EmailField): def validate(self, value): validate_email(value) + validate_email_domain(value) validate_email_address(value) validate_email_spf(value) diff --git a/hobo/emails/validators.py b/hobo/emails/validators.py index d2ec203..eb66893 100644 --- a/hobo/emails/validators.py +++ b/hobo/emails/validators.py @@ -16,6 +16,7 @@ import smtplib import socket +import urllib.parse import dns.resolver from django.conf import settings @@ -23,6 +24,8 @@ from django.core.exceptions import ValidationError from django.utils.encoding import force_bytes from django.utils.translation import ugettext_lazy as _ +from hobo.environment.utils import get_operational_services + def validate_email_address(value): if not settings.HOBO_VALIDATE_EMAIL_WITH_SMTP: @@ -75,3 +78,15 @@ def validate_email_spf(value, strict=False): if allowed_record in spf_record: return raise ValidationError(_('No suitable SPF record found for %s') % email_domain) + + +def validate_email_domain(value): + domain = value.split('@')[-1].strip().strip('.') + domains = settings.EMAIL_FROM_ALLOWED_DOMAINS + if '*' in domains or domain in domains: + return + for service in get_operational_services(): + fqdn = urllib.parse.urlparse(service.base_url).netloc.split(':')[0] + if fqdn == domain or fqdn == 'www.' + domain: + return + raise ValidationError(_('Domain %s is not allowed') % domain) diff --git a/hobo/settings.py b/hobo/settings.py index 2335ad2..a6875be 100644 --- a/hobo/settings.py +++ b/hobo/settings.py @@ -29,6 +29,10 @@ ALLOWED_HOSTS = [] HOBO_VALIDATE_EMAIL_WITH_SMTP = True ALLOWED_SPF_RECORDS = [] +# EMAIL_FROM_ALLOWED_DOMAINS: list of allowed domains for default_from_email. +# Use ['*'] to allow all domains. +# Note: all get_operational_services() url domains are always allowed +EMAIL_FROM_ALLOWED_DOMAINS = [] # Application definition diff --git a/tests/test_emails.py b/tests/test_emails.py index 779641b..71cd7aa 100644 --- a/tests/test_emails.py +++ b/tests/test_emails.py @@ -15,7 +15,7 @@ from dns.rdtypes.ANY import MX, TXT from test_manager import login from hobo.emails.validators import validate_email_address -from hobo.environment.models import Variable +from hobo.environment.models import Variable, ServiceBase, Combo, Wcs from hobo.test_utils import find_free_port @@ -132,14 +132,16 @@ def test_invalid_address(client, admin_user): assert 'Enter a valid email address' in force_text(response.content) -def test_unkown_address(client, admin_user, dns_resolver, smtp_server): +def test_unkown_address(client, admin_user, dns_resolver, smtp_server, settings): + settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*'] client.post('/login/', {'username': 'admin', 'password': 'password'}) response = client.post('/emails/', {'default_from_email': 'john.doe@unknown.com'}) assert response.status_code == 200 assert 'Email address not found' in force_text(response.content) -def test_kown_address_nospf(client, admin_user, dns_resolver, smtp_server): +def test_kown_address_nospf(client, admin_user, dns_resolver, smtp_server, settings): + settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*'] client.post('/login/', {'username': 'admin', 'password': 'password'}) response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True) assert response.status_code == 200 @@ -149,6 +151,7 @@ def test_kown_address_nospf(client, admin_user, dns_resolver, smtp_server): def test_spf_allow_all_mail(client, admin_user, dns_resolver, smtp_server, settings): + settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*'] client.post('/login/', {'username': 'admin', 'password': 'password'}) response = client.post( '/emails/', {'default_from_email': 'john.doe@example-spf-allow-all.com'}, follow=True @@ -160,6 +163,7 @@ def test_spf_allow_all_mail(client, admin_user, dns_resolver, smtp_server, setti def test_invalid_spf(client, admin_user, dns_resolver, smtp_server, settings): + settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*'] settings.ALLOWED_SPF_RECORDS = ['include:example.com'] client.post('/login/', {'username': 'admin', 'password': 'password'}) response = client.post('/emails/', {'default_from_email': 'john.doe@example-invalid-spf.com'}) @@ -168,6 +172,7 @@ def test_invalid_spf(client, admin_user, dns_resolver, smtp_server, settings): def test_strict_nospf(client, admin_user, dns_resolver, smtp_server, monkeypatch, settings): + settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*'] settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com'] monkeypatch.setattr('hobo.emails.validators.validate_email_spf.__defaults__', (True,)) client.post('/login/', {'username': 'admin', 'password': 'password'}) @@ -177,6 +182,7 @@ def test_strict_nospf(client, admin_user, dns_resolver, smtp_server, monkeypatch def test_valid_spf(client, admin_user, dns_resolver, smtp_server, settings): + settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*'] settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com'] client.post('/login/', {'username': 'admin', 'password': 'password'}) response = client.post('/emails/', {'default_from_email': 'john.doe@example-spf.com'}, follow=True) @@ -187,6 +193,7 @@ def test_valid_spf(client, admin_user, dns_resolver, smtp_server, settings): def test_no_spf_validation(client, admin_user, dns_resolver, smtp_server, settings): + settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*'] settings.ALLOWED_SPF_RECORDS = [] client.post('/login/', {'username': 'admin', 'password': 'password'}) response = client.post( @@ -198,7 +205,47 @@ def test_no_spf_validation(client, admin_user, dns_resolver, smtp_server, settin ) +def test_sender_allowed_domains(client, admin_user, dns_resolver, smtp_server, settings, monkeypatch): + settings.HOBO_VALIDATE_EMAIL_WITH_SMTP = False + + client.post('/login/', {'username': 'admin', 'password': 'password'}) + response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True) + assert response.status_code == 200 + assert 'Domain example.com is not allowed' in force_text(response.content) + assert 'Emails settings have been updated.' not in force_text(response.content) + + settings.EMAIL_FROM_ALLOWED_DOMAINS = ['example.com', 'foo.bar'] + response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True) + assert response.status_code == 200 + assert 'Emails settings have been updated.' in force_text(response.content) + assert 'Domain example.com is not allowed' not in force_text(response.content) + + settings.EMAIL_FROM_ALLOWED_DOMAINS = [] + combo = Combo(base_url='https://example.org/test') + combo.save() + monkeypatch.setattr(ServiceBase, 'is_operational', lambda x: True) + + response = client.post('/emails/', {'default_from_email': 'john.doe@example.org'}, follow=True) + assert response.status_code == 200 + assert 'Emails settings have been updated.' in force_text(response.content) + + response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True) + assert response.status_code == 200 + assert 'Domain example.com is not allowed' in force_text(response.content) + + response = client.post('/emails/', {'default_from_email': 'john.doe@brother.example.org'}, follow=True) + assert response.status_code == 200 + assert 'Domain brother.example.org is not allowed' in force_text(response.content) + + combo.base_url = 'https://www.example.com' + combo.save() + response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True) + assert response.status_code == 200 + assert 'Emails settings have been updated.' in force_text(response.content) + + def test_emails_view(app, admin_user, dns_resolver, smtp_server, settings): + settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*'] settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com'] app = login(app) resp = app.get('/emails/')