219 lines
8.6 KiB
Python
219 lines
8.6 KiB
Python
import asyncore
|
|
import random
|
|
import smtpd
|
|
import smtplib
|
|
import socket
|
|
import threading
|
|
from unittest import mock
|
|
|
|
import dns.resolver
|
|
import pytest
|
|
from django.core.exceptions import ValidationError
|
|
from django.utils.encoding import force_text
|
|
from dns import name
|
|
from dns.rdtypes.ANY import MX, TXT
|
|
from test_manager import login
|
|
|
|
from hobo.emails.validators import validate_email_address
|
|
from hobo.environment.models import Variable
|
|
|
|
|
|
@pytest.fixture
|
|
def port_available():
|
|
errno = 0
|
|
while not errno:
|
|
port = random.randint(49152, 65534)
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
errno = sock.connect_ex(('127.0.0.1', port))
|
|
sock.close()
|
|
return port
|
|
|
|
|
|
@pytest.fixture
|
|
def dns_resolver(monkeypatch, port_available):
|
|
def fn(value, kind):
|
|
if kind == 'MX':
|
|
mx = mock.create_autospec(MX)
|
|
mx.exchange = mock.create_autospec(name.Name)
|
|
mx.exchange.to_text = mock.MagicMock()
|
|
mx.exchange.to_text.return_value = 'localhost:%s' % port_available
|
|
mx.preference = mock.create_autospec(name.Name)
|
|
mx.preference.to_text = mock.MagicMock()
|
|
mx.preference.to_text.return_value = 10
|
|
return [mx]
|
|
if kind == 'TXT':
|
|
txt = mock.create_autospec(TXT)
|
|
txt.strings = mock.create_autospec(name.Name)
|
|
txt.strings = mock.MagicMock()
|
|
if value == 'example-spf.com':
|
|
txt.strings = [b'v=spf1 include:allowed_mx.com']
|
|
elif value == 'example-spf-allow-all.com':
|
|
txt.strings = [b'v=spf1 +all']
|
|
elif value == 'example-invalid-spf.com':
|
|
txt.strings = [b'v=spf1 include:not_allowed_mx.com']
|
|
return [txt]
|
|
|
|
monkeypatch.setattr(dns.resolver, 'query', fn)
|
|
|
|
|
|
@pytest.fixture
|
|
def smtp_server(monkeypatch, port_available):
|
|
class RecipientValidatingSMTPChannel(smtpd.SMTPChannel):
|
|
def smtp_RCPT(self, arg):
|
|
address = self._getaddr(arg)
|
|
domain = address[1].split('@')[-1][:-1]
|
|
if domain in (
|
|
'example.com',
|
|
'example-spf.com',
|
|
'example-spf-allow-all.com',
|
|
'example-invalid-spf.com',
|
|
):
|
|
self._SMTPChannel__rcpttos.append(address)
|
|
self.push('250 Ok')
|
|
else:
|
|
self.push('550 No such user here')
|
|
|
|
class MailServer(smtpd.SMTPServer):
|
|
def handle_accept(self):
|
|
conn, addr = self.accept()
|
|
channel = RecipientValidatingSMTPChannel(self, conn, addr)
|
|
|
|
server = MailServer(('localhost', port_available), None)
|
|
thread = threading.Thread(target=asyncore.loop, kwargs={'timeout': 1})
|
|
thread.start()
|
|
yield
|
|
server.close()
|
|
thread.join()
|
|
|
|
|
|
def test_validate_email_address_bad_query(monkeypatch):
|
|
def no_answer(x, y):
|
|
raise (dns.resolver.NoAnswer())
|
|
|
|
def nx_domain(x, y):
|
|
raise (dns.resolver.NXDOMAIN())
|
|
|
|
monkeypatch.setattr('dns.resolver.query', no_answer)
|
|
with pytest.raises(ValidationError) as e:
|
|
validate_email_address('foo')
|
|
assert 'Error' in str(e.value)
|
|
monkeypatch.setattr('dns.resolver.query', nx_domain)
|
|
with pytest.raises(ValidationError) as e:
|
|
validate_email_address('foo')
|
|
assert 'Error' in str(e.value)
|
|
|
|
|
|
def test_validate_email_address_socket_error(dns_resolver, monkeypatch):
|
|
with mock.patch('smtplib.SMTP') as SMTP:
|
|
SMTP.side_effect = socket.error
|
|
with pytest.raises(ValidationError) as e:
|
|
validate_email_address('john.doe@example.com')
|
|
assert 'Error while connecting' in str(e.value)
|
|
|
|
with mock.patch('smtplib.SMTP') as SMTP:
|
|
SMTP.return_value.helo.side_effect = OSError
|
|
with pytest.raises(ValidationError) as e:
|
|
validate_email_address('john.doe@example.com')
|
|
assert 'Error while connecting' in str(e.value)
|
|
|
|
with mock.patch('smtplib.SMTP') as SMTP:
|
|
SMTP.return_value.helo.return_value = 250, None
|
|
SMTP.return_value.rcpt.return_value = 550, None
|
|
SMTP.return_value.quit.side_effect = OSError
|
|
with pytest.raises(ValidationError) as e:
|
|
validate_email_address('john.doe@example.com')
|
|
assert 'Error while connecting' in str(e.value)
|
|
|
|
|
|
def test_validate_email_address_bypass(settings):
|
|
settings.HOBO_VALIDATE_EMAIL_WITH_SMTP = False
|
|
assert validate_email_address('foo') == None
|
|
|
|
|
|
def test_invalid_address(client, admin_user):
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post('/emails/', {'default_from_email': 'foobar'})
|
|
assert response.status_code == 200
|
|
assert 'Enter a valid email address' in force_text(response.content)
|
|
|
|
|
|
def test_unkown_address(client, admin_user, dns_resolver, smtp_server):
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post('/emails/', {'default_from_email': 'john.doe@unknown.com'})
|
|
assert response.status_code == 200
|
|
assert 'Email address not found' in force_text(response.content)
|
|
|
|
|
|
def test_kown_address_nospf(client, admin_user, dns_resolver, smtp_server):
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
|
|
assert response.status_code == 200
|
|
assert 'Emails settings have been updated. It will take a few seconds to be effective.' in force_text(
|
|
response.content
|
|
)
|
|
|
|
|
|
def test_spf_allow_all_mail(client, admin_user, dns_resolver, smtp_server, settings):
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post(
|
|
'/emails/', {'default_from_email': 'john.doe@example-spf-allow-all.com'}, follow=True
|
|
)
|
|
assert response.status_code == 200
|
|
assert 'Emails settings have been updated. It will take a few seconds to be effective.' in force_text(
|
|
response.content
|
|
)
|
|
|
|
|
|
def test_invalid_spf(client, admin_user, dns_resolver, smtp_server, settings):
|
|
settings.ALLOWED_SPF_RECORDS = ['include:example.com']
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post('/emails/', {'default_from_email': 'john.doe@example-invalid-spf.com'})
|
|
assert response.status_code == 200
|
|
assert 'No suitable SPF record found' in force_text(response.content)
|
|
|
|
|
|
def test_strict_nospf(client, admin_user, dns_resolver, smtp_server, monkeypatch, settings):
|
|
settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']
|
|
monkeypatch.setattr('hobo.emails.validators.validate_email_spf.__defaults__', (True,))
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
|
|
assert response.status_code == 200
|
|
assert 'No suitable SPF record found' in force_text(response.content)
|
|
|
|
|
|
def test_valid_spf(client, admin_user, dns_resolver, smtp_server, settings):
|
|
settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post('/emails/', {'default_from_email': 'john.doe@example-spf.com'}, follow=True)
|
|
assert response.status_code == 200
|
|
assert 'Emails settings have been updated. It will take a few seconds to be effective.' in force_text(
|
|
response.content
|
|
)
|
|
|
|
|
|
def test_no_spf_validation(client, admin_user, dns_resolver, smtp_server, settings):
|
|
settings.ALLOWED_SPF_RECORDS = []
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post(
|
|
'/emails/', {'default_from_email': 'john.doe@example-invalid-spf.com'}, follow=True
|
|
)
|
|
assert response.status_code == 200
|
|
assert 'Emails settings have been updated. It will take a few seconds to be effective.' in force_text(
|
|
response.content
|
|
)
|
|
|
|
|
|
def test_emails_view(app, admin_user, dns_resolver, smtp_server, settings):
|
|
settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']
|
|
app = login(app)
|
|
resp = app.get('/emails/')
|
|
resp.form['default_from_email'] = 'john.doe@example.com'
|
|
resp.form['email_signature'] = 'my signature'
|
|
resp.form['global_email_prefix'] = 'my prefix'
|
|
resp.form['email_sender_name'] = 'my name'
|
|
resp = resp.form.submit()
|
|
assert Variable.objects.filter(name='default_from_email')[0].value == 'john.doe@example.com'
|
|
assert Variable.objects.filter(name='email_signature')[0].value == 'my signature'
|
|
assert Variable.objects.filter(name='global_email_prefix')[0].value == 'my prefix'
|
|
assert Variable.objects.filter(name='email_sender_name')[0].value == 'my name'
|