smtp tests with auth and ssl added.

local smtp server updated (no more lamson)
This commit is contained in:
Sergey Lavrinenko 2015-02-26 02:56:43 +03:00
parent 3062b03a3e
commit 74bc1de650
34 changed files with 585 additions and 235 deletions

View File

@ -40,59 +40,81 @@ class TestSmtpServer:
self._process.terminate()
class TestLamsonSmtpServer:
class SecureSMTPDServer(object):
def __init__(self):
import sys
sys.path.insert(0, os.path.dirname(__file__))
import lamsondebuggingsmtpinstance
import lamsondebuggingsmtpinstance.config.settings
self.lamsondir = os.path.dirname(lamsondebuggingsmtpinstance.__file__)
settings = lamsondebuggingsmtpinstance.config.settings
self.host = settings.receiver_config['host']
self.port = settings.receiver_config['port']
self.lock = threading.Lock()
self._started = False
self._cwd = os.path.join(os.path.dirname(__file__), 'contrib/local-smtpd')
self._process = None
self.host = 'localhost'
self.user = 'A'
self.password = 'B'
self.argv = None
def _lamson_command(self, lamson_params):
r = subprocess.call("lamson {0}".format( lamson_params ), shell=True, cwd=self.lamsondir)
print("_lamson_command '{0}' return code is {1}".format(lamson_params, r))
def as_dict(self):
r = {'host': self.host, 'port': self.port, 'fail_silently': False, 'debug': 1}
argv = self.argv or []
if 'ssl' in argv:
r['ssl'] = True
if 'auth' in argv:
r.update({'user': self.user, 'password': self.password})
return r
def _start_lamson(self):
if not self._started:
self._stop_lamson() # just is case
logger.debug('stop lamson...')
return self._lamson_command('start -FORCE')
def _stop_lamson(self):
return self._lamson_command('stop')
def get_server(self):
self._start_lamson()
time.sleep(1)
def get_server(self, argv=None):
if self._process is None:
self.argv = argv or []
if 'ssl' in self.argv:
self.port = 25126
elif 'auth' in self.argv:
self.port = 25127
else:
self.port = 25125
cmd = '/bin/sh ./run.sh'.split(' ')
if argv:
cmd.extend(argv)
self._process = subprocess.Popen(cmd, shell=False, cwd=self._cwd)
logger.error('Started test smtp server "%s", pid: %s', cmd, self._process.pid)
#print('Started test smtp server "{0}", pid: {1}'.format(CMD, self._process.pid))
time.sleep(1)
return self
def stop(self):
if self._started:
logger.debug('stop lamson...')
self._start_lamson()
if self._process:
logger.error('kill process...')
self._process.terminate()
time.sleep(1)
@pytest.fixture(scope="module")
def smtp_server(request):
logger.debug('smtp_server...')
try:
import lamson
ext_server = TestLamsonSmtpServer()
except ImportError:
ext_server = TestSmtpServer()
ext_server = SecureSMTPDServer()
def fin():
print ("stopping ext_server")
ext_server.stop()
request.addfinalizer(fin)
return ext_server.get_server() #host, ext_server.port)
return ext_server.get_server()
@pytest.fixture(scope="module")
def smtp_server_with_auth(request):
logger.debug('smtp_server with auth...')
ext_server = SecureSMTPDServer()
def fin():
print ("stopping ext_server with auth")
ext_server.stop()
request.addfinalizer(fin)
return ext_server.get_server(['auth'])
@pytest.fixture(scope="module")
def smtp_server_with_ssl(request):
logger.debug('smtp_server with ssl...')
ext_server = SecureSMTPDServer()
def fin():
print ("stopping ext_server with auth")
ext_server.stop()
request.addfinalizer(fin)
return ext_server.get_server(['ssl'])
@pytest.fixture(scope='module')
def django_email_backend(request):

View File

@ -0,0 +1,17 @@
-----BEGIN CERTIFICATE-----
MIICsjCCAhugAwIBAgIJAPJ/FysSCcu+MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwIBcNMTUwMjI1MjM1NDAyWhgPMjExNTAyMDEyMzU0MDJa
MEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJ
bnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJ
AoGBAKfPd+Qa+/z1phbnV2sivOIfkd+wvroTmthnIaMDcun1DBCfyNw8byvBIwZv
Z0Fvco+zb4eBs3ZBHsiLYi9WXdU2NmYr0mcKauDOJW0lE7eOzCUP4Bq2XBgVqf6x
TXTNUaFrwZkxTA221NsVRqK3fytcXBi3a3zIvYCcvyCjNPiPAgMBAAGjgacwgaQw
HQYDVR0OBBYEFESV2wfHVrhtIwlGDQ7IXRKZpZoeMHUGA1UdIwRuMGyAFESV2wfH
VrhtIwlGDQ7IXRKZpZoeoUmkRzBFMQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29t
ZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkggkA8n8X
KxIJy74wDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQUFAAOBgQCgZOIVvCdM4sY+
OKLkxWEZr2zztafE9nHV77VXYMp8u3IOo2Iz4Ygn60SxzHEpWUUo44M9/1i7vkDQ
P0o/kPhzEVj9Fnx51YPqOakB+r9e25Enti2WkGdD0TUPC5IthcQQk0dnzwQG24mp
gfhg8GIWdqmopRbcW3DWgSqh45sB8g==
-----END CERTIFICATE-----

View File

@ -0,0 +1,15 @@
-----BEGIN RSA PRIVATE KEY-----
MIICXQIBAAKBgQCnz3fkGvv89aYW51drIrziH5HfsL66E5rYZyGjA3Lp9QwQn8jc
PG8rwSMGb2dBb3KPs2+HgbN2QR7Ii2IvVl3VNjZmK9JnCmrgziVtJRO3jswlD+Aa
tlwYFan+sU10zVGha8GZMUwNttTbFUait38rXFwYt2t8yL2AnL8gozT4jwIDAQAB
AoGAcfo/Y0ZUuxaaDdppjNIWWru4l6dzk+028h7yQMdZ6MBQxoXQpo3BsIVI5dkK
1+37cNEeQnp8yygl4W6SbLaLmehakK485s8hGD/ZgPS+CJmyjF2jr6BSzKVEjYtH
7+eMKBgZ3MjRC8Rr+QjnUZa65Pd6YVShqWcqiSjFQic4aEECQQDcNnTdPLXkaQwR
pXFuP70arH257vXUzaLi0c+I1UXgUR++KGA4yhJWZCp86Ik7R2mkHvgBdnO7KW3B
8LXbdX9vAkEAwxTpONYcMDRXSwOrAFyuQimpBoTBu0e/c3gA37tvmaaIPpK/V4pV
wMXpxgv4eTUJxf/N+ZcqTSJt8fDSIBOI4QJBANhcrO/eQXyc9Z205sEC4QL/LTxt
G54tOPgQWw8/NLuUGVMViozhhajaG6DEPGlA3fvB7bxKLKVcrBlcLuHkDQMCQQCV
EL99fK4hb31chrr+FdPaHrdXkc3va02xz/rq+vC1+fiVx9CJ9dy85v5RJQiCpbKI
J4WeuJHMSwi0HQ6TEBpBAkAPIphfofNE8lkIpJ1ocG7687zsmZDXJdzG4RToW4Un
UxHn0Mb3b39vkKIwr47W32fSyuZ7rG7YF8D1tR1B2oPN
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,37 @@
# encoding: utf-8
import logging
from secure_smtpd import SMTPServer, LOG_NAME
import sys
class SSLSMTPServer(SMTPServer):
def process_message(self, peer, mailfrom, rcpttos, message_data):
print(message_data)
with open('secure-smtpd.log', 'a') as f:
f.write(message_data)
f.write('\n\n')
class MyCredentialValidator(object):
def validate(self, username, password):
if username == 'A' and password == 'B':
return True
return False
logger = logging.getLogger(LOG_NAME)
logger.setLevel(logging.INFO)
params = {}
port = 25125
if 'auth' in sys.argv:
params.update({'require_authentication': True, 'credential_validator': MyCredentialValidator()})
port = 25127
if 'ssl' in sys.argv:
params.update({'ssl': True, 'certfile': 'example.crt', 'keyfile': 'example.key'})
port = 25126
if 'timeout':
params.update({'maximum_execution_time': 10.0})
server = SSLSMTPServer(('127.0.0.1', port), None, **params)
server.run()

View File

@ -0,0 +1,9 @@
#!/bin/sh
# Looks like python3 has a bug in smtpd (SMTPSenderRefused: (503, 'Error: send HELO first'...)
# So we try to start python2
PYTHON=python2.7
# next line doesn't works actually. TODO: fix it
(which python2.7 && export PYTHON=python2.7) || (which python2.6 && export PYTHON=python2.6) || export PYTHON=python
echo "$PYTHON run.py $@"
$PYTHON run.py $@

View File

@ -0,0 +1,15 @@
Copyright (c) 2014, Benjamin Coe <bencoe@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any purpose
with or without fee is hereby granted, provided that the above copyright notice
and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.

View File

@ -0,0 +1,4 @@
from . import config
from .config import LOG_NAME
from .smtp_server import SMTPServer
from .proxy_server import ProxyServer

View File

@ -0,0 +1,2 @@
from . import log
from .log import LOG_NAME

View File

@ -0,0 +1,33 @@
import logging, sys
from logging.handlers import RotatingFileHandler
from logging import StreamHandler
LOG_NAME = 'secure-smtpd'
class Log(object):
def __init__(self, log_name):
self.log_name = log_name
self.logger = logging.getLogger( self.log_name )
self._remove_handlers()
self._add_handler()
self.logger.setLevel(logging.DEBUG)
def _remove_handlers(self):
for handler in self.logger.handlers:
self.logger.removeHandler(handler)
def _add_handler(self):
try:
handler = RotatingFileHandler(
'/var/log/%s.log' % self.log_name,
maxBytes=10485760,
backupCount=3
)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
self.logger.addHandler(handler)
except IOError:
self.logger.addHandler(StreamHandler(sys.stderr))
Log(LOG_NAME)

View File

@ -0,0 +1,16 @@
import time
from multiprocessing import Process, Queue
class ProcessPool(object):
def __init__(self, func, process_count=5):
self.func = func
self.process_count = process_count
self.queue = Queue()
self._create_processes()
def _create_processes(self):
for i in range(0, self.process_count):
process = Process(target=self.func, args=[self.queue])
process.daemon = True
process.start()

View File

@ -0,0 +1,95 @@
import socket
import smtplib
import secure_smtpd
from .smtp_server import SMTPServer
from .store_credentials import StoreCredentials
class ProxyServer(SMTPServer):
"""Implements an open relay. Inherits from secure_smtpd, so can handle
SSL incoming. Modifies attributes slightly:
* if "ssl" is true accepts SSL connections inbound and connects via SSL
outbound
* adds "ssl_out_only", which can be set to True when "ssl" is False so that
inbound connections are in plain text but outbound are in SSL
* adds "debug", which if True copies all inbound messages to logger.info()
* ignores any credential validators, passing any credentials upstream
"""
def __init__(self, *args, **kwargs):
self.ssl_out_only = False
if 'ssl_out_only' in kwargs:
self.ssl_out_only = kwargs.pop('ssl_out_only')
self.debug = False
if 'debug' in kwargs:
self.debug = kwargs.pop('debug')
kwargs['credential_validator'] = StoreCredentials()
SMTPServer.__init__(self, *args, **kwargs)
def process_message(self, peer, mailfrom, rcpttos, data):
if self.debug:
# ------------------------
# stolen directly from stmpd.DebuggingServer
inheaders = 1
lines = data.split('\n')
self.logger.info('---------- MESSAGE FOLLOWS ----------')
for line in lines:
# headers first
if inheaders and not line:
self.logger.info('X-Peer: %s', peer[0])
inheaders = 0
self.logger.info(line)
self.logger.info('------------ END MESSAGE ------------')
# ------------------------
# following code is direct from smtpd.PureProxy
lines = data.split('\n')
# Look for the last header
i = 0
for line in lines:
if not line:
break
i += 1
lines.insert(i, 'X-Peer: %s' % peer[0])
data = '\n'.join(lines)
self._deliver(mailfrom, rcpttos, data)
def _deliver(self, mailfrom, rcpttos, data):
# ------------------------
# following code is adapted from smtpd.PureProxy with modifications to
# handle upstream SSL
refused = {}
try:
if self.ssl or self.ssl_out_only:
s = smtplib.SMTP_SSL()
else:
s = smtplib.SMTP()
s.connect(self._remoteaddr[0], self._remoteaddr[1])
if self.credential_validator.stored:
# we had credentials passed in, use them
s.login(
self.credential_validator.username,
self.credential_validator.password
)
try:
refused = s.sendmail(mailfrom, rcpttos, data)
if refused != {}:
self.logger.error('some connections refused %s', refused)
finally:
s.quit()
except smtplib.SMTPRecipientsRefused as e:
self.logger.exception('')
refused = e.recipients
except (socket.error, smtplib.SMTPException) as e:
self.logger.exception('')
# All recipients were refused. If the exception had an associated
# error code, use it. Otherwise,fake it with a non-triggering
# exception code.
errcode = getattr(e, 'smtp_code', -1)
errmsg = getattr(e, 'smtp_error', 'ignore')
for r in rcpttos:
refused[r] = (errcode, errmsg)
return refused

View File

@ -0,0 +1,161 @@
import secure_smtpd
import smtpd, base64, secure_smtpd, asynchat, logging
from asyncore import ExitNow
from smtpd import NEWLINE, EMPTYSTRING
def decode_b64(data):
'''Wrapper for b64decode, without having to struggle with bytestrings.'''
byte_string = data.encode('utf-8')
decoded = base64.b64decode(byte_string)
return decoded.decode('utf-8')
def encode_b64(data):
'''Wrapper for b64encode, without having to struggle with bytestrings.'''
byte_string = data.encode('utf-8')
encoded = base64.b64encode(byte_string)
return encoded.decode('utf-8')
class SMTPChannel(smtpd.SMTPChannel):
def __init__(self, smtp_server, newsocket, fromaddr, require_authentication=False, credential_validator=None, map=None):
smtpd.SMTPChannel.__init__(self, smtp_server, newsocket, fromaddr)
asynchat.async_chat.__init__(self, newsocket, map=map)
self.require_authentication = require_authentication
self.authenticating = False
self.authenticated = False
self.username = None
self.password = None
self.credential_validator = credential_validator
self.logger = logging.getLogger( secure_smtpd.LOG_NAME )
def smtp_QUIT(self, arg):
self.push('221 Bye')
self.close_when_done()
raise ExitNow()
def collect_incoming_data(self, data):
if not isinstance(data, str):
# We're on python3, so we have to decode the bytestring
data = data.decode('utf-8')
self.__line.append(data)
def smtp_EHLO(self, arg):
if not arg:
self.push('501 Syntax: HELO hostname')
return
if self.__greeting:
self.push('503 Duplicate HELO/EHLO')
else:
self.push('250-%s Hello %s' % (self.__fqdn, arg))
self.push('250-AUTH LOGIN PLAIN')
self.push('250 EHLO')
def smtp_AUTH(self, arg):
if 'PLAIN' in arg:
split_args = arg.split(' ')
# second arg is Base64-encoded string of blah\0username\0password
authbits = decode_b64(split_args[1]).split('\0')
self.username = authbits[1]
self.password = authbits[2]
if self.credential_validator and self.credential_validator.validate(self.username, self.password):
self.authenticated = True
self.push('235 Authentication successful.')
else:
self.push('454 Temporary authentication failure.')
raise ExitNow()
elif 'LOGIN' in arg:
self.authenticating = True
split_args = arg.split(' ')
# Some implmentations of 'LOGIN' seem to provide the username
# along with the 'LOGIN' stanza, hence both situations are
# handled.
if len(split_args) == 2:
self.username = decode_b64(arg.split(' ')[1])
self.push('334 ' + encode_b64('Username'))
else:
self.push('334 ' + encode_b64('Username'))
elif not self.username:
self.username = decode_b64(arg)
self.push('334 ' + encode_b64('Password'))
else:
self.authenticating = False
self.password = decode_b64(arg)
if self.credential_validator and self.credential_validator.validate(self.username, self.password):
self.authenticated = True
self.push('235 Authentication successful.')
else:
self.push('454 Temporary authentication failure.')
raise ExitNow()
# This code is taken directly from the underlying smtpd.SMTPChannel
# support for AUTH is added.
def found_terminator(self):
line = EMPTYSTRING.join(self.__line)
if self.debug:
self.logger.info('found_terminator(): data: %s' % repr(line))
self.__line = []
if self.__state == self.COMMAND:
if not line:
self.push('500 Error: bad syntax')
return
method = None
i = line.find(' ')
if self.authenticating:
# If we are in an authenticating state, call the
# method smtp_AUTH.
arg = line.strip()
command = 'AUTH'
elif i < 0:
command = line.upper()
arg = None
else:
command = line[:i].upper()
arg = line[i+1:].strip()
# White list of operations that are allowed prior to AUTH.
if not command in ['AUTH', 'EHLO', 'HELO', 'NOOP', 'RSET', 'QUIT']:
if self.require_authentication and not self.authenticated:
self.push('530 Authentication required')
return
method = getattr(self, 'smtp_' + command, None)
if not method:
self.push('502 Error: command "%s" not implemented' % command)
return
method(arg)
return
else:
if self.__state != self.DATA:
self.push('451 Internal confusion')
return
# Remove extraneous carriage returns and de-transparency according
# to RFC 821, Section 4.5.2.
data = []
for text in line.split('\r\n'):
if text and text[0] == '.':
data.append(text[1:])
else:
data.append(text)
self.__data = NEWLINE.join(data)
status = self.__server.process_message(
self.__peer,
self.__mailfrom,
self.__rcpttos,
self.__data
)
self.__rcpttos = []
self.__mailfrom = None
self.__state = self.COMMAND
self.set_terminator(b'\r\n')
if not status:
self.push('250 Ok')
else:
self.push(status)

View File

@ -0,0 +1,93 @@
import secure_smtpd
import ssl, smtpd, asyncore, socket, logging, signal, time, sys
from .smtp_channel import SMTPChannel
from asyncore import ExitNow
from .process_pool import ProcessPool
from ssl import SSLError
try:
from Queue import Empty
except ImportError:
# We're on python3
from queue import Empty
class SMTPServer(smtpd.SMTPServer):
def __init__(self, localaddr, remoteaddr, ssl=False, certfile=None, keyfile=None, ssl_version=ssl.PROTOCOL_SSLv23, require_authentication=False, credential_validator=None, maximum_execution_time=30, process_count=5):
smtpd.SMTPServer.__init__(self, localaddr, remoteaddr)
self.logger = logging.getLogger( secure_smtpd.LOG_NAME )
self.certfile = certfile
self.keyfile = keyfile
self.ssl_version = ssl_version
self.subprocesses = []
self.require_authentication = require_authentication
self.credential_validator = credential_validator
self.ssl = ssl
self.maximum_execution_time = maximum_execution_time
self.process_count = process_count
self.process_pool = None
def handle_accept(self):
self.process_pool = ProcessPool(self._accept_subprocess, process_count=self.process_count)
self.close()
def _accept_subprocess(self, queue):
while True:
try:
self.socket.setblocking(1)
pair = self.accept()
map = {}
if pair is not None:
self.logger.info('_accept_subprocess(): smtp connection accepted within subprocess.')
newsocket, fromaddr = pair
newsocket.settimeout(self.maximum_execution_time)
if self.ssl:
newsocket = ssl.wrap_socket(
newsocket,
server_side=True,
certfile=self.certfile,
keyfile=self.keyfile,
ssl_version=self.ssl_version,
)
channel = SMTPChannel(
self,
newsocket,
fromaddr,
require_authentication=self.require_authentication,
credential_validator=self.credential_validator,
map=map
)
self.logger.info('_accept_subprocess(): starting asyncore within subprocess.')
asyncore.loop(map=map)
self.logger.error('_accept_subprocess(): asyncore loop exited.')
except (ExitNow, SSLError):
self._shutdown_socket(newsocket)
self.logger.info('_accept_subprocess(): smtp channel terminated asyncore.')
except Exception as e:
self._shutdown_socket(newsocket)
self.logger.error('_accept_subprocess(): uncaught exception: %s' % str(e))
def _shutdown_socket(self, s):
try:
s.shutdown(socket.SHUT_RDWR)
s.close()
except Exception as e:
self.logger.error('_shutdown_socket(): failed to cleanly shutdown socket: %s' % str(e))
def run(self):
asyncore.loop()
if hasattr(signal, 'SIGTERM'):
def sig_handler(signal,frame):
self.logger.info("Got signal %s, shutting down." % signal)
sys.exit(0)
signal.signal(signal.SIGTERM, sig_handler)
while 1:
time.sleep(1)

View File

@ -0,0 +1,11 @@
class StoreCredentials(object):
def __init__(self):
self.stored = False
self.username = None
self.password = None
def validate(self, username, password):
self.stored = True
self.username = username
self.password = password
return True

View File

@ -1,2 +0,0 @@
This is simple Lamson application for python-emails testsuite.
No any additional logic, just a copy of example from Quick Start.

View File

@ -1,24 +0,0 @@
from lamson.routing import route, route_like, stateless
@route("(address)@(host)", address=".+")
def START(message, address=None, host=None):
return NEW_USER
@route_like(START)
def NEW_USER(message, address=None, host=None):
return NEW_USER
@route_like(START)
def END(message, address=None, host=None):
return NEW_USER(message, address, host)
@route_like(START)
@stateless
def FORWARD(message, address=None, host=None):
#relay.deliver(message)
pass

View File

@ -1,27 +0,0 @@
from config import settings
from lamson.routing import Router
from lamson.server import Relay, SMTPReceiver
from lamson import view, queue
import logging
import logging.config
import jinja2
logging.config.fileConfig("config/logging.conf")
# the relay host to actually send the final message to
settings.relay = Relay(host=settings.relay_config['host'],
port=settings.relay_config['port'], debug=1)
# where to listen for incoming messages
settings.receiver = SMTPReceiver(settings.receiver_config['host'],
settings.receiver_config['port'])
Router.defaults(**settings.router_defaults)
Router.load(settings.handlers)
Router.RELOAD=True
Router.UNDELIVERABLE_QUEUE=queue.Queue("run/undeliverable")
view.LOADER = jinja2.Environment(
loader=jinja2.PackageLoader(settings.template_config['dir'],
settings.template_config['module']))

View File

@ -1,31 +0,0 @@
[loggers]
keys=root,routing
[handlers]
keys=fileHandler
[formatters]
keys=defaultFormatter
[logger_root]
level=DEBUG
handlers=fileHandler
[logger_routing]
level=DEBUG
handlers=fileHandler
qualname=routing
propagate=0
[handler_fileHandler]
# this works using FileHandler
class=FileHandler
# If you have Python2.6 you can use this and it will work when you use logrotate
#class=WatchedFileHandler
level=DEBUG
formatter=defaultFormatter
args=("logs/lamson.log",)
[formatter_defaultFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt=

View File

@ -1,17 +0,0 @@
# This file contains python variables that configure Lamson for email processing.
import logging
# You may add additional parameters such as `username' and `password' if your
# relay server requires authentication, `starttls' (boolean) or `ssl' (boolean)
# for secure connections.
relay_config = {'host': 'localhost', 'port': 8825}
receiver_config = {'host': 'localhost', 'port': 8823}
handlers = ['app.handlers.sample']
router_defaults = {'host': '.+'}
template_config = {'dir': 'app', 'module': 'templates'}
# the config/boot.py will turn these values into variables set in settings

View File

@ -1,34 +0,0 @@
[loggers]
keys=root,routing
[handlers]
keys=stdoutHandler,stderrHandler
[formatters]
keys=defaultFormatter
[logger_root]
level=DEBUG
handlers=stdoutHandler
[logger_routing]
level=DEBUG
handlers=stderrHandler
qualname=routing
propagate=0
[handler_stdoutHandler]
class=StreamHandler
level=DEBUG
formatter=defaultFormatter
args=(sys.stdout,)
[handler_stderrHandler]
class=StreamHandler
level=DEBUG
formatter=defaultFormatter
args=(sys.stderr,)
[formatter_defaultFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt=

View File

@ -1,35 +0,0 @@
import logging
import logging.config
from lamson import view
from lamson.routing import Router
from lamson.server import Relay
import jinja2
from config import settings
logging.config.fileConfig("config/test_logging.conf")
# the relay host to actually send the final message to (set debug=1 to see what
# the relay is saying to the log server).
settings.relay = Relay(host=settings.relay_config['host'],
port=settings.relay_config['port'], debug=0)
settings.receiver = None
Router.defaults(**settings.router_defaults)
Router.load(settings.handlers)
Router.RELOAD=True
Router.LOG_EXCEPTIONS=False
view.LOADER = jinja2.Environment(
loader=jinja2.PackageLoader(settings.template_config['dir'],
settings.template_config['module']))
# if you have pyenchant and enchant installed then the template tests will do
# spell checking for you, but you need to tell pyenchant where to find itself
# if 'PYENCHANT_LIBRARY_PATH' not in os.environ:
# os.environ['PYENCHANT_LIBRARY_PATH'] = '/opt/local/lib/libenchant.dylib'

View File

@ -1 +0,0 @@
Do not remove this folder

View File

@ -22,46 +22,39 @@ def test_send_to_unknow_host():
assert response.error.errno==8
SAMPLE_MESSAGE = {'html': '<p>Test from python-emails',
'mail_from': 's@lavr.me',
'mail_to': 'sergei-nko@yandex.ru',
'subject': 'Test from python-emails'}
def test_smtp_reconnect(smtp_server):
# Simulate server disconnection
# Check that SMTPBackend will reconnect
message_params = {'html':'<p>Test from python-emails',
'mail_from': 's@lavr.me',
'mail_to': 'sergei-nko@yandex.ru',
'subject': 'Test from python-emails'}
server = SMTPBackend(host=smtp_server.host, port=smtp_server.port, debug=1)
server.open()
logging.debug('simulate socket disconnect')
server.connection.sock.close() # simulate disconnect
response = server.sendmail(to_addrs='s@lavr.me',
from_addr='s@lavr.me',
msg=emails.html(**message_params) )
msg=emails.html(**SAMPLE_MESSAGE))
print(response)
def test_smtp_dict(smtp_server):
message_params = {'html':'<p>Test from python-emails',
'mail_from': 's@lavr.me',
'mail_to': 'sergei-nko@yandex.ru',
'subject': 'Test from python-emails'}
response = emails.html(**message_params).send( smtp={'host':smtp_server.host, 'port':smtp_server.port} )
def test_smtp_dict1(smtp_server):
response = emails.html(**SAMPLE_MESSAGE).send(smtp=smtp_server.as_dict())
print(response)
assert response.status_code == 250
def test_smtp_dict2(smtp_server_with_auth):
response = emails.html(**SAMPLE_MESSAGE).send(smtp=smtp_server_with_auth.as_dict())
print(response)
assert response.status_code == 250
if __name__=="__main__":
import sys
import logging
sys.path.insert(0, '..')
logging.basicConfig(level=logging.DEBUG)
test_send_to_unknow_host()
from conftest import TestLamsonSmtpServer
smtp_server = TestLamsonSmtpServer().get_server()
test_smtp_reconnect(smtp_server)
def test_smtp_dict2(smtp_server_with_ssl):
response = emails.html(**SAMPLE_MESSAGE).send(smtp=smtp_server_with_ssl.as_dict())
print(response)
assert response.status_code == 250

View File

@ -2,5 +2,4 @@
--requirement=tests-base.txt
django==1.6
lamson
ordereddict

View File

@ -1,5 +1,4 @@
--requirement=base.txt
--requirement=tests-base.txt
django
lamson
django