hobo/tests/test_emails.py

139 lines
5.7 KiB
Python

import pytest
import asyncore
import dns.resolver
from dns import name
from dns.rdtypes.ANY import MX, TXT
import mock
import smtplib
import smtpd
import socket
import threading
from django.core.exceptions import ValidationError
from hobo.emails.validators import validate_email_address
@pytest.fixture
def dns_resolver(monkeypatch):
def fn(value, kind):
if kind == 'MX':
mx = mock.create_autospec(MX)
mx.exchange = mock.create_autospec(name.Name)
mx.exchange.to_text = mock.MagicMock()
mx.exchange.to_text.return_value = 'localhost:10025'
return [mx]
if kind == 'TXT':
txt = mock.create_autospec(TXT)
txt.strings = mock.create_autospec(name.Name)
txt.strings = mock.MagicMock()
if value == 'example-spf.com':
txt.strings = ['v=spf1 include:allowed_mx.com']
elif value == 'example-spf-allow-all.com':
txt.strings = ['v=spf1 +all']
elif value == 'example-invalid-spf.com':
txt.strings = ['v=spf1 include:not_allowed_mx.com']
return [txt]
monkeypatch.setattr(dns.resolver, 'query', fn)
@pytest.fixture
def smtp_server(monkeypatch):
class RecipientValidatingSMTPChannel(smtpd.SMTPChannel):
def smtp_RCPT(self, arg):
address = self._SMTPChannel__getaddr('TO:', arg)
domain = address.split('@')[-1]
if domain in ('example.com', 'example-spf.com', 'example-spf-allow-all.com', 'example-invalid-spf.com'):
self._SMTPChannel__rcpttos.append(address)
self.push('250 Ok')
else:
self.push('550 No such user here')
class MailServer(smtpd.SMTPServer):
def handle_accept(self):
conn, addr = self.accept()
channel = RecipientValidatingSMTPChannel(self, conn, addr)
server = MailServer(('localhost', 10025), None)
thread = threading.Thread(target=asyncore.loop, kwargs={'timeout': 1})
thread.start()
yield
server.close()
thread.join()
def test_validate_email_address_bad_query(monkeypatch):
def no_answer(x, y):
raise(dns.resolver.NoAnswer())
def nx_domain(x, y):
raise(dns.resolver.NXDOMAIN())
monkeypatch.setattr('dns.resolver.query', no_answer)
with pytest.raises(ValidationError) as e:
validate_email_address('foo')
assert 'Error' in str(e.value)
monkeypatch.setattr('dns.resolver.query', nx_domain)
with pytest.raises(ValidationError) as e:
validate_email_address('foo')
assert 'Error' in str(e.value)
def test_validate_email_address_socket_error(dns_resolver, monkeypatch):
def socket_error(x, y):
raise(socket.error())
monkeypatch.setattr('smtplib.SMTP.connect', socket_error)
with pytest.raises(ValidationError) as e:
validate_email_address('john.doe@example.com')
assert 'Error while connecting' in str(e.value)
def test_invalid_address(client, admin_user):
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'foobar'})
assert response.status_code == 200
assert 'Enter a valid email address' in response.content
def test_unkown_address(client, admin_user, dns_resolver, smtp_server):
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@unknown.com'})
assert response.status_code == 200
assert 'Email address not found' in response.content
def test_kown_address_nospf(client, admin_user, dns_resolver, smtp_server):
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
assert response.status_code == 200
assert 'Emails settings have been updated. It will take a few seconds to be effective.' in response.content
def test_spf_allow_all_mail(client, admin_user, dns_resolver, smtp_server, settings):
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@example-spf-allow-all.com'}, follow=True)
assert response.status_code == 200
assert 'Emails settings have been updated. It will take a few seconds to be effective.' in response.content
def test_invalid_spf(client, admin_user, dns_resolver, smtp_server, settings):
settings.ALLOWED_SPF_RECORDS = ['include:example.com']
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@example-invalid-spf.com'})
assert response.status_code == 200
assert 'No suitable SPF record found' in response.content
def test_strict_nospf(client, admin_user, dns_resolver, smtp_server, monkeypatch):
monkeypatch.setattr('hobo.emails.validators.validate_email_spf.__defaults__', (True,))
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
assert response.status_code == 200
assert 'No suitable SPF record found' in response.content
def test_valid_spf(client, admin_user, dns_resolver, smtp_server, settings):
settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']
client.post('/login/', {'username': 'admin', 'password': 'password'})
response = client.post('/emails/', {'default_from_email': 'john.doe@example-spf.com'}, follow=True)
assert response.status_code == 200
assert 'Emails settings have been updated. It will take a few seconds to be effective.' in response.content