commit
7cfd7321ba
|
@ -190,7 +190,6 @@ TODO
|
|||
- Increase test coverage
|
||||
- Feature: load message from rfc2822
|
||||
- Feature: export message to directory or zipfile
|
||||
- Performance: Patch pydkim for performance: i.e. preload key only once
|
||||
- Distribution: deb package (`debianization example <https://github.com/lavr/python-emails-debian/>`_)
|
||||
- Distribution: rpm package
|
||||
- Other: Flask extension
|
||||
|
|
|
@ -67,10 +67,9 @@ def _from_filebased_source(store, index_file=None, message_params=None, **kwargs
|
|||
try:
|
||||
index_file_name = store.find_index_file(index_file)
|
||||
except FileNotFound:
|
||||
# reraise another exception
|
||||
raise IndexFileNotFound('html file not found')
|
||||
|
||||
dirname, _ = os.path.split(index_file_name)
|
||||
dirname, index_file_name = os.path.split(index_file_name)
|
||||
if dirname:
|
||||
store.base_path = dirname
|
||||
|
||||
|
@ -81,35 +80,25 @@ def _from_filebased_source(store, index_file=None, message_params=None, **kwargs
|
|||
**kwargs)
|
||||
|
||||
|
||||
def from_directory(directory, **kwargs):
|
||||
return _from_filebased_source(store=local_store.FileSystemLoader(searchpath=directory), **kwargs)
|
||||
def from_directory(directory, loader_cls=None, **kwargs):
|
||||
loader_cls = loader_cls or local_store.FileSystemLoader
|
||||
return _from_filebased_source(store=loader_cls(searchpath=directory), **kwargs)
|
||||
|
||||
|
||||
def from_file(filename, **kwargs):
|
||||
return from_directory(directory=os.path.dirname(filename), index_file=os.path.basename(filename), **kwargs)
|
||||
|
||||
|
||||
def from_zip(zip_file, **kwargs):
|
||||
return _from_filebased_source(store=local_store.ZipLoader(file=zip_file), **kwargs)
|
||||
def from_zip(zip_file, loader_cls=None, **kwargs):
|
||||
loader_cls = loader_cls or local_store.ZipLoader
|
||||
return _from_filebased_source(store=loader_cls(file=zip_file), **kwargs)
|
||||
|
||||
|
||||
def from_rfc822(msg, message_params=None, **kw):
|
||||
|
||||
# Warning: from_rfc822 is for demo purposes only
|
||||
# TODO: Implement attachment loading
|
||||
|
||||
store = local_store.MsgLoader(msg=msg)
|
||||
text = store['__index.txt']
|
||||
html = store['__index.html']
|
||||
|
||||
loader = local_store.MsgLoader(msg=msg)
|
||||
message_params = message_params or {}
|
||||
message = Message(html=html, text=text, **message_params)
|
||||
if html:
|
||||
message.create_transformer(local_loader=store, **kw)
|
||||
message.transformer.load_and_transform()
|
||||
message.transformer.save()
|
||||
else:
|
||||
# TODO: add attachments for text-only message
|
||||
pass
|
||||
|
||||
message = Message(html=loader.html, text=loader.text, **message_params)
|
||||
for att in loader.attachments:
|
||||
message.attachments.add(att)
|
||||
return message
|
|
@ -8,7 +8,7 @@ import errno
|
|||
from zipfile import ZipFile
|
||||
import email
|
||||
|
||||
from emails.compat import to_unicode, string_types
|
||||
from emails.compat import to_unicode, string_types, to_native
|
||||
from emails.loader.helpers import guess_html_charset, decode_text
|
||||
|
||||
|
||||
|
@ -145,11 +145,10 @@ class FileSystemLoader(BaseLoader):
|
|||
|
||||
|
||||
class ZipLoader(BaseLoader):
|
||||
def __init__(self, file, encoding='utf-8', base_path=None, guess_encoding=True):
|
||||
def __init__(self, file, encoding='utf-8', base_path=None):
|
||||
self.zipfile = ZipFile(file, 'r')
|
||||
self.encoding = encoding
|
||||
self.base_path = base_path
|
||||
self.guess_encoding = guess_encoding
|
||||
self._filenames = None
|
||||
|
||||
def _decode_zip_filename(self, name):
|
||||
|
@ -189,6 +188,7 @@ class ZipLoader(BaseLoader):
|
|||
return sorted(self._filenames)
|
||||
|
||||
|
||||
|
||||
class MsgLoader(BaseLoader):
|
||||
"""
|
||||
Load files from email.Message
|
||||
|
@ -201,13 +201,16 @@ class MsgLoader(BaseLoader):
|
|||
def __init__(self, msg, base_path=None):
|
||||
if isinstance(msg, string_types):
|
||||
self.msg = email.message_from_string(msg)
|
||||
elif isinstance(msg, bytes):
|
||||
self.msg = email.message_from_string(to_native(msg))
|
||||
else:
|
||||
self.msg = msg
|
||||
self.base_path = base_path
|
||||
self._html_files = []
|
||||
self._text_files = []
|
||||
self._html_parts = []
|
||||
self._text_parts = []
|
||||
self._files = {}
|
||||
|
||||
self._content_ids = {}
|
||||
self._parsed = False
|
||||
|
||||
def decode_text(self, text, charset=None):
|
||||
if charset:
|
||||
|
@ -233,39 +236,37 @@ class MsgLoader(BaseLoader):
|
|||
return self.decode_text(part.get_payload(decode=True), charset=part.get_param('charset'))[0]
|
||||
|
||||
def add_html_part(self, part):
|
||||
name = '__index.html'
|
||||
self._files[name] = {'data': self.extract_part_text(part),
|
||||
'filename': name,
|
||||
'content_type': part.get_content_type()}
|
||||
self._html_parts.append({'data': self.extract_part_text(part),
|
||||
'content_type': part.get_content_type()})
|
||||
|
||||
def add_text_part(self, part):
|
||||
name = '__index.txt'
|
||||
self._files[name] = {'data': self.extract_part_text(part),
|
||||
'filename': name,
|
||||
'content_type': part.get_content_type()}
|
||||
self._text_parts.append({'data': self.extract_part_text(part),
|
||||
'content_type': part.get_content_type()})
|
||||
|
||||
def add_another_part(self, part):
|
||||
def add_attachment_part(self, part):
|
||||
counter = 1
|
||||
f = {}
|
||||
|
||||
filename = part.get_filename()
|
||||
if not filename:
|
||||
ext = mimetypes.guess_extension(part.get_content_type())
|
||||
if not ext:
|
||||
# Use a generic bag-of-bits extension
|
||||
ext = '.bin'
|
||||
filename = 'part-%03d%s' % (counter, ext)
|
||||
counter += 1
|
||||
f['filename'] = filename
|
||||
f['content_type'] = part.get_content_type()
|
||||
|
||||
content_id = part['Content-ID']
|
||||
if content_id:
|
||||
f['filename'] = self.clean_content_id(content_id)
|
||||
f['content_id'] = self.clean_content_id(content_id)
|
||||
f['inline'] = True
|
||||
else:
|
||||
filename = part.get_filename()
|
||||
if not filename:
|
||||
ext = mimetypes.guess_extension(part.get_content_type())
|
||||
if not ext:
|
||||
# Use a generic bag-of-bits extension
|
||||
ext = '.bin'
|
||||
filename = 'part-%03d%s' % (counter, ext)
|
||||
counter += 1
|
||||
f['filename'] = filename
|
||||
f['content_type'] = part.get_content_type()
|
||||
self._content_ids[f['content_id']] = f['filename']
|
||||
f['data'] = part.get_payload(decode=True)
|
||||
self._files[f['filename']] = f
|
||||
|
||||
def _parse_msg(self):
|
||||
def _parse(self):
|
||||
for part in self.msg.walk():
|
||||
content_type = part.get_content_type()
|
||||
|
||||
|
@ -280,14 +281,40 @@ class MsgLoader(BaseLoader):
|
|||
self.add_text_part(part)
|
||||
continue
|
||||
|
||||
self.add_another_part(part)
|
||||
self.add_attachment_part(part)
|
||||
|
||||
def parse(self):
|
||||
if not self._parsed:
|
||||
self._parse()
|
||||
self._parsed = True
|
||||
|
||||
def get_file(self, name):
|
||||
self._parse_msg()
|
||||
#print("MsgLoader.get_file", name)
|
||||
self.parse()
|
||||
if name.startswith('cid:'):
|
||||
name = self._content_ids.get(name[4:])
|
||||
f = self._files.get(name)
|
||||
if f:
|
||||
return f['data'], name
|
||||
raise FileNotFound(name)
|
||||
|
||||
def list_files(self):
|
||||
return self._files
|
||||
self.parse()
|
||||
return self._files
|
||||
|
||||
@property
|
||||
def attachments(self):
|
||||
self.parse()
|
||||
return self._files.values()
|
||||
|
||||
@property
|
||||
def html(self):
|
||||
self.parse()
|
||||
return self._html_parts and self._html_parts[0]['data'] or None
|
||||
|
||||
@property
|
||||
def text(self):
|
||||
self.parse()
|
||||
return self._text_parts and self._text_parts[0]['data'] or None
|
||||
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ from functools import wraps
|
|||
from dateutil.parser import parse as dateutil_parse
|
||||
from email.header import Header
|
||||
from email.utils import formatdate, getaddresses
|
||||
from emails.compat import string_types, to_unicode, is_callable, to_bytes
|
||||
from emails.compat import string_types, to_unicode, is_callable, to_bytes, to_native
|
||||
from .utils import (SafeMIMEText, SafeMIMEMultipart, sanitize_address,
|
||||
parse_name_and_email, load_email_charsets,
|
||||
encode_header as encode_header_)
|
||||
|
@ -404,15 +404,22 @@ class MessageDKIMMixin(object):
|
|||
return message_string
|
||||
|
||||
def as_message(self, message_cls=None):
|
||||
msg = self.dkim_sign_message(self._build_message(message_cls=message_cls))
|
||||
return msg
|
||||
return self.dkim_sign_message(self._build_message(message_cls=message_cls))
|
||||
|
||||
message = as_message
|
||||
|
||||
def as_string(self, message_cls=None):
|
||||
# self.as_string() is not equialent self.message().as_string()
|
||||
# self.as_string() needs one less message-to-string conversions for dkim
|
||||
return self.dkim_sign_string(self._build_message(message_cls=message_cls).as_string())
|
||||
"""
|
||||
Returns message as string.
|
||||
|
||||
Note: this method costs one less message-to-string conversions
|
||||
for dkim in compare to self.as_message().as_string()
|
||||
|
||||
Changes:
|
||||
v0.4.2: now returns bytes, not native string
|
||||
"""
|
||||
|
||||
return self.dkim_sign_string(to_bytes(self._build_message(message_cls=message_cls).as_string()))
|
||||
|
||||
|
||||
class Message(MessageSendMixin, MessageTransformerMixin, MessageDKIMMixin, BaseMessage):
|
||||
|
|
|
@ -401,10 +401,15 @@ class DKIM(object):
|
|||
#: formed.
|
||||
def sign(self, selector, domain, privkey, identity=None,
|
||||
canonicalize=(b'relaxed',b'simple'), include_headers=None, length=False):
|
||||
try:
|
||||
pk = parse_pem_private_key(privkey)
|
||||
except UnparsableKeyError as e:
|
||||
raise KeyFormatError(str(e))
|
||||
|
||||
if isinstance(privkey, dict):
|
||||
# Dirty patch by github:lavr to use pre-compiled key
|
||||
pk = privkey
|
||||
else:
|
||||
try:
|
||||
pk = parse_pem_private_key(privkey)
|
||||
except UnparsableKeyError as e:
|
||||
raise KeyFormatError(str(e))
|
||||
|
||||
if identity is not None and not identity.endswith(domain):
|
||||
raise ParameterError("identity must end with domain")
|
||||
|
|
|
@ -16,7 +16,7 @@ from emails.compat import to_bytes, to_native
|
|||
|
||||
class DKIMSigner:
|
||||
|
||||
def __init__(self, selector, domain, privkey, ignore_sign_errors=True, **kwargs):
|
||||
def __init__(self, selector, domain, privkey, ignore_sign_errors=False, **kwargs):
|
||||
|
||||
self.ignore_sign_errors = ignore_sign_errors
|
||||
self._sign_params = kwargs
|
||||
|
@ -24,10 +24,9 @@ class DKIMSigner:
|
|||
if privkey and hasattr(privkey, 'read'):
|
||||
privkey = privkey.read()
|
||||
|
||||
privkey = to_bytes(privkey)
|
||||
# Check private key
|
||||
# Compile private key
|
||||
try:
|
||||
parse_pem_private_key(privkey)
|
||||
privkey = parse_pem_private_key(to_bytes(privkey))
|
||||
except UnparsableKeyError as exc:
|
||||
raise DKIMException(exc)
|
||||
|
||||
|
@ -41,7 +40,7 @@ class DKIMSigner:
|
|||
# pydkim module parses message and privkey on each signing
|
||||
# this is not optimal for mass operations
|
||||
# TODO: patch pydkim or use another signing module
|
||||
return to_native(dkim.sign(message=message, **self._sign_params))
|
||||
return dkim.sign(message=message, **self._sign_params)
|
||||
except DKIMException:
|
||||
if self.ignore_sign_errors:
|
||||
logging.exception('Error signing message')
|
||||
|
@ -50,10 +49,12 @@ class DKIMSigner:
|
|||
|
||||
def get_sign_header(self, message):
|
||||
# pydkim returns string, so we should split
|
||||
(header, value) = self.get_sign_string(message).split(': ', 1)
|
||||
if value.endswith("\r\n"):
|
||||
value = value[:-2]
|
||||
return header, value
|
||||
s = self.get_sign_string(message)
|
||||
if s:
|
||||
(header, value) = to_native(s).split(': ', 1)
|
||||
if value.endswith("\r\n"):
|
||||
value = value[:-2]
|
||||
return header, value
|
||||
|
||||
def sign_message(self, msg):
|
||||
"""
|
||||
|
@ -68,4 +69,5 @@ class DKIMSigner:
|
|||
"""
|
||||
Insert DKIM header to message string
|
||||
"""
|
||||
return self.get_sign_string(to_bytes(message_string)) + message_string
|
||||
s = self.get_sign_string(to_bytes(message_string))
|
||||
return s and s + message_string or message_string
|
||||
|
|
|
@ -35,8 +35,6 @@ class BaseFile(object):
|
|||
Store base "attachment-file" information.
|
||||
"""
|
||||
|
||||
content_id_suffix = '@python.emails'
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
"""
|
||||
uri and filename are connected properties.
|
||||
|
@ -49,6 +47,7 @@ class BaseFile(object):
|
|||
self.data = kwargs.get('data', None)
|
||||
self._mime_type = kwargs.get('mime_type')
|
||||
self._headers = kwargs.get('headers')
|
||||
self._content_id = kwargs.get('content_id')
|
||||
self._content_disposition = kwargs.get('content_disposition', 'attachment')
|
||||
self.subtype = kwargs.get('subtype')
|
||||
self.local_loader = kwargs.get('local_loader')
|
||||
|
@ -136,14 +135,9 @@ class BaseFile(object):
|
|||
|
||||
@property
|
||||
def content_id(self):
|
||||
return "{0}{1}".format(self.filename, self.content_id_suffix)
|
||||
|
||||
@classmethod
|
||||
def parse_content_id(cls, content_id):
|
||||
if content_id.endswith(cls.content_id_suffix):
|
||||
return {'filename': content_id[:-len(cls.content_id_suffix)]}
|
||||
else:
|
||||
return None
|
||||
if self._content_id is None:
|
||||
self._content_id = self.filename
|
||||
return self._content_id
|
||||
|
||||
@property
|
||||
def mime(self):
|
||||
|
|
|
@ -81,6 +81,7 @@ class MemoryFileStore(FileStore):
|
|||
self.remove(uri)
|
||||
value.filename = self.unique_filename(value.filename, uri=uri)
|
||||
self._files[uri] = value
|
||||
|
||||
return value
|
||||
|
||||
def by_uri(self, uri):
|
||||
|
@ -91,11 +92,6 @@ class MemoryFileStore(FileStore):
|
|||
if uri:
|
||||
return self.by_uri(uri)
|
||||
|
||||
def by_content_id(self, content_id):
|
||||
parsed = self.file_cls.parse_content_id(content_id)
|
||||
if parsed:
|
||||
return self.by_filename(parsed['filename'])
|
||||
|
||||
def __getitem__(self, uri):
|
||||
return self.by_uri(uri) or self.by_filename(uri)
|
||||
|
||||
|
|
|
@ -144,40 +144,9 @@ def test_external_urls():
|
|||
pass
|
||||
|
||||
|
||||
def test_msgloader():
|
||||
|
||||
data = {'charset': 'utf-8',
|
||||
'subject': 'Что-то по-русски',
|
||||
'mail_from': ('Максим Иванов', 'ivanov@ya.ru'),
|
||||
'mail_to': ('Полина Сергеева', 'polina@mail.ru'),
|
||||
'html': '<h1>Привет!</h1><p>В первых строках...',
|
||||
'text': 'Привет!\nВ первых строках...',
|
||||
'headers': {'X-Mailer': 'python-emails'},
|
||||
'attachments': [{'data': 'aaa', 'filename': 'Event.ics'},],
|
||||
'message_id': 'message_id'}
|
||||
|
||||
msg = emails.Message(**data).as_string()
|
||||
loader = MsgLoader(msg=msg)
|
||||
loader._parse_msg()
|
||||
assert 'Event.ics' in loader.list_files()
|
||||
assert loader['__index.html'] == data['html']
|
||||
assert loader['__index.txt'] == data['text']
|
||||
|
||||
assert emails.loader.from_rfc822(msg=msg).as_string()
|
||||
# TODO: more tests
|
||||
|
||||
|
||||
def _test_mass_msgloader():
|
||||
ROOT = os.path.dirname(__file__)
|
||||
for filename in glob.glob(os.path.join(ROOT, "data/msg/*.eml")):
|
||||
msg = email.message_from_string(open(filename).read())
|
||||
msgloader = MsgLoader(msg=msg)
|
||||
msgloader._parse_msg()
|
||||
|
||||
|
||||
def _get_loaders():
|
||||
# All loaders loads same data
|
||||
yield FileSystemLoader(os.path.join(ROOT, "data/html_import/oldornament/oldornament/"))
|
||||
yield FileSystemLoader(os.path.join(ROOT, "data/html_import/./oldornament/oldornament"))
|
||||
yield ZipLoader(open(os.path.join(ROOT, "data/html_import/oldornament/oldornament.zip"), 'rb'))
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
# encoding: utf-8
|
||||
from __future__ import unicode_literals, print_function
|
||||
import glob
|
||||
import email
|
||||
import os.path
|
||||
from emails.compat import to_native
|
||||
import emails.loader
|
||||
from emails.loader.local_store import MsgLoader
|
||||
#from emails.loader.helpers import guess_charset
|
||||
|
||||
ROOT = os.path.dirname(__file__)
|
||||
|
||||
|
||||
def _get_message():
|
||||
m = emails.loader.from_zip(open(os.path.join(ROOT, "data/html_import/oldornament/oldornament.zip"), 'rb'))
|
||||
m.text = 'text'
|
||||
n = len(m.attachments)
|
||||
for i, a in enumerate(m.attachments):
|
||||
a.content_disposition = 'inline' if i < n/2 else 'attachment'
|
||||
m.transformer.synchronize_inline_images()
|
||||
m.transformer.save()
|
||||
#open('oldornament.eml', 'wb').write(m.as_string())
|
||||
return m
|
||||
|
||||
|
||||
def _compare_messages(a, b):
|
||||
assert a.text == b.text
|
||||
assert a.html and a.html == b.html
|
||||
assert len(a.attachments) == len(b.attachments)
|
||||
assert sorted([att.filename for att in a.attachments]) == sorted([att.filename for att in b.attachments])
|
||||
for att in a.attachments:
|
||||
assert att.data == b.attachments.by_filename(att.filename).data
|
||||
|
||||
|
||||
def test_rfc822_loader(**kw):
|
||||
source_message = _get_message()
|
||||
message = emails.loader.from_rfc822(source_message.as_string(), **kw)
|
||||
_compare_messages(message, source_message)
|
||||
assert len(message.attachments.by_filename('arrow.png').data) == 484
|
||||
|
||||
|
||||
def test_msgloader():
|
||||
|
||||
data = {'charset': 'utf-8',
|
||||
'subject': 'Что-то по-русски',
|
||||
'mail_from': ('Максим Иванов', 'ivanov@ya.ru'),
|
||||
'mail_to': ('Полина Сергеева', 'polina@mail.ru'),
|
||||
'html': '<h1>Привет!</h1><p>В первых строках...',
|
||||
'text': 'Привет!\nВ первых строках...',
|
||||
'headers': {'X-Mailer': 'python-emails'},
|
||||
'attachments': [{'data': 'X', 'filename': 'Event.ics'},
|
||||
{'data': 'Y', 'filename': 'Map.png', 'content_disposition': 'inline'},],
|
||||
'message_id': 'message_id'}
|
||||
|
||||
source_message = emails.Message(**data)
|
||||
loader = MsgLoader(msg=source_message.as_string())
|
||||
assert loader.html == data['html']
|
||||
assert loader.text == data['text']
|
||||
|
||||
assert 'Event.ics' in loader.list_files()
|
||||
assert loader.content('Event.ics') == 'X'
|
||||
|
||||
# check file search by content-id
|
||||
map_cid = "cid:%s" % source_message.attachments['Map.png'].content_id
|
||||
assert loader.content(map_cid) == 'Y'
|
||||
|
||||
assert emails.loader.from_rfc822(msg=source_message.as_string()).as_string()
|
||||
# TODO: more tests
|
||||
|
||||
|
||||
def _try_decode(s):
|
||||
for charset in ['utf-8', 'cp1251']:
|
||||
try:
|
||||
return to_native(s, charset)
|
||||
except UnicodeDecodeError:
|
||||
pass
|
||||
|
||||
def test_mass_msgloader():
|
||||
ROOT = os.path.dirname(__file__)
|
||||
for filename in glob.glob(os.path.join(ROOT, "data/msg/*.eml")):
|
||||
msg = _try_decode(open(filename, 'rb').read())
|
||||
if msg is None:
|
||||
print("can not read filename=", filename)
|
||||
continue
|
||||
#msg = email.message_from_string(open(filename).read())
|
||||
msgloader = MsgLoader(msg=msg)
|
||||
print(len(msgloader.attachments))
|
||||
|
|
@ -3,81 +3,81 @@ from __future__ import unicode_literals
|
|||
import os
|
||||
import pytest
|
||||
import emails
|
||||
from emails import Message
|
||||
from emails.compat import NativeStringIO, to_bytes, to_native
|
||||
from emails.exc import DKIMException
|
||||
import emails.packages.dkim
|
||||
from .helpers import common_email_data
|
||||
|
||||
|
||||
TRAVIS_CI = os.environ.get('TRAVIS')
|
||||
HAS_INTERNET_CONNECTION = not TRAVIS_CI
|
||||
|
||||
KEYSIZE = 1024
|
||||
|
||||
DEFAULT_PRIVKEY = """-----BEGIN RSA PRIVATE KEY-----
|
||||
MIICXgIBAAKBgQDkAjnzycNmm4NXwTnW0T8p89UpLj/shFyh7UFDucxiRGiUVPdi
|
||||
F5QkoUvt+BGDd2DqrR42daEypP5/EkkvvMiuR1Yr1JM3/jzioshliVKv8luwbVhK
|
||||
ir16Utppig8BZ8RTeEOY0xIxCfhoQlO0jyEaVPm9jB/UXmUC9zxt8/5FfQIDAQAB
|
||||
AoGAEmBjj1R5nTF3eoEmSjv/HUB7s5/4ovVgCeT3V6AH6vuceigG8C76T6F4XyuZ
|
||||
LcFXXFKrlrQQU+acZF1y7JgIjGxY0zqZ85sIR5EQaTUygTp8eB5TK3ztZFqBvmvE
|
||||
n9F8pX52AihkN+fRlon/DOqvFgkuaQ58sZQtErURwSNgJkECQQDqKEyI6FoSKBjq
|
||||
tq96fZ4rn7GPvAUJvFKRamrttlGB4cFM4OEn/ovOfWXQPFJ4CqvEvr1SKVA4k3Ja
|
||||
QE55YILpAkEA+UcYarnI1w1kW+MSvq7CoYbY1FbgZerlQ7XvanjjjtETU9SuPxM+
|
||||
SahCidwc5JXdJqYZrSGl72hZjGMORF5JdQJBALRBr6FZVTVS/tN5LR8bou6sMdGX
|
||||
iT1UZy+gf45dYuOceeUH3Oyf7NpZ+E3UkhvtAwwjVbTxLttOzqIhjQetPzkCQQCw
|
||||
cTZDNMWIEp6au5ulBKYXFw+bHPMwsJce2kRgpLjNegeoKr47Py+zizmtwvNgiQNE
|
||||
PAWomkyNrNrVl7edhO+RAkEA4aC38DCBs3Y3NVFQvyRn3oRjDuAv04RxiSnd9XBi
|
||||
TQR25Ou2gNcYS33ddgnIrCLOjxcdrzORNcUitXjy3qsEfQ==
|
||||
PRIV_KEY = b"""-----BEGIN RSA PRIVATE KEY-----
|
||||
MIICXAIBAAKBgQDKHKzbg7LwpSJVfy9h8YQciVuIiexJ6OKJcCc6akJuLx+qPJGr
|
||||
t0chdV92slT9Lm1DUAjQEd8r9kVKa8FrWrnThMWx5HoXkGOIW2NqC0vrTZUgvhWy
|
||||
mlnwiysIylCirStZvA2uszYiFQK8slYD3H25UFTIOqLgB6AvV6URo26iJQIDAQAB
|
||||
AoGAOHt5B0Ov3zaW+MO5byq6m+r7DJZW1XTi0jvoipelhvteYwnYP9/RXhVaH2bI
|
||||
/5RY7qXQQK2t67BAPwMMI79QDL+jWsgwE0hly/qloOgEuX1+D/yGBShlYNQXvjAY
|
||||
UgkNYtp5JBVr8byz7upzvIyDsWJGoUrBindYnEiAVgwzZuECQQDKsKRwQhTCOZjW
|
||||
tkrockxDKMlXyKRLpOdqmwH0hwUdcWklxlmE+IJz4NVlz5qCVJz/oT+TgBNex8I5
|
||||
spxWAmdNAkEA/0UdnlXYueGVDIe5SUQGlXb8U8fTYtA/NsduFwq8QEWMrVBXK+uH
|
||||
4upq70kFlyfP5mpTOZwUgY2jH/qrXD8qOQJAdx1L5bTP4jxa94N1jhjtfGJRwMbm
|
||||
1pV4cgvaIEvg06a8djiUjzJD57lvbz+Lu5/iC9BFPnd76q1WFPZELb+H2QJBAK8y
|
||||
DWDlBEiW5QfjgqwhDu+36PfLNm4kBK6g8xLHYGowEZvFfv56uRloz5mIoVibj1lR
|
||||
ceshDwXXYrSJAuDdzSkCQDkx2TeKLUqKSxJNUYSrakQIo/41AOFvFBTbJuH3RZoy
|
||||
W/1DFMld7rC2gVHYW3m/LNd1qbi5QR9/buGxE7Y8ylI=
|
||||
-----END RSA PRIVATE KEY-----"""
|
||||
|
||||
PUB_KEY = b"""MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDKHKzbg7LwpSJVfy9h8YQciVuI
|
||||
iexJ6OKJcCc6akJuLx+qPJGrt0chdV92slT9Lm1DUAjQEd8r9kVKa8FrWrnThMWx
|
||||
5HoXkGOIW2NqC0vrTZUgvhWymlnwiysIylCirStZvA2uszYiFQK8slYD3H25UFTI
|
||||
OqLgB6AvV6URo26iJQIDAQAB"""
|
||||
|
||||
def _generate_privkey():
|
||||
|
||||
def _generate_key(length=1024):
|
||||
# From: http://stackoverflow.com/questions/3504955/using-rsa-in-python
|
||||
try:
|
||||
# From: http://stackoverflow.com/questions/3504955/using-rsa-in-python
|
||||
from Crypto.PublicKey import RSA
|
||||
private = RSA.generate(KEYSIZE)
|
||||
public = private.publickey()
|
||||
privkey = private.exportKey()
|
||||
key = RSA.generate(length)
|
||||
return to_bytes(key.exportKey()), to_bytes(key.publickey().exportKey())
|
||||
except ImportError:
|
||||
privkey = DEFAULT_PRIVKEY
|
||||
return PRIV_KEY, PUB_KEY
|
||||
|
||||
return to_bytes(privkey)
|
||||
|
||||
def _check_dkim(message, pub_key=PUB_KEY):
|
||||
def _plain_public_key(s):
|
||||
return b"".join([l for l in s.split(b'\n') if not l.startswith(b'---')])
|
||||
o = emails.packages.dkim.DKIM(message=message.as_string())
|
||||
return o.verify(dnsfunc=lambda name: b"".join([b"v=DKIM1; p=", _plain_public_key(pub_key)]))
|
||||
|
||||
|
||||
def test_dkim():
|
||||
|
||||
DKIM_PARAMS = [dict(privkey=NativeStringIO(to_native(_generate_privkey())),
|
||||
selector='_dkim',
|
||||
domain='somewhere.net',
|
||||
ignore_sign_errors=False),
|
||||
priv_key, pub_key = _generate_key(length=1024)
|
||||
|
||||
dict(privkey=_generate_privkey(),
|
||||
DKIM_PARAMS = [dict(privkey=NativeStringIO(to_native(priv_key)),
|
||||
selector='_dkim',
|
||||
domain='somewhere.net',
|
||||
ignore_sign_errors=False),
|
||||
domain='somewhere.net'),
|
||||
dict(privkey=priv_key,
|
||||
selector='_dkim',
|
||||
domain='somewhere.net'),
|
||||
]
|
||||
|
||||
for dkimparams in DKIM_PARAMS:
|
||||
|
||||
message = emails.html(html='<p>This is the end, beautiful friend<br>'
|
||||
'This is the end, my only friend',
|
||||
subject='Hello, world!',
|
||||
message_id=False,
|
||||
mail_from=('Jim', 'jim@somewhere.net'),
|
||||
mail_to='Anyone <anyone@here.net>')
|
||||
|
||||
message.attach(data=NativeStringIO('x' * 10), filename='Data.dat')
|
||||
|
||||
message = Message(**common_email_data())
|
||||
message.dkim(**dkimparams)
|
||||
|
||||
# check that DKIM header exist
|
||||
# check DKIM header exist
|
||||
assert message.as_message()['DKIM-Signature']
|
||||
assert 'DKIM-Signature: ' in message.as_string()
|
||||
# TODO: check dkim valid
|
||||
#print(__name__, "type message.as_string()==", type(message.as_string()))
|
||||
assert b'DKIM-Signature: ' in message.as_string()
|
||||
assert _check_dkim(message, pub_key)
|
||||
|
||||
|
||||
def test_dkim_errors():
|
||||
def test_dkim_error():
|
||||
|
||||
# Error in invalid key
|
||||
m = emails.html(**common_email_data())
|
||||
invalid_key = 'X'
|
||||
with pytest.raises(DKIMException):
|
||||
|
@ -85,3 +85,24 @@ def test_dkim_errors():
|
|||
selector='_dkim',
|
||||
domain='somewhere.net',
|
||||
ignore_sign_errors=False)
|
||||
|
||||
# Error on invalid dkim parameters
|
||||
|
||||
m.dkim(privkey=PRIV_KEY,
|
||||
selector='_dkim',
|
||||
domain='somewhere.net',
|
||||
include_headers=['To'])
|
||||
|
||||
with pytest.raises(DKIMException):
|
||||
# include_heades must contain 'From'
|
||||
m.as_string()
|
||||
|
||||
# Skip error on ignore_sign_errors=True
|
||||
m.dkim(privkey=PRIV_KEY,
|
||||
selector='_dkim',
|
||||
domain='somewhere.net',
|
||||
ignore_sign_errors=True,
|
||||
include_headers=['To'])
|
||||
|
||||
m.as_string()
|
||||
m.as_message()
|
||||
|
|
|
@ -5,15 +5,29 @@ from dateutil.parser import parse as dateutil_parse
|
|||
import pytest
|
||||
import emails
|
||||
import emails.exc
|
||||
from emails.compat import to_unicode, StringIO
|
||||
from emails.compat import to_unicode, StringIO, is_py2, is_py3
|
||||
from .helpers import common_email_data
|
||||
|
||||
|
||||
def test_message_types():
|
||||
|
||||
if is_py2:
|
||||
bytes_types = (str, )
|
||||
native_string = (unicode, )
|
||||
else:
|
||||
bytes_types = (bytes, )
|
||||
native_string = (str, )
|
||||
|
||||
m = emails.Message(**common_email_data())
|
||||
print(type(m.as_string()))
|
||||
#assert isinstance(m.as_message().as_string(), native_string)
|
||||
assert isinstance(m.as_string(), bytes_types)
|
||||
|
||||
|
||||
def test_message_build():
|
||||
|
||||
# Test simple build
|
||||
kwargs = common_email_data()
|
||||
m = emails.Message(**kwargs)
|
||||
m = emails.Message(**common_email_data())
|
||||
assert m.as_string()
|
||||
|
||||
# If no html or text - raises ValueError
|
||||
|
|
|
@ -35,6 +35,7 @@ def test_store_unique_name():
|
|||
assert f1.filename == 'c.gif'
|
||||
f2 = store.add({'uri': '/a/b/c.gif'})
|
||||
assert f2.filename == 'c-2.gif'
|
||||
assert f1.content_id != f2.content_id
|
||||
|
||||
|
||||
def test_store_commons2():
|
||||
|
@ -43,7 +44,6 @@ def test_store_commons2():
|
|||
assert f1.filename
|
||||
assert f1.content_id
|
||||
assert f1 in store and f1.uri in store # tests __contains__
|
||||
assert store.by_content_id(f1.content_id) == f1
|
||||
assert len(store) == 1 # tests __len__
|
||||
assert len(list(store.as_dict())) == 1
|
||||
with pytest.raises(ValueError):
|
||||
|
|
Reference in New Issue