Use external smtp for testing, remove local smtpd.

This commit is contained in:
Sergey Lavrinenko 2015-03-29 03:35:18 +03:00
parent 98ebd58879
commit 30eda38067
23 changed files with 154 additions and 743 deletions

2
.gitignore vendored
View File

@ -1,5 +1,5 @@
local_settings.py
local_*_settings.py
local_*.py
*.py[cod]
# C extensions

View File

@ -40,6 +40,7 @@ class SMTPResponse(Response):
self.status_code = None
self.status_text = None
self.last_command = None
self.refused_recipient = {}
def set_status(self, command, code, text, **kwargs):
self.responses.append([command, code, text, kwargs])

View File

@ -95,7 +95,7 @@ class SMTPClientWithResponse(SMTP):
if len(response.refused_recipients) == len(to_addrs):
# the server refused all our recipients
self._rset()
exc = smtplib.SMTPRecipientsRefused(response.refused_recipient)
exc = smtplib.SMTPRecipientsRefused(response.refused_recipients)
response.set_exception(exc)
return response

View File

@ -1,119 +1,18 @@
# encoding: utf-8
import subprocess
import shlex
import time
import logging
import threading
import os
import os.path
import datetime
import pytest
import base64
import time
import random
import sys
import platform
from emails.compat import to_native, is_py3, to_unicode
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()
TEST_SMTP_PORT = 25125
class TestSmtpServer:
def __init__(self, host=None, port=None):
self._process = None
self.host = host or 'localhost'
self.port = port or TEST_SMTP_PORT
self.lock = threading.Lock()
def get_server(self):
if self._process is None:
CMD = 'python -m smtpd -d -n -c DebuggingServer %s:%s' % (self.host, self.port)
self._process = subprocess.Popen(shlex.split(CMD), shell=False, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
logger.error('Started test smtp server "%s", pid: %s', CMD, self._process.pid)
#print('Started test smtp server "{0}", pid: {1}'.format(CMD, self._process.pid))
time.sleep(1)
return self
def stop(self):
if self._process:
logger.error('kill process...')
self._process.terminate()
class SecureSMTPDServer(object):
def __init__(self):
self._cwd = os.path.join(os.path.dirname(__file__), 'contrib/local-smtpd')
self._process = None
self.host = 'localhost'
self.user = 'A'
self.password = 'B'
self.argv = None
def as_dict(self):
r = {'host': self.host, 'port': self.port, 'fail_silently': False, 'debug': 1}
argv = self.argv or []
if 'ssl' in argv:
r['ssl'] = True
if 'auth' in argv:
r.update({'user': self.user, 'password': self.password})
return r
def get_server(self, argv=None):
if self._process is None:
self.argv = argv or []
if 'ssl' in self.argv:
self.port = 25126
elif 'auth' in self.argv:
self.port = 25127
else:
self.port = 25125
cmd = '/bin/sh ./run-smtpd.sh'.split(' ')
if argv:
cmd.extend(argv)
self._process = subprocess.Popen(cmd, shell=False, cwd=self._cwd)
logger.error('Started test smtp server "%s", pid: %s', cmd, self._process.pid)
#print('Started test smtp server "{0}", pid: {1}'.format(CMD, self._process.pid))
time.sleep(1)
return self
def stop(self):
if self._process:
logger.error('kill process...')
self._process.terminate()
time.sleep(1)
@pytest.fixture(scope="module")
def smtp_server(request):
logger.debug('smtp_server...')
ext_server = SecureSMTPDServer()
def fin():
print ("stopping ext_server")
ext_server.stop()
request.addfinalizer(fin)
return ext_server.get_server()
@pytest.fixture(scope="module")
def smtp_server_with_auth(request):
logger.debug('smtp_server with auth...')
ext_server = SecureSMTPDServer()
def fin():
print ("stopping ext_server with auth")
ext_server.stop()
request.addfinalizer(fin)
return ext_server.get_server(['auth'])
@pytest.fixture(scope="module")
def smtp_server_with_ssl(request):
logger.debug('smtp_server with ssl...')
ext_server = SecureSMTPDServer()
def fin():
print ("stopping ext_server with auth")
ext_server.stop()
request.addfinalizer(fin)
return ext_server.get_server(['ssl'])
@pytest.fixture(scope='module')
def django_email_backend(request):
@ -125,76 +24,87 @@ def django_email_backend(request):
return get_connection()
def obsfucate(key, clear):
enc = []
for i in range(len(clear)):
key_c = key[i % len(key)]
enc_c = chr((ord(clear[i]) + ord(key_c)) % 256)
enc.append(enc_c)
return base64.urlsafe_b64encode("".join(enc))
def deobsfucate(key, enc):
dec = []
key = to_native(key)
enc = base64.urlsafe_b64decode(enc)
for i in range(len(enc)):
key_c = key[i % len(key)]
if is_py3:
c1 = enc[i]
else:
c1 = ord(enc[i])
dec_c = chr((256 + c1 - ord(key_c)) % 256)
dec.append(dec_c)
return "".join(dec)
assert 0
class SMTPTestParams:
subject_prefix = '[test-python-emails]'
subject_prefix = '[python-emails]'
def __init__(self, from_email=None, to_email=None, defaults=None, **kw):
params = {}
params = {'fail_silently': True, 'debug': 1, 'timeout': 25}
params.update(defaults or {})
params.update(kw)
params['debug'] = 1
params['timeout'] = 15
self.params = params
pwd = params.get('password')
if pwd and pwd.startswith('#e:'):
user = params.get('user')
params['password'] = deobsfucate(user, pwd[3:])
self.from_email = from_email
self.to_email = to_email
def patch_message(self, message):
# Some SMTP requires from and to emails
"""
Some SMTP requires from and to emails
"""
if self.from_email:
message._mail_from = (message._mail_from[0], self.from_email)
message.mail_from = (message.mail_from[0], self.from_email)
if self.to_email:
message.mail_to = self.to_email
# TODO: this code breaks template in subject; deal with this
message.subject = " ".join([self.subject_prefix, datetime.datetime.now().strftime('%H:%M:%S'),
message.subject])
# TODO: this code breaks template in subject; fix it
if not to_unicode(message.subject).startswith(self.subject_prefix) :
message.subject = " ".join([self.subject_prefix, message.subject,
'py%s' % sys.version[:3]])
message._headers['X-Test-Date'] = datetime.datetime.utcnow().isoformat()
message._headers['X-Python-Version'] = "%s/%s" % (platform.python_version(), platform.platform())
return message
def __str__(self):
return u'SMTPTestParams(host={0}, port={1}, user={2})'.format(self.params.get('host'),
self.params.get('port'),
self.params.get('user'))
return u'SMTPTestParams({user}@{host}:{port})'.format(host=self.params.get('host'),
port=self.params.get('port'),
user=self.params.get('user', ''))
def sleep(self):
if 'mailtrap' in self.params.get('host', ''):
t = 2 + random.randint(0, 2)
else:
t = 0.5
time.sleep(t)
@pytest.fixture(scope='module')
def smtp_servers(request):
r = []
"""
r.append(SMTPTestParams(from_email='drlavr@yandex.ru',
to_email='drlavr@yandex.ru',
fail_silently=False,
**{'host': 'mx.yandex.ru', 'port': 25, 'ssl': False}))
r.append(SMTPTestParams(from_email='drlavr+togmail@yandex.ru',
to_email='s.lavrinenko@gmail.com',
fail_silently=False,
**{'host': 'gmail-smtp-in.l.google.com', 'port': 25, 'ssl': False}))
r.append(SMTPTestParams(from_email='drlavr@yandex.ru',
to_email='s.lavrinenko@me.com',
fail_silently=False,
**{'host': 'mx3.mail.icloud.com', 'port': 25, 'ssl': False}))
"""
r.append(SMTPTestParams(from_email='drlavr@yandex.ru',
to_email='lavr@outlook.com',
fail_silently=False,
**{'host': 'mx1.hotmail.com', 'port': 25, 'ssl': False}))
try:
from .local_smtp_settings import SMTP_SETTINGS_WITH_AUTH, FROM_EMAIL, TO_EMAIL
r.append(SMTPTestParams(from_email=FROM_EMAIL,
to_email=TO_EMAIL,
fail_silently=False,
**SMTP_SETTINGS_WITH_AUTH))
from .local_smtp_severs import SERVERS
except ImportError:
pass
from .smtp_servers import SERVERS
return r
return dict([(k, SMTPTestParams(**v)) for k, v in SERVERS.items()])

View File

@ -1,17 +0,0 @@
-----BEGIN CERTIFICATE-----
MIICsjCCAhugAwIBAgIJAPJ/FysSCcu+MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwIBcNMTUwMjI1MjM1NDAyWhgPMjExNTAyMDEyMzU0MDJa
MEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJ
bnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJ
AoGBAKfPd+Qa+/z1phbnV2sivOIfkd+wvroTmthnIaMDcun1DBCfyNw8byvBIwZv
Z0Fvco+zb4eBs3ZBHsiLYi9WXdU2NmYr0mcKauDOJW0lE7eOzCUP4Bq2XBgVqf6x
TXTNUaFrwZkxTA221NsVRqK3fytcXBi3a3zIvYCcvyCjNPiPAgMBAAGjgacwgaQw
HQYDVR0OBBYEFESV2wfHVrhtIwlGDQ7IXRKZpZoeMHUGA1UdIwRuMGyAFESV2wfH
VrhtIwlGDQ7IXRKZpZoeoUmkRzBFMQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29t
ZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkggkA8n8X
KxIJy74wDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQUFAAOBgQCgZOIVvCdM4sY+
OKLkxWEZr2zztafE9nHV77VXYMp8u3IOo2Iz4Ygn60SxzHEpWUUo44M9/1i7vkDQ
P0o/kPhzEVj9Fnx51YPqOakB+r9e25Enti2WkGdD0TUPC5IthcQQk0dnzwQG24mp
gfhg8GIWdqmopRbcW3DWgSqh45sB8g==
-----END CERTIFICATE-----

View File

@ -1,15 +0,0 @@
-----BEGIN RSA PRIVATE KEY-----
MIICXQIBAAKBgQCnz3fkGvv89aYW51drIrziH5HfsL66E5rYZyGjA3Lp9QwQn8jc
PG8rwSMGb2dBb3KPs2+HgbN2QR7Ii2IvVl3VNjZmK9JnCmrgziVtJRO3jswlD+Aa
tlwYFan+sU10zVGha8GZMUwNttTbFUait38rXFwYt2t8yL2AnL8gozT4jwIDAQAB
AoGAcfo/Y0ZUuxaaDdppjNIWWru4l6dzk+028h7yQMdZ6MBQxoXQpo3BsIVI5dkK
1+37cNEeQnp8yygl4W6SbLaLmehakK485s8hGD/ZgPS+CJmyjF2jr6BSzKVEjYtH
7+eMKBgZ3MjRC8Rr+QjnUZa65Pd6YVShqWcqiSjFQic4aEECQQDcNnTdPLXkaQwR
pXFuP70arH257vXUzaLi0c+I1UXgUR++KGA4yhJWZCp86Ik7R2mkHvgBdnO7KW3B
8LXbdX9vAkEAwxTpONYcMDRXSwOrAFyuQimpBoTBu0e/c3gA37tvmaaIPpK/V4pV
wMXpxgv4eTUJxf/N+ZcqTSJt8fDSIBOI4QJBANhcrO/eQXyc9Z205sEC4QL/LTxt
G54tOPgQWw8/NLuUGVMViozhhajaG6DEPGlA3fvB7bxKLKVcrBlcLuHkDQMCQQCV
EL99fK4hb31chrr+FdPaHrdXkc3va02xz/rq+vC1+fiVx9CJ9dy85v5RJQiCpbKI
J4WeuJHMSwi0HQ6TEBpBAkAPIphfofNE8lkIpJ1ocG7687zsmZDXJdzG4RToW4Un
UxHn0Mb3b39vkKIwr47W32fSyuZ7rG7YF8D1tR1B2oPN
-----END RSA PRIVATE KEY-----

View File

@ -1,37 +0,0 @@
# encoding: utf-8
import logging
from secure_smtpd import SMTPServer, LOG_NAME
import sys
class SSLSMTPServer(SMTPServer):
def process_message(self, peer, mailfrom, rcpttos, message_data):
print(message_data)
with open('secure-smtpd.log', 'a') as f:
f.write(message_data)
f.write('\n\n')
class MyCredentialValidator(object):
def validate(self, username, password):
if username == 'A' and password == 'B':
return True
return False
logger = logging.getLogger(LOG_NAME)
logger.setLevel(logging.INFO)
params = {}
port = 25125
if 'auth' in sys.argv:
params.update({'require_authentication': True, 'credential_validator': MyCredentialValidator()})
port = 25127
if 'ssl' in sys.argv:
params.update({'ssl': True, 'certfile': 'example.crt', 'keyfile': 'example.key'})
port = 25126
if 'timeout':
params.update({'maximum_execution_time': 10.0})
server = SSLSMTPServer(('127.0.0.1', port), None, **params)
server.run()

View File

@ -1,14 +0,0 @@
#!/bin/sh
# Looks like python3 has a bug in smtpd (SMTPSenderRefused: (503, 'Error: send HELO first'...)
# So we try to start python2
if which python2.7; then
PYTHON=python2.7
elif which python2.6; then
PYTHON=python2.6
else
PYTHON=python
fi
echo "use python $PYTHON"
$PYTHON run-smtpd.py $@

View File

@ -1,15 +0,0 @@
Copyright (c) 2014, Benjamin Coe <bencoe@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any purpose
with or without fee is hereby granted, provided that the above copyright notice
and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.

View File

@ -1,4 +0,0 @@
from . import config
from .config import LOG_NAME
from .smtp_server import SMTPServer
from .proxy_server import ProxyServer

View File

@ -1,2 +0,0 @@
from . import log
from .log import LOG_NAME

View File

@ -1,33 +0,0 @@
import logging, sys
from logging.handlers import RotatingFileHandler
from logging import StreamHandler
LOG_NAME = 'secure-smtpd'
class Log(object):
def __init__(self, log_name):
self.log_name = log_name
self.logger = logging.getLogger( self.log_name )
self._remove_handlers()
self._add_handler()
self.logger.setLevel(logging.DEBUG)
def _remove_handlers(self):
for handler in self.logger.handlers:
self.logger.removeHandler(handler)
def _add_handler(self):
try:
handler = RotatingFileHandler(
'/var/log/%s.log' % self.log_name,
maxBytes=10485760,
backupCount=3
)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
self.logger.addHandler(handler)
except IOError:
self.logger.addHandler(StreamHandler(sys.stderr))
Log(LOG_NAME)

View File

@ -1,16 +0,0 @@
import time
from multiprocessing import Process, Queue
class ProcessPool(object):
def __init__(self, func, process_count=5):
self.func = func
self.process_count = process_count
self.queue = Queue()
self._create_processes()
def _create_processes(self):
for i in range(0, self.process_count):
process = Process(target=self.func, args=[self.queue])
process.daemon = True
process.start()

View File

@ -1,95 +0,0 @@
import socket
import smtplib
import secure_smtpd
from .smtp_server import SMTPServer
from .store_credentials import StoreCredentials
class ProxyServer(SMTPServer):
"""Implements an open relay. Inherits from secure_smtpd, so can handle
SSL incoming. Modifies attributes slightly:
* if "ssl" is true accepts SSL connections inbound and connects via SSL
outbound
* adds "ssl_out_only", which can be set to True when "ssl" is False so that
inbound connections are in plain text but outbound are in SSL
* adds "debug", which if True copies all inbound messages to logger.info()
* ignores any credential validators, passing any credentials upstream
"""
def __init__(self, *args, **kwargs):
self.ssl_out_only = False
if 'ssl_out_only' in kwargs:
self.ssl_out_only = kwargs.pop('ssl_out_only')
self.debug = False
if 'debug' in kwargs:
self.debug = kwargs.pop('debug')
kwargs['credential_validator'] = StoreCredentials()
SMTPServer.__init__(self, *args, **kwargs)
def process_message(self, peer, mailfrom, rcpttos, data):
if self.debug:
# ------------------------
# stolen directly from stmpd.DebuggingServer
inheaders = 1
lines = data.split('\n')
self.logger.info('---------- MESSAGE FOLLOWS ----------')
for line in lines:
# headers first
if inheaders and not line:
self.logger.info('X-Peer: %s', peer[0])
inheaders = 0
self.logger.info(line)
self.logger.info('------------ END MESSAGE ------------')
# ------------------------
# following code is direct from smtpd.PureProxy
lines = data.split('\n')
# Look for the last header
i = 0
for line in lines:
if not line:
break
i += 1
lines.insert(i, 'X-Peer: %s' % peer[0])
data = '\n'.join(lines)
self._deliver(mailfrom, rcpttos, data)
def _deliver(self, mailfrom, rcpttos, data):
# ------------------------
# following code is adapted from smtpd.PureProxy with modifications to
# handle upstream SSL
refused = {}
try:
if self.ssl or self.ssl_out_only:
s = smtplib.SMTP_SSL()
else:
s = smtplib.SMTP()
s.connect(self._remoteaddr[0], self._remoteaddr[1])
if self.credential_validator.stored:
# we had credentials passed in, use them
s.login(
self.credential_validator.username,
self.credential_validator.password
)
try:
refused = s.sendmail(mailfrom, rcpttos, data)
if refused != {}:
self.logger.error('some connections refused %s', refused)
finally:
s.quit()
except smtplib.SMTPRecipientsRefused as e:
self.logger.exception('')
refused = e.recipients
except (socket.error, smtplib.SMTPException) as e:
self.logger.exception('')
# All recipients were refused. If the exception had an associated
# error code, use it. Otherwise,fake it with a non-triggering
# exception code.
errcode = getattr(e, 'smtp_code', -1)
errmsg = getattr(e, 'smtp_error', 'ignore')
for r in rcpttos:
refused[r] = (errcode, errmsg)
return refused

View File

@ -1,161 +0,0 @@
import secure_smtpd
import smtpd, base64, secure_smtpd, asynchat, logging
from asyncore import ExitNow
from smtpd import NEWLINE, EMPTYSTRING
def decode_b64(data):
'''Wrapper for b64decode, without having to struggle with bytestrings.'''
byte_string = data.encode('utf-8')
decoded = base64.b64decode(byte_string)
return decoded.decode('utf-8')
def encode_b64(data):
'''Wrapper for b64encode, without having to struggle with bytestrings.'''
byte_string = data.encode('utf-8')
encoded = base64.b64encode(byte_string)
return encoded.decode('utf-8')
class SMTPChannel(smtpd.SMTPChannel):
def __init__(self, smtp_server, newsocket, fromaddr, require_authentication=False, credential_validator=None, map=None):
smtpd.SMTPChannel.__init__(self, smtp_server, newsocket, fromaddr)
asynchat.async_chat.__init__(self, newsocket, map=map)
self.require_authentication = require_authentication
self.authenticating = False
self.authenticated = False
self.username = None
self.password = None
self.credential_validator = credential_validator
self.logger = logging.getLogger( secure_smtpd.LOG_NAME )
def smtp_QUIT(self, arg):
self.push('221 Bye')
self.close_when_done()
raise ExitNow()
def collect_incoming_data(self, data):
if not isinstance(data, str):
# We're on python3, so we have to decode the bytestring
data = data.decode('utf-8')
self.__line.append(data)
def smtp_EHLO(self, arg):
if not arg:
self.push('501 Syntax: HELO hostname')
return
if self.__greeting:
self.push('503 Duplicate HELO/EHLO')
else:
self.push('250-%s Hello %s' % (self.__fqdn, arg))
self.push('250-AUTH LOGIN PLAIN')
self.push('250 EHLO')
def smtp_AUTH(self, arg):
if 'PLAIN' in arg:
split_args = arg.split(' ')
# second arg is Base64-encoded string of blah\0username\0password
authbits = decode_b64(split_args[1]).split('\0')
self.username = authbits[1]
self.password = authbits[2]
if self.credential_validator and self.credential_validator.validate(self.username, self.password):
self.authenticated = True
self.push('235 Authentication successful.')
else:
self.push('454 Temporary authentication failure.')
raise ExitNow()
elif 'LOGIN' in arg:
self.authenticating = True
split_args = arg.split(' ')
# Some implmentations of 'LOGIN' seem to provide the username
# along with the 'LOGIN' stanza, hence both situations are
# handled.
if len(split_args) == 2:
self.username = decode_b64(arg.split(' ')[1])
self.push('334 ' + encode_b64('Username'))
else:
self.push('334 ' + encode_b64('Username'))
elif not self.username:
self.username = decode_b64(arg)
self.push('334 ' + encode_b64('Password'))
else:
self.authenticating = False
self.password = decode_b64(arg)
if self.credential_validator and self.credential_validator.validate(self.username, self.password):
self.authenticated = True
self.push('235 Authentication successful.')
else:
self.push('454 Temporary authentication failure.')
raise ExitNow()
# This code is taken directly from the underlying smtpd.SMTPChannel
# support for AUTH is added.
def found_terminator(self):
line = EMPTYSTRING.join(self.__line)
if self.debug:
self.logger.info('found_terminator(): data: %s' % repr(line))
self.__line = []
if self.__state == self.COMMAND:
if not line:
self.push('500 Error: bad syntax')
return
method = None
i = line.find(' ')
if self.authenticating:
# If we are in an authenticating state, call the
# method smtp_AUTH.
arg = line.strip()
command = 'AUTH'
elif i < 0:
command = line.upper()
arg = None
else:
command = line[:i].upper()
arg = line[i+1:].strip()
# White list of operations that are allowed prior to AUTH.
if not command in ['AUTH', 'EHLO', 'HELO', 'NOOP', 'RSET', 'QUIT']:
if self.require_authentication and not self.authenticated:
self.push('530 Authentication required')
return
method = getattr(self, 'smtp_' + command, None)
if not method:
self.push('502 Error: command "%s" not implemented' % command)
return
method(arg)
return
else:
if self.__state != self.DATA:
self.push('451 Internal confusion')
return
# Remove extraneous carriage returns and de-transparency according
# to RFC 821, Section 4.5.2.
data = []
for text in line.split('\r\n'):
if text and text[0] == '.':
data.append(text[1:])
else:
data.append(text)
self.__data = NEWLINE.join(data)
status = self.__server.process_message(
self.__peer,
self.__mailfrom,
self.__rcpttos,
self.__data
)
self.__rcpttos = []
self.__mailfrom = None
self.__state = self.COMMAND
self.set_terminator(b'\r\n')
if not status:
self.push('250 Ok')
else:
self.push(status)

View File

@ -1,93 +0,0 @@
import secure_smtpd
import ssl, smtpd, asyncore, socket, logging, signal, time, sys
from .smtp_channel import SMTPChannel
from asyncore import ExitNow
from .process_pool import ProcessPool
from ssl import SSLError
try:
from Queue import Empty
except ImportError:
# We're on python3
from queue import Empty
class SMTPServer(smtpd.SMTPServer):
def __init__(self, localaddr, remoteaddr, ssl=False, certfile=None, keyfile=None, ssl_version=ssl.PROTOCOL_SSLv23, require_authentication=False, credential_validator=None, maximum_execution_time=30, process_count=5):
smtpd.SMTPServer.__init__(self, localaddr, remoteaddr)
self.logger = logging.getLogger( secure_smtpd.LOG_NAME )
self.certfile = certfile
self.keyfile = keyfile
self.ssl_version = ssl_version
self.subprocesses = []
self.require_authentication = require_authentication
self.credential_validator = credential_validator
self.ssl = ssl
self.maximum_execution_time = maximum_execution_time
self.process_count = process_count
self.process_pool = None
def handle_accept(self):
self.process_pool = ProcessPool(self._accept_subprocess, process_count=self.process_count)
self.close()
def _accept_subprocess(self, queue):
while True:
try:
self.socket.setblocking(1)
pair = self.accept()
map = {}
if pair is not None:
self.logger.info('_accept_subprocess(): smtp connection accepted within subprocess.')
newsocket, fromaddr = pair
newsocket.settimeout(self.maximum_execution_time)
if self.ssl:
newsocket = ssl.wrap_socket(
newsocket,
server_side=True,
certfile=self.certfile,
keyfile=self.keyfile,
ssl_version=self.ssl_version,
)
channel = SMTPChannel(
self,
newsocket,
fromaddr,
require_authentication=self.require_authentication,
credential_validator=self.credential_validator,
map=map
)
self.logger.info('_accept_subprocess(): starting asyncore within subprocess.')
asyncore.loop(map=map)
self.logger.error('_accept_subprocess(): asyncore loop exited.')
except (ExitNow, SSLError):
self._shutdown_socket(newsocket)
self.logger.info('_accept_subprocess(): smtp channel terminated asyncore.')
except Exception as e:
self._shutdown_socket(newsocket)
self.logger.error('_accept_subprocess(): uncaught exception: %s' % str(e))
def _shutdown_socket(self, s):
try:
s.shutdown(socket.SHUT_RDWR)
s.close()
except Exception as e:
self.logger.error('_shutdown_socket(): failed to cleanly shutdown socket: %s' % str(e))
def run(self):
asyncore.loop()
if hasattr(signal, 'SIGTERM'):
def sig_handler(signal,frame):
self.logger.info("Got signal %s, shutting down." % signal)
sys.exit(0)
signal.signal(signal.SIGTERM, sig_handler)
while 1:
time.sleep(1)

View File

@ -1,11 +0,0 @@
class StoreCredentials(object):
def __init__(self):
self.stored = False
self.username = None
self.password = None
def validate(self, username, password):
self.stored = True
self.username = username
self.password = password
return True

View File

@ -11,6 +11,7 @@ from emails.loader.local_store import (MsgLoader, FileSystemLoader, FileNotFound
split_template_path, BaseLoader)
from emails.compat import text_type
from emails.loader.helpers import guess_charset
from emails.exc import HTTPLoaderError
ROOT = os.path.dirname(__file__)
@ -149,6 +150,9 @@ def test_external_urls():
except ConnectionError:
# Nevermind if external site does not respond
pass
except HTTPLoaderError:
# Skip if external site does responds 500
pass
def _get_loaders():

View File

@ -9,15 +9,13 @@ TO_EMAIL = 'jbrown@hotmail.tld'
FROM_EMAIL = 'robot@company.tld'
TRAVIS_CI = os.environ.get('TRAVIS')
HAS_INTERNET_CONNECTION = not TRAVIS_CI
ROOT = os.path.dirname(__file__)
def common_email_data(**kw):
T = JinjaTemplate
data = {'charset': 'utf-8',
'subject': T('[python-emails test] Olá {{name}}'),
'subject': T('Olá {{name}}'),
'mail_from': ('LÖVÅS HÅVET', FROM_EMAIL),
'mail_to': ('Pestävä erillään', TO_EMAIL),
'html': T('<h1>Olá {{name}}!</h1><p>O Lorem Ipsum é um texto modelo da indústria tipográfica e de impressão.'),

View File

@ -10,9 +10,6 @@ import emails.packages.dkim
from .helpers import common_email_data
TRAVIS_CI = os.environ.get('TRAVIS')
HAS_INTERNET_CONNECTION = not TRAVIS_CI
PRIV_KEY = b"""-----BEGIN RSA PRIVATE KEY-----
MIICXAIBAAKBgQDKHKzbg7LwpSJVfy9h8YQciVuIiexJ6OKJcCc6akJuLx+qPJGr

View File

@ -1,10 +1,12 @@
# coding: utf-8
from __future__ import unicode_literals
import time
import random
import emails
import emails.loader
from .helpers import HAS_INTERNET_CONNECTION, common_email_data
from .helpers import common_email_data
def test_send_attachment(smtp_servers):
"""
@ -13,19 +15,19 @@ def test_send_attachment(smtp_servers):
URL = 'http://lavr.github.io/python-emails/tests/campaignmonitor-samples/sample-template/images/gallery.png'
data = common_email_data(subject='Single attachment', attachments=[emails.store.LazyHTTPFile(uri=URL), ])
m = emails.html(**data)
if HAS_INTERNET_CONNECTION:
for d in smtp_servers:
d.patch_message(m)
r = m.send(smtp=d.params)
for tag, server in smtp_servers.items():
server.patch_message(m)
r = m.send(smtp=server.params)
server.sleep()
def test_send_with_render(smtp_servers):
data = common_email_data(subject='Render with name=John')
m = emails.html(**data)
if HAS_INTERNET_CONNECTION:
for d in smtp_servers:
d.patch_message(m)
r = m.send(render={'name': u'John'}, smtp=d.params)
for tag, server in smtp_servers.items():
m = emails.html(**common_email_data(subject='Render with name=John'))
server.patch_message(m)
r = m.send(render={'name': u'John'}, smtp=server.params)
server.sleep()
def test_send_with_inline_images(smtp_servers):
@ -33,7 +35,8 @@ def test_send_with_inline_images(smtp_servers):
data = common_email_data(subject='Sample html with inline images')
del data['html']
m = emails.loader.from_url(url=url, message_params=data, images_inline=True)
if HAS_INTERNET_CONNECTION:
for d in smtp_servers:
d.patch_message(m)
r = m.send(smtp=d.params)
for tag, server in smtp_servers.items():
server.patch_message(m)
r = m.send(smtp=server.params)
server.sleep()

View File

@ -1,88 +1,75 @@
# encoding: utf-8
from __future__ import unicode_literals
import os
import logging
import pytest
import emails
from emails.backend.smtp import SMTPBackend
TRAVIS_CI = os.environ.get('TRAVIS')
HAS_INTERNET_CONNECTION = not TRAVIS_CI
SAMPLE_MESSAGE = {'html': '<p>Test from python-emails',
'text': 'Test from python-emails',
'mail_from': 's@lavr.me',
'mail_to': 'sergei-nko@yandex.ru',
'subject': 'Test from python-emails'}
'subject': 'Sample message'}
def test_send_to_unknown_host():
server = SMTPBackend(host='invalid-server.invalid-domain-42.com', port=25)
server = SMTPBackend(host='invalid-server.invalid-domain-42.com', port=2525)
response = server.sendmail(to_addrs='s@lavr.me', from_addr='s@lavr.me', msg=emails.html(**SAMPLE_MESSAGE))
server.close()
assert response.status_code is None
assert response.error is not None
assert isinstance(response.error, IOError)
assert not response.success
print("response.error.errno=", response.error.errno)
if HAS_INTERNET_CONNECTION:
if not TRAVIS_CI:
# IOError: [Errno 8] nodename nor servname provided, or not known
assert response.error.errno == 8
def test_smtp_reconnect(smtp_server):
# Simulate server disconnection
# Check that SMTPBackend will reconnect
server = SMTPBackend(host=smtp_server.host, port=smtp_server.port, debug=1)
client = server.get_client()
logging.debug('simulate socket disconnect')
client.sock.close() # simulate disconnect
response = server.sendmail(to_addrs='s@lavr.me',
from_addr='s@lavr.me',
msg=emails.html(**SAMPLE_MESSAGE))
server.close()
assert response.success
print(response)
def test_smtp_send(smtp_servers):
"""
Check SMTPBackend.sendmail
"""
for tag, server in smtp_servers.items():
print("-- test_smtp_send: %s" % server)
smtp = server.params
smtp['fail_silently'] = True
response = server.patch_message(emails.html(**SAMPLE_MESSAGE)).send(smtp=server.params)
assert response.success or response.status_code == 421 # gmail sometimes fail sending
#message.smtp_pool[smtp].get_client().quit()
server.sleep()
def test_smtp_init_error(smtp_server):
def test_smtp_send_with_reconnect(smtp_servers):
"""
Check SMTPBackend.sendmail reconnect
"""
# test error when ssl and tls arguments both set
for tag, server in smtp_servers.items():
print("-- test_smtp_reconnect: %s" % server)
params = server.params
params['fail_silently'] = True
backend = SMTPBackend(**params)
backend.get_client().sock.close() # simulate disconnect
response = backend.sendmail(to_addrs=server.to_email,
from_addr=server.from_email,
msg=server.patch_message(emails.html(**SAMPLE_MESSAGE)))
assert response.success or response.status_code == 421 # gmail sometimes fail sending
server.sleep()
def test_smtp_init_error():
"""
Test error when ssl and tls arguments both set
"""
with pytest.raises(ValueError):
SMTPBackend(host=smtp_server.host,
port=smtp_server.port,
debug=1,
ssl=True,
tls=True)
SMTPBackend(host='X', port=25, ssl=True, tls=True)
def test_smtp_empty_sendmail(smtp_server):
server = SMTPBackend(host=smtp_server.host,
port=smtp_server.port,
debug=1)
response = server.sendmail(to_addrs=[], from_addr='a@b.com', msg='')
def test_smtp_empty_sendmail():
response = SMTPBackend().sendmail(to_addrs=[], from_addr='a@b.com', msg='')
assert not response
def test_smtp_dict1(smtp_server):
response = emails.html(**SAMPLE_MESSAGE).send(smtp=smtp_server.as_dict())
print(response)
assert response.status_code == 250
assert response.success
def test_smtp_dict2(smtp_server_with_auth):
response = emails.html(**SAMPLE_MESSAGE).send(smtp=smtp_server_with_auth.as_dict())
print(response)
assert response.status_code == 250
assert response.success
def test_smtp_dict2(smtp_server_with_ssl):
smtp = smtp_server_with_ssl.as_dict()
message = emails.html(**SAMPLE_MESSAGE)
response = message.send(smtp=smtp)
print(response)
assert response.status_code == 250
assert response.success
message.smtp_pool[smtp].get_client().quit()

View File

@ -0,0 +1,24 @@
# encoding: utf-8
_from = 'python-emails@lavr.me'
_mailtrap = dict(user='324263f0d84f52b2a', password='#e:lZdnZ5iUmJOcm2Wca2c=',
host='mailtrap.io', to_email='324263f0d84f52b2a@mailtrap.io')
SERVERS = {
'gmail.com-tls': dict(from_email=_from, to_email='s.lavrinenko@gmail.com',
host='alt1.gmail-smtp-in.l.google.com', port=25, tls=True),
'mx.yandex.ru': dict(from_email=_from, to_email='drlavr@yandex.ru',
host='mx.yandex.ru', port=25),
#'mailtrap.io': dict(from_email=_from, port=25, **_mailtrap),
#'mailtrap.io-tls': dict(from_email=_from, tls=True, port=465, **_mailtrap),
'outlook.com': dict(from_email=_from, to_email='lavr@outlook.com', host='mx1.hotmail.com'),
'me.com': dict(from_email=_from, to_email='s.lavrinenko@me.com', host='mx3.mail.icloud.com'),
}