260 lines
11 KiB
Python
260 lines
11 KiB
Python
import asyncore # noqa pylint: disable=deprecated-module
|
|
import smtpd # noqa pylint: disable=deprecated-module
|
|
import socket
|
|
import threading
|
|
from unittest import mock
|
|
|
|
import dns.resolver
|
|
import pytest
|
|
from django.core.exceptions import ValidationError
|
|
from django.utils.encoding import force_str
|
|
from dns import name
|
|
from dns.rdtypes.ANY import MX, TXT
|
|
|
|
from hobo.emails.validators import validate_email_address
|
|
from hobo.environment.models import Combo, ServiceBase, Variable
|
|
from hobo.test_utils import find_free_port
|
|
|
|
from .test_manager import login
|
|
|
|
|
|
@pytest.fixture
|
|
def port_available():
|
|
return find_free_port()
|
|
|
|
|
|
@pytest.fixture
|
|
def dns_resolver(monkeypatch, port_available):
|
|
def fn(value, kind):
|
|
if kind == 'MX':
|
|
mx = mock.create_autospec(MX)
|
|
mx.exchange = mock.create_autospec(name.Name)
|
|
mx.exchange.to_text = mock.MagicMock()
|
|
mx.exchange.to_text.return_value = 'localhost:%s' % port_available
|
|
mx.preference = mock.create_autospec(name.Name)
|
|
mx.preference.to_text = mock.MagicMock()
|
|
mx.preference.to_text.return_value = 10
|
|
return [mx]
|
|
if kind == 'TXT':
|
|
txt = mock.create_autospec(TXT)
|
|
txt.strings = mock.create_autospec(name.Name)
|
|
txt.strings = mock.MagicMock()
|
|
if value == 'example-spf.com':
|
|
txt.strings = [b'v=spf1 include:allowed_mx.com']
|
|
elif value == 'example-spf-allow-all.com':
|
|
txt.strings = [b'v=spf1 +all']
|
|
elif value == 'example-invalid-spf.com':
|
|
txt.strings = [b'v=spf1 include:not_allowed_mx.com']
|
|
return [txt]
|
|
|
|
monkeypatch.setattr(dns.resolver, 'query', fn)
|
|
|
|
|
|
@pytest.fixture
|
|
def smtp_server(monkeypatch, port_available):
|
|
class RecipientValidatingSMTPChannel(smtpd.SMTPChannel):
|
|
def smtp_RCPT(self, arg):
|
|
address = self._getaddr(arg)
|
|
domain = address[1].split('@')[-1][:-1]
|
|
if domain in (
|
|
'example.com',
|
|
'example-spf.com',
|
|
'example-spf-allow-all.com',
|
|
'example-invalid-spf.com',
|
|
):
|
|
self._SMTPChannel__rcpttos.append(address)
|
|
self.push('250 Ok')
|
|
else:
|
|
self.push('550 No such user here')
|
|
|
|
class MailServer(smtpd.SMTPServer):
|
|
def handle_accept(self):
|
|
conn, addr = self.accept()
|
|
RecipientValidatingSMTPChannel(self, conn, addr)
|
|
|
|
server = MailServer(('localhost', port_available), None)
|
|
thread = threading.Thread(target=asyncore.loop, kwargs={'timeout': 1})
|
|
thread.start()
|
|
yield
|
|
server.close()
|
|
thread.join()
|
|
|
|
|
|
def test_validate_email_address_bad_query(monkeypatch):
|
|
def no_answer(x, y):
|
|
raise (dns.resolver.NoAnswer())
|
|
|
|
def nx_domain(x, y):
|
|
raise (dns.resolver.NXDOMAIN())
|
|
|
|
monkeypatch.setattr('dns.resolver.query', no_answer)
|
|
with pytest.raises(ValidationError) as e:
|
|
validate_email_address('foo')
|
|
assert 'Error' in str(e.value)
|
|
monkeypatch.setattr('dns.resolver.query', nx_domain)
|
|
with pytest.raises(ValidationError) as e:
|
|
validate_email_address('foo')
|
|
assert 'Error' in str(e.value)
|
|
|
|
|
|
def test_validate_email_address_socket_error(dns_resolver, monkeypatch):
|
|
with mock.patch('smtplib.SMTP') as SMTP:
|
|
SMTP.side_effect = socket.error
|
|
with pytest.raises(ValidationError) as e:
|
|
validate_email_address('john.doe@example.com')
|
|
assert 'Error while connecting' in str(e.value)
|
|
|
|
with mock.patch('smtplib.SMTP') as SMTP:
|
|
SMTP.return_value.helo.side_effect = OSError
|
|
with pytest.raises(ValidationError) as e:
|
|
validate_email_address('john.doe@example.com')
|
|
assert 'Error while connecting' in str(e.value)
|
|
|
|
with mock.patch('smtplib.SMTP') as SMTP:
|
|
SMTP.return_value.helo.return_value = 250, None
|
|
SMTP.return_value.rcpt.return_value = 550, None
|
|
SMTP.return_value.quit.side_effect = OSError
|
|
with pytest.raises(ValidationError) as e:
|
|
validate_email_address('john.doe@example.com')
|
|
assert 'Error while connecting' in str(e.value)
|
|
|
|
|
|
def test_validate_email_address_bypass(settings):
|
|
settings.HOBO_VALIDATE_EMAIL_WITH_SMTP = False
|
|
assert validate_email_address('foo') is None
|
|
|
|
|
|
def test_invalid_address(client, admin_user):
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post('/emails/', {'default_from_email': 'foobar'})
|
|
assert response.status_code == 200
|
|
assert 'Enter a valid email address' in force_str(response.content)
|
|
|
|
|
|
def test_unkown_address(client, admin_user, dns_resolver, smtp_server, settings):
|
|
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post('/emails/', {'default_from_email': 'john.doe@unknown.com'})
|
|
assert response.status_code == 200
|
|
assert 'Email address not found' in force_str(response.content)
|
|
|
|
|
|
def test_kown_address_nospf(client, admin_user, dns_resolver, smtp_server, settings):
|
|
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
|
|
assert response.status_code == 200
|
|
assert 'Emails settings have been updated. It will take a few seconds to be effective.' in force_str(
|
|
response.content
|
|
)
|
|
|
|
|
|
def test_spf_allow_all_mail(client, admin_user, dns_resolver, smtp_server, settings):
|
|
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post(
|
|
'/emails/', {'default_from_email': 'john.doe@example-spf-allow-all.com'}, follow=True
|
|
)
|
|
assert response.status_code == 200
|
|
assert 'Emails settings have been updated. It will take a few seconds to be effective.' in force_str(
|
|
response.content
|
|
)
|
|
|
|
|
|
def test_invalid_spf(client, admin_user, dns_resolver, smtp_server, settings):
|
|
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
|
|
settings.ALLOWED_SPF_RECORDS = ['include:example.com']
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post('/emails/', {'default_from_email': 'john.doe@example-invalid-spf.com'})
|
|
assert response.status_code == 200
|
|
assert 'No suitable SPF record found' in force_str(response.content)
|
|
|
|
|
|
def test_strict_nospf(client, admin_user, dns_resolver, smtp_server, monkeypatch, settings):
|
|
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
|
|
settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']
|
|
monkeypatch.setattr('hobo.emails.validators.validate_email_spf.__defaults__', (True,))
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
|
|
assert response.status_code == 200
|
|
assert 'No suitable SPF record found' in force_str(response.content)
|
|
|
|
|
|
def test_valid_spf(client, admin_user, dns_resolver, smtp_server, settings):
|
|
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
|
|
settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post('/emails/', {'default_from_email': 'john.doe@example-spf.com'}, follow=True)
|
|
assert response.status_code == 200
|
|
assert 'Emails settings have been updated. It will take a few seconds to be effective.' in force_str(
|
|
response.content
|
|
)
|
|
|
|
|
|
def test_no_spf_validation(client, admin_user, dns_resolver, smtp_server, settings):
|
|
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
|
|
settings.ALLOWED_SPF_RECORDS = []
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post(
|
|
'/emails/', {'default_from_email': 'john.doe@example-invalid-spf.com'}, follow=True
|
|
)
|
|
assert response.status_code == 200
|
|
assert 'Emails settings have been updated. It will take a few seconds to be effective.' in force_str(
|
|
response.content
|
|
)
|
|
|
|
|
|
def test_sender_allowed_domains(client, admin_user, dns_resolver, smtp_server, settings, monkeypatch):
|
|
settings.HOBO_VALIDATE_EMAIL_WITH_SMTP = False
|
|
|
|
client.post('/login/', {'username': 'admin', 'password': 'password'})
|
|
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
|
|
assert response.status_code == 200
|
|
assert 'Domain example.com is not allowed' in force_str(response.content)
|
|
assert 'Emails settings have been updated.' not in force_str(response.content)
|
|
|
|
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['example.com', 'foo.bar']
|
|
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
|
|
assert response.status_code == 200
|
|
assert 'Emails settings have been updated.' in force_str(response.content)
|
|
assert 'Domain example.com is not allowed' not in force_str(response.content)
|
|
|
|
settings.EMAIL_FROM_ALLOWED_DOMAINS = []
|
|
combo = Combo(base_url='https://example.org/test')
|
|
combo.save()
|
|
monkeypatch.setattr(ServiceBase, 'is_operational', lambda x: True)
|
|
|
|
response = client.post('/emails/', {'default_from_email': 'john.doe@example.org'}, follow=True)
|
|
assert response.status_code == 200
|
|
assert 'Emails settings have been updated.' in force_str(response.content)
|
|
|
|
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
|
|
assert response.status_code == 200
|
|
assert 'Domain example.com is not allowed' in force_str(response.content)
|
|
|
|
response = client.post('/emails/', {'default_from_email': 'john.doe@brother.example.org'}, follow=True)
|
|
assert response.status_code == 200
|
|
assert 'Domain brother.example.org is not allowed' in force_str(response.content)
|
|
|
|
combo.base_url = 'https://www.example.com'
|
|
combo.save()
|
|
response = client.post('/emails/', {'default_from_email': 'john.doe@example.com'}, follow=True)
|
|
assert response.status_code == 200
|
|
assert 'Emails settings have been updated.' in force_str(response.content)
|
|
|
|
|
|
def test_emails_view(app, admin_user, dns_resolver, smtp_server, settings):
|
|
settings.EMAIL_FROM_ALLOWED_DOMAINS = ['*']
|
|
settings.ALLOWED_SPF_RECORDS = ['include:allowed_mx.com']
|
|
app = login(app)
|
|
resp = app.get('/emails/')
|
|
resp.form['default_from_email'] = 'john.doe@example.com'
|
|
resp.form['email_signature'] = 'my signature'
|
|
resp.form['global_email_prefix'] = 'my prefix'
|
|
resp.form['email_sender_name'] = 'my name'
|
|
resp = resp.form.submit()
|
|
assert Variable.objects.filter(name='default_from_email')[0].value == 'john.doe@example.com'
|
|
assert Variable.objects.filter(name='email_signature')[0].value == 'my signature'
|
|
assert Variable.objects.filter(name='global_email_prefix')[0].value == 'my prefix'
|
|
assert Variable.objects.filter(name='email_sender_name')[0].value == 'my name'
|