# hobo/tests/test_emails.py
#
# 261 lines
# 11 KiB
# Python

import asyncore
import random
import smtpd
import smtplib
import socket
import threading
from unittest import mock
import dns.resolver
import pytest
from django.core.exceptions import ValidationError
from django.utils.encoding import force_str
from dns import name
from dns.rdtypes.ANY import MX, TXT
from test_manager import login
from hobo.emails.validators import validate_email_address
from hobo.environment.models import Combo, ServiceBase, Variable, Wcs
from hobo.test_utils import find_free_port
@pytest.fixture
def port_available():
    """Provide the port number returned by ``find_free_port()``."""
    free_port = find_free_port()
    return free_port
@pytest.fixture
def dns_resolver(monkeypatch, port_available):
    """Replace ``dns.resolver.query`` with an in-process stub.

    * ``MX`` queries return a single mocked record whose
      ``exchange.to_text()`` yields ``'localhost:<port_available>'``.
    * ``TXT`` queries return a single mocked record whose ``strings`` holds
      an SPF policy for the three known SPF test domains, and a bare
      ``MagicMock`` for any other domain.
    * Any other query kind returns ``None``.
    """
    # SPF policy served for each domain that has one.
    spf_by_domain = {
        'example-spf.com': b'v=spf1 include:allowed_mx.com',
        'example-spf-allow-all.com': b'v=spf1 +all',
        'example-invalid-spf.com': b'v=spf1 include:not_allowed_mx.com',
    }

    def fn(value, kind):
        if kind == 'MX':
            mx = mock.create_autospec(MX)
            mx.exchange = mock.create_autospec(name.Name)
            mx.exchange.to_text = mock.MagicMock()
            mx.exchange.to_text.return_value = 'localhost:%s' % port_available
            mx.preference = mock.create_autospec(name.Name)
            mx.preference.to_text = mock.MagicMock()
            mx.preference.to_text.return_value = 10
            return [mx]
        if kind == 'TXT':
            txt = mock.create_autospec(TXT)
            policy = spf_by_domain.get(value)
            # Domains without a configured policy keep a MagicMock placeholder.
            # A fresh list is built on every call so callers never share state.
            txt.strings = mock.MagicMock() if policy is None else [policy]
            return [txt]

    monkeypatch.setattr(dns.resolver, 'query', fn)
@pytest.fixture
def smtp_server(monkeypatch, port_available):
    """Run a throwaway SMTP server on ``localhost:port_available``.

    The server answers ``RCPT`` with ``250 Ok`` for recipients whose domain
    is one of the fixed example domains below, and with
    ``550 No such user here`` for everything else. The asyncore loop runs in
    a background thread; once the test is done the listening socket is closed
    and the thread is joined.
    """

    class RecipientValidatingSMTPChannel(smtpd.SMTPChannel):
        def smtp_RCPT(self, arg):
            address = self._getaddr(arg)
            # Domain = text after the last '@' in the second item returned by
            # _getaddr, minus its final character (presumably the closing
            # '>' -- confirm against the smtpd implementation in use).
            domain = address[1].split('@')[-1][:-1]
            if domain in (
                'example.com',
                'example-spf.com',
                'example-spf-allow-all.com',
                'example-invalid-spf.com',
            ):
                self._SMTPChannel__rcpttos.append(address)
                self.push('250 Ok')
            else:
                self.push('550 No such user here')

    class MailServer(smtpd.SMTPServer):
        def handle_accept(self):
            # asyncore's accept() returns None when the connection did not
            # actually take place; unpacking it blindly would raise TypeError.
            pair = self.accept()
            if pair is None:
                return
            conn, addr = pair
            # The channel registers itself with asyncore on construction, so
            # no local reference needs to be kept.
            RecipientValidatingSMTPChannel(self, conn, addr)

    server = MailServer(('localhost', port_available), None)
    thread = threading.Thread(target=asyncore.loop, kwargs={'timeout': 1})
    thread.start()
    yield
    server.close()
    thread.join()
def test_validate_email_address_bad_query(monkeypatch):
    """DNS failures (NoAnswer, NXDOMAIN) surface as a ValidationError containing 'Error'."""

    def no_answer(x, y):
        raise dns.resolver.NoAnswer()

    def nx_domain(x, y):
        raise dns.resolver.NXDOMAIN()

    # Same expectation for both failure modes, checked one after the other.
    for failing_query in (no_answer, nx_domain):
        monkeypatch.setattr('dns.resolver.query', failing_query)
        with pytest.raises(ValidationError) as excinfo:
            validate_email_address('foo')
        assert 'Error' in str(excinfo.value)
def test_validate_email_address_socket_error(dns_resolver, monkeypatch):
    """Socket/OS errors at connect, HELO or QUIT are reported as 'Error while connecting'."""

    def expect_connection_error():
        with pytest.raises(ValidationError) as excinfo:
            validate_email_address('john.doe@example.com')
        assert 'Error while connecting' in str(excinfo.value)

    # Failure while opening the SMTP connection.
    with mock.patch('smtplib.SMTP') as smtp_cls:
        smtp_cls.side_effect = socket.error
        expect_connection_error()

    # Failure during HELO.
    with mock.patch('smtplib.SMTP') as smtp_cls:
        smtp_cls.return_value.helo.side_effect = OSError
        expect_connection_error()

    # HELO succeeds and RCPT answers 550, then QUIT fails.
    with mock.patch('smtplib.SMTP') as smtp_cls:
        session = smtp_cls.return_value
        session.helo.return_value = (250, None)
        session.rcpt.return_value = (550, None)
        session.quit.side_effect = OSError
        expect_connection_error()
def test_validate_email_address_bypass(settings):
    """With HOBO_VALIDATE_EMAIL_WITH_SMTP disabled the validator returns None."""
    settings.HOBO_VALIDATE_EMAIL_WITH_SMTP = False
    # Identity comparison: ``== None`` can be fooled by a custom __eq__.
    assert validate_email_address('foo') is None
def test_invalid_address(client, admin_user):
    """Posting a malformed sender address re-renders the form with an error."""
    credentials = {'username': 'admin', 'password': 'password'}
    client.post('/login/', credentials)

    response = client.post('/emails/', {'default_from_email': 'foobar'})

    assert response.status_code == 200
    page = force_str(response.content)
    assert 'Enter a valid email address' in page
def test_unkown_address(client, admin_user, dns_resolver, smtp_server, settings):
    """An address on a domain the smtp_server fixture rejects is reported as not found."""
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
    credentials = {'username': 'admin', 'password': 'password'}
    client.post('/login/', credentials)

    form_data = {'default_from_email': 'john.doe@unknown.com'}
    response = client.post('/emails/', form_data)

    assert response.status_code == 200
    page = force_str(response.content)
    assert 'Email address not found' in page
def test_kown_address_nospf(client, admin_user, dns_resolver, smtp_server, settings):
    """An accepted address on example.com (no SPF policy in the DNS stub) is saved."""
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
    credentials = {'username': 'admin', 'password': 'password'}
    client.post('/login/', credentials)

    form_data = {'default_from_email': 'john.doe@example.com'}
    response = client.post('/emails/', form_data, follow=True)

    assert response.status_code == 200
    page = force_str(response.content)
    assert 'Emails settings have been updated. It will take a few seconds to be effective.' in page
def test_spf_allow_all_mail(client, admin_user, dns_resolver, smtp_server, settings):
    """A sender whose domain publishes ``v=spf1 +all`` in the DNS stub is saved."""
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
    credentials = {'username': 'admin', 'password': 'password'}
    client.post('/login/', credentials)

    form_data = {'default_from_email': 'john.doe@example-spf-allow-all.com'}
    response = client.post('/emails/', form_data, follow=True)

    assert response.status_code == 200
    page = force_str(response.content)
    assert 'Emails settings have been updated. It will take a few seconds to be effective.' in page
def test_invalid_spf(client, admin_user, dns_resolver, smtp_server, settings):
    """A sender whose SPF policy does not match ALLOWED_SPF_RECORDS is refused."""
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
    settings.ALLOWED_SPF_RECORDS = ['include:example.com']
    credentials = {'username': 'admin', 'password': 'password'}
    client.post('/login/', credentials)

    form_data = {'default_from_email': 'john.doe@example-invalid-spf.com'}
    response = client.post('/emails/', form_data)

    assert response.status_code == 200
    page = force_str(response.content)
    assert 'No suitable SPF record found' in page
def test_strict_nospf(client, admin_user, dns_resolver, smtp_server, monkeypatch, settings):
    """With validate_email_spf's defaults forced to (True,), example.com is refused.

    The single default argument of ``validate_email_spf`` is overridden to
    ``True`` (presumably its strict flag -- confirm in hobo.emails.validators).
    """
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
    settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']
    monkeypatch.setattr('hobo.emails.validators.validate_email_spf.__defaults__', (True,))
    credentials = {'username': 'admin', 'password': 'password'}
    client.post('/login/', credentials)

    form_data = {'default_from_email': 'john.doe@example.com'}
    response = client.post('/emails/', form_data, follow=True)

    assert response.status_code == 200
    page = force_str(response.content)
    assert 'No suitable SPF record found' in page
def test_valid_spf(client, admin_user, dns_resolver, smtp_server, settings):
    """A sender whose SPF policy matches ALLOWED_SPF_RECORDS is saved."""
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
    settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']
    credentials = {'username': 'admin', 'password': 'password'}
    client.post('/login/', credentials)

    form_data = {'default_from_email': 'john.doe@example-spf.com'}
    response = client.post('/emails/', form_data, follow=True)

    assert response.status_code == 200
    page = force_str(response.content)
    assert 'Emails settings have been updated. It will take a few seconds to be effective.' in page
def test_no_spf_validation(client, admin_user, dns_resolver, smtp_server, settings):
    """With an empty ALLOWED_SPF_RECORDS the example-invalid-spf.com sender is saved."""
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
    settings.ALLOWED_SPF_RECORDS = []
    credentials = {'username': 'admin', 'password': 'password'}
    client.post('/login/', credentials)

    form_data = {'default_from_email': 'john.doe@example-invalid-spf.com'}
    response = client.post('/emails/', form_data, follow=True)

    assert response.status_code == 200
    page = force_str(response.content)
    assert 'Emails settings have been updated. It will take a few seconds to be effective.' in page
def test_sender_allowed_domains(client, admin_user, dns_resolver, smtp_server, settings, monkeypatch):
    """Sender domains are checked against EMAIL_FROM_ALLOWED_DOMAINS and the Combo base URL.

    SMTP validation is switched off so only the domain check is exercised.
    """

    def submit(sender):
        # Post the form, require a 200 after redirects, return the decoded page.
        response = client.post('/emails/', {'default_from_email': sender}, follow=True)
        assert response.status_code == 200
        return force_str(response.content)

    settings.HOBO_VALIDATE_EMAIL_WITH_SMTP = False
    client.post('/login/', {'username': 'admin', 'password': 'password'})

    # Nothing has been allowed yet: example.com is refused.
    page = submit('john.doe@example.com')
    assert 'Domain example.com is not allowed' in page
    assert 'Emails settings have been updated.' not in page

    # Explicitly allowed domain is accepted.
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['example.com', 'foo.bar']
    page = submit('john.doe@example.com')
    assert 'Emails settings have been updated.' in page
    assert 'Domain example.com is not allowed' not in page

    # Empty allow-list, with a Combo service deployed on example.org.
    settings.EMAIL_FROM_ALLOWED_DOMAINS = []
    combo = Combo(base_url='https://example.org/test')
    combo.save()
    monkeypatch.setattr(ServiceBase, 'is_operational', lambda x: True)

    page = submit('john.doe@example.org')
    assert 'Emails settings have been updated.' in page

    page = submit('john.doe@example.com')
    assert 'Domain example.com is not allowed' in page

    page = submit('john.doe@brother.example.org')
    assert 'Domain brother.example.org is not allowed' in page

    # Moving the service to www.example.com makes example.com acceptable.
    combo.base_url = 'https://www.example.com'
    combo.save()
    page = submit('john.doe@example.com')
    assert 'Emails settings have been updated.' in page
def test_emails_view(app, admin_user, dns_resolver, smtp_server, settings):
    """Submitting the emails form stores each field in a Variable of the same name."""
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
    settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']
    app = login(app)

    submitted = {
        'default_from_email': 'john.doe@example.com',
        'email_signature': 'my signature',
        'global_email_prefix': 'my prefix',
        'email_sender_name': 'my name',
    }

    resp = app.get('/emails/')
    for field_name, field_value in submitted.items():
        resp.form[field_name] = field_value
    resp = resp.form.submit()

    # Each submitted value must be readable back from the Variable table.
    for field_name, field_value in submitted.items():
        assert Variable.objects.filter(name=field_name)[0].value == field_value