hobo/tests/test_emails.py

260 lines
11 KiB
Python

import asyncore # noqa pylint: disable=deprecated-module
import smtpd # noqa pylint: disable=deprecated-module
import socket
import threading
from unittest import mock
import dns.resolver
import pytest
from django.core.exceptions import ValidationError
from django.utils.encoding import force_str
from dns import name
from dns.rdtypes.ANY import MX, TXT
from hobo.emails.validators import validate_email_address
from hobo.environment.models import Combo, ServiceBase, Variable
from hobo.test_utils import find_free_port
from .test_manager import login
@pytest.fixture
def port_available():
    """Provide the port number returned by ``find_free_port`` to dependent fixtures."""
    free_port = find_free_port()
    return free_port
@pytest.fixture
def dns_resolver(monkeypatch, port_available):
    """Replace ``dns.resolver.query`` with a canned, offline resolver.

    ``MX`` lookups return a single mocked record whose ``exchange.to_text()``
    yields ``localhost:<port_available>``.  ``TXT`` lookups return a single
    mocked record whose ``strings`` hold an SPF policy for the three SPF test
    domains, and a bare ``MagicMock`` for every other domain.  Any other
    record kind yields ``None``.
    """
    # Canned SPF policies, keyed by the queried domain.
    spf_records = {
        'example-spf.com': b'v=spf1 include:allowed_mx.com',
        'example-spf-allow-all.com': b'v=spf1 +all',
        'example-invalid-spf.com': b'v=spf1 include:not_allowed_mx.com',
    }

    def fn(value, kind):
        if kind == 'MX':
            mx = mock.create_autospec(MX)
            mx.exchange = mock.create_autospec(name.Name)
            mx.exchange.to_text = mock.MagicMock()
            mx.exchange.to_text.return_value = 'localhost:%s' % port_available
            mx.preference = mock.create_autospec(name.Name)
            mx.preference.to_text = mock.MagicMock()
            mx.preference.to_text.return_value = 10
            return [mx]
        if kind == 'TXT':
            txt = mock.create_autospec(TXT)
            # Default for domains without a canned policy.  The former
            # create_autospec(name.Name) assignment was overwritten right
            # away by this one, so it has been dropped.
            txt.strings = mock.MagicMock()
            if value in spf_records:
                txt.strings = [spf_records[value]]
            return [txt]
        # Other record kinds are not stubbed.
        return None

    monkeypatch.setattr(dns.resolver, 'query', fn)
@pytest.fixture
def smtp_server(monkeypatch, port_available):
    """Run a throwaway SMTP server on ``localhost:port_available`` during a test.

    The server answers ``RCPT`` with ``250 Ok`` for the known test domains and
    with ``550 No such user here`` for anything else.  The asyncore loop runs
    in a background thread; on teardown the server is closed and the thread
    is joined.
    """

    class RecipientValidatingSMTPChannel(smtpd.SMTPChannel):
        """SMTP channel that accepts recipients only on the known test domains."""

        def smtp_RCPT(self, arg):
            address = self._getaddr(arg)
            # The domain is the text after the last '@' in the second element
            # returned by _getaddr, minus its final character.
            # NOTE(review): presumably that final character is the closing
            # '>' of the RCPT argument -- confirm before changing this.
            domain = address[1].split('@')[-1][:-1]
            if domain in (
                'example.com',
                'example-spf.com',
                'example-spf-allow-all.com',
                'example-invalid-spf.com',
            ):
                self._SMTPChannel__rcpttos.append(address)
                self.push('250 Ok')
            else:
                self.push('550 No such user here')

    class MailServer(smtpd.SMTPServer):
        """SMTP server that hands accepted connections to the validating channel."""

        def handle_accept(self):
            # asyncore's accept() may return None instead of a (conn, addr)
            # pair; unpacking it blindly would raise TypeError inside the
            # loop thread, so skip such wake-ups.
            accepted = self.accept()
            if accepted is None:
                return
            conn, addr = accepted
            RecipientValidatingSMTPChannel(self, conn, addr)

    server = MailServer(('localhost', port_available), None)
    thread = threading.Thread(target=asyncore.loop, kwargs={'timeout': 1})
    thread.start()
    yield
    # Teardown: close the server, then wait for the loop thread to finish.
    server.close()
    thread.join()
def test_validate_email_address_bad_query(monkeypatch):
    """A failing DNS query must surface as a ``ValidationError`` mentioning 'Error'.

    Both ``dns.resolver.NoAnswer`` and ``dns.resolver.NXDOMAIN`` are exercised.
    """

    def no_answer(x, y):
        raise dns.resolver.NoAnswer()

    def nx_domain(x, y):
        raise dns.resolver.NXDOMAIN()

    for failing_query in (no_answer, nx_domain):
        monkeypatch.setattr('dns.resolver.query', failing_query)
        with pytest.raises(ValidationError) as excinfo:
            validate_email_address('foo')
        assert 'Error' in str(excinfo.value)
def test_validate_email_address_socket_error(dns_resolver, monkeypatch):
    """Socket-level failures at each SMTP step must raise a connection error.

    Three failure points are simulated on a patched ``smtplib.SMTP``: the
    constructor, ``helo()`` and ``quit()`` (the latter after a 550 ``rcpt``).
    """

    def assert_connection_error():
        # Shared expectation for every simulated failure below.
        with pytest.raises(ValidationError) as excinfo:
            validate_email_address('john.doe@example.com')
        assert 'Error while connecting' in str(excinfo.value)

    # Failure while creating the SMTP connection.
    with mock.patch('smtplib.SMTP') as smtp_cls:
        smtp_cls.side_effect = socket.error
        assert_connection_error()

    # Failure during HELO.
    with mock.patch('smtplib.SMTP') as smtp_cls:
        smtp_cls.return_value.helo.side_effect = OSError
        assert_connection_error()

    # HELO succeeds, RCPT answers 550, then QUIT fails.
    with mock.patch('smtplib.SMTP') as smtp_cls:
        connection = smtp_cls.return_value
        connection.helo.return_value = 250, None
        connection.rcpt.return_value = 550, None
        connection.quit.side_effect = OSError
        assert_connection_error()
def test_validate_email_address_bypass(settings):
    """With ``HOBO_VALIDATE_EMAIL_WITH_SMTP`` off, validation returns ``None``."""
    settings.HOBO_VALIDATE_EMAIL_WITH_SMTP = False
    outcome = validate_email_address('foo')
    assert outcome is None
def test_invalid_address(client, admin_user):
    """Posting a malformed sender address re-renders the form with an error."""
    credentials = {'username': 'admin', 'password': 'password'}
    client.post('/login/', credentials)

    resp = client.post('/emails/', {'default_from_email': 'foobar'})

    assert resp.status_code == 200
    page = force_str(resp.content)
    assert 'Enter a valid email address' in page
def test_unkown_address(client, admin_user, dns_resolver, smtp_server, settings):
    """A sender on a domain the test SMTP server rejects is reported as not found."""
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
    credentials = {'username': 'admin', 'password': 'password'}
    client.post('/login/', credentials)

    resp = client.post('/emails/', {'default_from_email': 'john.doe@unknown.com'})

    assert resp.status_code == 200
    page = force_str(resp.content)
    assert 'Email address not found' in page
def test_kown_address_nospf(client, admin_user, dns_resolver, smtp_server, settings):
    """A sender accepted by the test SMTP server is saved when no SPF setting is given."""
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
    credentials = {'username': 'admin', 'password': 'password'}
    client.post('/login/', credentials)

    form_data = {'default_from_email': 'john.doe@example.com'}
    resp = client.post('/emails/', form_data, follow=True)

    assert resp.status_code == 200
    page = force_str(resp.content)
    assert 'Emails settings have been updated. It will take a few seconds to be effective.' in page
def test_spf_allow_all_mail(client, admin_user, dns_resolver, smtp_server, settings):
    """A sender whose domain publishes ``v=spf1 +all`` is accepted and saved."""
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
    credentials = {'username': 'admin', 'password': 'password'}
    client.post('/login/', credentials)

    form_data = {'default_from_email': 'john.doe@example-spf-allow-all.com'}
    resp = client.post('/emails/', form_data, follow=True)

    assert resp.status_code == 200
    page = force_str(resp.content)
    assert 'Emails settings have been updated. It will take a few seconds to be effective.' in page
def test_invalid_spf(client, admin_user, dns_resolver, smtp_server, settings):
    """A domain whose SPF record matches none of ``ALLOWED_SPF_RECORDS`` is rejected."""
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
    settings.ALLOWED_SPF_RECORDS = ['include:example.com']
    credentials = {'username': 'admin', 'password': 'password'}
    client.post('/login/', credentials)

    form_data = {'default_from_email': 'john.doe@example-invalid-spf.com'}
    resp = client.post('/emails/', form_data)

    assert resp.status_code == 200
    page = force_str(resp.content)
    assert 'No suitable SPF record found' in page
def test_strict_nospf(client, admin_user, dns_resolver, smtp_server, monkeypatch, settings):
    """With ``validate_email_spf``'s defaults patched to ``(True,)``, a domain
    without a canned SPF record is rejected.

    NOTE(review): the patched default is presumably the validator's "strict"
    flag -- confirm against ``hobo.emails.validators``.
    """
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
    settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']
    monkeypatch.setattr('hobo.emails.validators.validate_email_spf.__defaults__', (True,))
    credentials = {'username': 'admin', 'password': 'password'}
    client.post('/login/', credentials)

    form_data = {'default_from_email': 'john.doe@example.com'}
    resp = client.post('/emails/', form_data, follow=True)

    assert resp.status_code == 200
    page = force_str(resp.content)
    assert 'No suitable SPF record found' in page
def test_valid_spf(client, admin_user, dns_resolver, smtp_server, settings):
    """A domain whose SPF record matches ``ALLOWED_SPF_RECORDS`` is accepted and saved."""
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
    settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']
    credentials = {'username': 'admin', 'password': 'password'}
    client.post('/login/', credentials)

    form_data = {'default_from_email': 'john.doe@example-spf.com'}
    resp = client.post('/emails/', form_data, follow=True)

    assert resp.status_code == 200
    page = force_str(resp.content)
    assert 'Emails settings have been updated. It will take a few seconds to be effective.' in page
def test_no_spf_validation(client, admin_user, dns_resolver, smtp_server, settings):
    """With an empty ``ALLOWED_SPF_RECORDS``, the 'invalid SPF' test domain is still saved."""
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
    settings.ALLOWED_SPF_RECORDS = []
    credentials = {'username': 'admin', 'password': 'password'}
    client.post('/login/', credentials)

    form_data = {'default_from_email': 'john.doe@example-invalid-spf.com'}
    resp = client.post('/emails/', form_data, follow=True)

    assert resp.status_code == 200
    page = force_str(resp.content)
    assert 'Emails settings have been updated. It will take a few seconds to be effective.' in page
def test_sender_allowed_domains(client, admin_user, dns_resolver, smtp_server, settings, monkeypatch):
    """Check which sender domains the ``/emails/`` form accepts.

    SMTP validation is switched off.  The scenario walks through: the initial
    settings, an explicit ``EMAIL_FROM_ALLOWED_DOMAINS`` list, an empty list
    with a ``Combo`` service on ``example.org``, and finally the same service
    moved to ``www.example.com``.
    """
    saved_notice = 'Emails settings have been updated.'

    def submit(sender):
        # Post the form, check the HTTP status and hand back the decoded page.
        resp = client.post('/emails/', {'default_from_email': sender}, follow=True)
        assert resp.status_code == 200
        return force_str(resp.content)

    settings.HOBO_VALIDATE_EMAIL_WITH_SMTP = False
    client.post('/login/', {'username': 'admin', 'password': 'password'})

    # Initial settings: example.com is refused.
    page = submit('john.doe@example.com')
    assert 'Domain example.com is not allowed' in page
    assert saved_notice not in page

    # Explicitly listed domain: accepted.
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['example.com', 'foo.bar']
    page = submit('john.doe@example.com')
    assert saved_notice in page
    assert 'Domain example.com is not allowed' not in page

    # Empty list, with a Combo service whose base URL is on example.org.
    settings.EMAIL_FROM_ALLOWED_DOMAINS = []
    combo = Combo(base_url='https://example.org/test')
    combo.save()
    monkeypatch.setattr(ServiceBase, 'is_operational', lambda x: True)

    page = submit('john.doe@example.org')
    assert saved_notice in page

    page = submit('john.doe@example.com')
    assert 'Domain example.com is not allowed' in page

    page = submit('john.doe@brother.example.org')
    assert 'Domain brother.example.org is not allowed' in page

    # Once the service is on www.example.com, example.com is accepted.
    combo.base_url = 'https://www.example.com'
    combo.save()
    page = submit('john.doe@example.com')
    assert saved_notice in page
def test_emails_view(app, admin_user, dns_resolver, smtp_server, settings):
    """Submitting the emails form stores each field in a ``Variable`` of the same name."""
    settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
    settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']

    # (form field / Variable name, submitted value)
    submitted = (
        ('default_from_email', 'john.doe@example.com'),
        ('email_signature', 'my signature'),
        ('global_email_prefix', 'my prefix'),
        ('email_sender_name', 'my name'),
    )

    app = login(app)
    page = app.get('/emails/')
    for field, value in submitted:
        page.form[field] = value
    page = page.form.submit()

    for field, value in submitted:
        assert Variable.objects.filter(name=field)[0].value == value