emails: restrict domains for default_from_email (#72173)
gitea-wip/hobo/pipeline/pr-main Something is wrong with the build of this commit Details
gitea/hobo/pipeline/pr-main Something is wrong with the build of this commit Details
gitea/hobo/pipeline/head Something is wrong with the build of this commit Details

This commit is contained in:
Thomas NOËL 2022-12-12 10:54:57 +01:00
parent 13f0821c66
commit 64d973c20e
4 changed files with 71 additions and 4 deletions

View File

@ -17,12 +17,13 @@ from django import forms
from django.core.validators import validate_email
from django.utils.translation import ugettext_lazy as _
from hobo.emails.validators import validate_email_address, validate_email_spf
from hobo.emails.validators import validate_email_address, validate_email_spf, validate_email_domain
class ValidEmailField(forms.EmailField):
def validate(self, value):
validate_email(value)
validate_email_domain(value)
validate_email_address(value)
validate_email_spf(value)

View File

@ -16,6 +16,7 @@
import smtplib
import socket
import urllib.parse
import dns.resolver
from django.conf import settings
@ -23,6 +24,8 @@ from django.core.exceptions import ValidationError
from django.utils.encoding import force_bytes
from django.utils.translation import ugettext_lazy as _
from hobo.environment.utils import get_operational_services
def validate_email_address(value):
if not settings.HOBO_VALIDATE_EMAIL_WITH_SMTP:
@ -75,3 +78,15 @@ def validate_email_spf(value, strict=False):
if allowed_record in spf_record:
return
raise ValidationError(_('No suitable SPF record found for %s') % email_domain)
def validate_email_domain(value):
domain = value.split('@')[-1].strip().strip('.')
domains = settings.EMAIL_FROM_ALLOWED_DOMAINS
if '*' in domains or domain in domains:
return
for service in get_operational_services():
fqdn = urllib.parse.urlparse(service.base_url).netloc.split(':')[0]
if fqdn == domain or fqdn == 'www.' + domain:
return
raise ValidationError(_('Domain %s is not allowed') % domain)

View File

@ -29,6 +29,10 @@ ALLOWED_HOSTS = []
HOBO_VALIDATE_EMAIL_WITH_SMTP = True
ALLOWED_SPF_RECORDS = []
# EMAIL_FROM_ALLOWED_DOMAINS: list of allowed domains for default_from_email.
# Use ['*'] to allow all domains.
# Note: all get_operational_services() url domains are always allowed
EMAIL_FROM_ALLOWED_DOMAINS = []
# Application definition

View File

@ -15,7 +15,7 @@ from dns.rdtypes.ANY import MX, TXT
from test_manager import login
from hobo.emails.validators import validate_email_address
from hobo.environment.models import Variable
from hobo.environment.models import Variable, ServiceBase, Combo, Wcs
from hobo.test_utils import find_free_port
@ -132,14 +132,16 @@ def test_invalid_address(client, admin_user):
assert 'Enter a valid email address' in force_text(response.content)
def test_unkown_address(client, admin_user, dns_resolver, smtp_server):
def test_unkown_address(client, admin_user, dns_resolver, smtp_server, settings):
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@unknown.com'})
assert response.status_code == 200
assert 'Email address not found' in force_text(response.content)
def test_kown_address_nospf(client, admin_user, dns_resolver, smtp_server):
def test_kown_address_nospf(client, admin_user, dns_resolver, smtp_server, settings):
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
assert response.status_code == 200
@ -149,6 +151,7 @@ def test_kown_address_nospf(client, admin_user, dns_resolver, smtp_server):
def test_spf_allow_all_mail(client, admin_user, dns_resolver, smtp_server, settings):
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post(
'/emails/', {'default_from_email': 'john.doe@example-spf-allow-all.com'}, follow=True
@ -160,6 +163,7 @@ def test_spf_allow_all_mail(client, admin_user, dns_resolver, smtp_server, setti
def test_invalid_spf(client, admin_user, dns_resolver, smtp_server, settings):
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
settings.ALLOWED_SPF_RECORDS = ['include:example.com']
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@example-invalid-spf.com'})
@ -168,6 +172,7 @@ def test_invalid_spf(client, admin_user, dns_resolver, smtp_server, settings):
def test_strict_nospf(client, admin_user, dns_resolver, smtp_server, monkeypatch, settings):
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']
monkeypatch.setattr('hobo.emails.validators.validate_email_spf.__defaults__', (True,))
client.post('/login/', {'username': 'admin', 'password': 'password'})
@ -177,6 +182,7 @@ def test_strict_nospf(client, admin_user, dns_resolver, smtp_server, monkeypatch
def test_valid_spf(client, admin_user, dns_resolver, smtp_server, settings):
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@example-spf.com'}, follow=True)
@ -187,6 +193,7 @@ def test_valid_spf(client, admin_user, dns_resolver, smtp_server, settings):
def test_no_spf_validation(client, admin_user, dns_resolver, smtp_server, settings):
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
settings.ALLOWED_SPF_RECORDS = []
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post(
@ -198,7 +205,47 @@ def test_no_spf_validation(client, admin_user, dns_resolver, smtp_server, settin
)
def test_sender_allowed_domains(client, admin_user, dns_resolver, smtp_server, settings, monkeypatch):
settings.HOBO_VALIDATE_EMAIL_WITH_SMTP = False
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
assert response.status_code == 200
assert 'Domain example.com is not allowed' in force_text(response.content)
assert 'Emails settings have been updated.' not in force_text(response.content)
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['example.com', 'foo.bar']
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
assert response.status_code == 200
assert 'Emails settings have been updated.' in force_text(response.content)
assert 'Domain example.com is not allowed' not in force_text(response.content)
settings.EMAIL_FROM_ALLOWED_DOMAINS = []
combo = Combo(base_url='https://example.org/test')
combo.save()
monkeypatch.setattr(ServiceBase, 'is_operational', lambda x: True)
response = client.post('/emails/', {'default_from_email': 'john.doe@example.org'}, follow=True)
assert response.status_code == 200
assert 'Emails settings have been updated.' in force_text(response.content)
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
assert response.status_code == 200
assert 'Domain example.com is not allowed' in force_text(response.content)
response = client.post('/emails/', {'default_from_email': 'john.doe@brother.example.org'}, follow=True)
assert response.status_code == 200
assert 'Domain brother.example.org is not allowed' in force_text(response.content)
combo.base_url = 'https://www.example.com'
combo.save()
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
assert response.status_code == 200
assert 'Emails settings have been updated.' in force_text(response.content)
def test_emails_view(app, admin_user, dns_resolver, smtp_server, settings):
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']
app = login(app)
resp = app.get('/emails/')