add default_from_email checks (#24519)

This commit is contained in:
Christophe Siraut 2019-03-29 10:17:59 +01:00
parent 9f715b356a
commit 1b9704742f
6 changed files with 217 additions and 3 deletions

3
debian/control vendored
View File

@ -17,7 +17,8 @@ Depends: ${misc:Depends},
python-apt,
python-memcache,
python-prometheus-client,
python-djangorestframework
python-djangorestframework,
python-dnspython
Recommends: python-django (>= 1.8),
python-gadjo,
python-django-mellon (>= 1.2.22.26),

View File

@ -13,13 +13,22 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django import forms
from django.core.validators import validate_email
from django.utils.translation import ugettext_lazy as _
from hobo.emails.validators import validate_email_address, validate_email_spf
class ValidEmailField(forms.EmailField):
def validate(self, value):
validate_email(value)
validate_email_address(value)
validate_email_spf(value)
class EmailsForm(forms.Form):
default_from_email = forms.EmailField(label=_('Default From'))
default_from_email = ValidEmailField(label=_('Default From'))
email_signature = forms.CharField(label=_('Signature'), required=False,
widget=forms.Textarea)

64
hobo/emails/validators.py Normal file
View File

@ -0,0 +1,64 @@
# hobo - portal to configure and deploy applications
# Copyright (C) 2015-2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import dns.resolver
import smtplib
import socket
from django.conf import settings
from django.core.exceptions import ValidationError
from django.utils.translation import ugettext_lazy as _
def validate_email_address(value):
email_domain = value.split('@')[-1]
try:
mx_server = dns.resolver.query(email_domain, 'MX')[0].exchange.to_text()
except dns.resolver.NXDOMAIN as e:
raise ValidationError(_('Error: %s') % str(e))
except dns.resolver.NoAnswer as e:
raise ValidationError(_('Error: %s') % str(e))
smtp = smtplib.SMTP(timeout=30)
try:
smtp.connect(mx_server)
except socket.error as e:
raise ValidationError(_('Error while connecting to %s: %s') % (mx_server, str(e)))
status, msg = smtp.helo()
if status != 250:
smtp.quit()
raise ValidationError(_('Error while connecting to %s: %s') % (mx_server, msg))
smtp.mail('')
status, msg = smtp.rcpt(value)
if status == 250:
smtp.quit()
return
smtp.quit()
raise ValidationError(_('Email address not found on %s') % mx_server)
def validate_email_spf(value, strict=False):
allowed_records = settings.ALLOWED_SPF_RECORDS
email_domain = value.split('@')[-1]
txt_records = sum([r.strings for r in dns.resolver.query(email_domain, 'TXT')], [])
spf_records = [x for x in txt_records if x.startswith('v=spf1 ')]
if not strict and not spf_records:
return
for spf_record in spf_records:
if '+all' in spf_record:
return
for allowed_record in allowed_records:
if allowed_record in spf_record:
return
raise ValidationError(_('No matching SPF record for %s') % email_domain)

View File

@ -26,6 +26,7 @@ DEBUG = True
ALLOWED_HOSTS = []
ALLOWED_SPF_RECORDS = []
# Application definition

138
tests/test_emails.py Normal file
View File

@ -0,0 +1,138 @@
import pytest
import asyncore
import dns.resolver
from dns import name
from dns.rdtypes.ANY import MX, TXT
import mock
import smtplib
import smtpd
import socket
import threading
from django.core.exceptions import ValidationError
from hobo.emails.validators import validate_email_address
@pytest.fixture
def dns_resolver(monkeypatch):
def fn(value, kind):
if kind == 'MX':
mx = mock.create_autospec(MX)
mx.exchange = mock.create_autospec(name.Name)
mx.exchange.to_text = mock.MagicMock()
mx.exchange.to_text.return_value = 'localhost:10025'
return [mx]
if kind == 'TXT':
txt = mock.create_autospec(TXT)
txt.strings = mock.create_autospec(name.Name)
txt.strings = mock.MagicMock()
if value == 'example-spf.com':
txt.strings = ['v=spf1 include:allowed_mx.com']
elif value == 'example-spf-allow-all.com':
txt.strings = ['v=spf1 +all']
elif value == 'example-invalid-spf.com':
txt.strings = ['v=spf1 include:not_allowed_mx.com']
return [txt]
monkeypatch.setattr(dns.resolver, 'query', fn)
@pytest.fixture
def smtp_server(monkeypatch):
class RecipientValidatingSMTPChannel(smtpd.SMTPChannel):
def smtp_RCPT(self, arg):
address = self._SMTPChannel__getaddr('TO:', arg)
domain = address.split('@')[-1]
if domain in ('example.com', 'example-spf.com', 'example-spf-allow-all.com', 'example-invalid-spf.com'):
self._SMTPChannel__rcpttos.append(address)
self.push('250 Ok')
else:
self.push('550 No such user here')
class MailServer(smtpd.SMTPServer):
def handle_accept(self):
conn, addr = self.accept()
channel = RecipientValidatingSMTPChannel(self, conn, addr)
server = MailServer(('localhost', 10025), None)
thread = threading.Thread(target=asyncore.loop, kwargs={'timeout': 1})
thread.start()
yield
server.close()
thread.join()
def test_validate_email_address_bad_query(monkeypatch):
def no_answer(x, y):
raise(dns.resolver.NoAnswer())
def nx_domain(x, y):
raise(dns.resolver.NXDOMAIN())
monkeypatch.setattr('dns.resolver.query', no_answer)
with pytest.raises(ValidationError) as e:
validate_email_address('foo')
assert 'Error' in str(e.value)
monkeypatch.setattr('dns.resolver.query', nx_domain)
with pytest.raises(ValidationError) as e:
validate_email_address('foo')
assert 'Error' in str(e.value)
def test_validate_email_address_socket_error(dns_resolver, monkeypatch):
def socket_error(x, y):
raise(socket.error())
monkeypatch.setattr('smtplib.SMTP.connect', socket_error)
with pytest.raises(ValidationError) as e:
validate_email_address('john.doe@example.com')
assert 'Error' in str(e.value)
def test_invalid_address(client, admin_user):
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'foobar'})
assert response.status_code == 200
assert 'Enter a valid email address' in response.content
def test_unkown_address(client, admin_user, dns_resolver, smtp_server):
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@unknown.com'})
assert response.status_code == 200
assert 'Email address not found' in response.content
def test_kown_address_nospf(client, admin_user, dns_resolver, smtp_server):
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
assert response.status_code == 200
assert 'Emails settings have been updated. It will take a few seconds to be effective.' in response.content
def test_spf_allow_all_mail(client, admin_user, dns_resolver, smtp_server, settings):
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@example-spf-allow-all.com'}, follow=True)
assert response.status_code == 200
assert 'Emails settings have been updated. It will take a few seconds to be effective.' in response.content
def test_invalid_spf(client, admin_user, dns_resolver, smtp_server, settings):
settings.ALLOWED_SPF_RECORDS = ['include:example.com']
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@example-invalid-spf.com'})
assert response.status_code == 200
assert 'No matching SPF record' in response.content
def test_strict_nospf(client, admin_user, dns_resolver, smtp_server, monkeypatch):
monkeypatch.setattr('hobo.emails.validators.validate_email_spf.__defaults__', (True,))
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
assert response.status_code == 200
assert 'No matching SPF record' in response.content
def test_valid_spf(client, admin_user, dns_resolver, smtp_server, settings):
settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@example-spf.com'}, follow=True)
assert response.status_code == 200
assert 'Emails settings have been updated. It will take a few seconds to be effective.' in response.content

View File

@ -55,6 +55,7 @@ deps:
http://git.entrouvert.org/debian/django-tenant-schemas.git/snapshot/django-tenant-schemas-master.tar.gz
httmock
requests
dnspython
commands =
./getlasso.sh
hobo: py.test {env:COVERAGE:} {env:NOMIGRATIONS:} {posargs:tests/}