fix encoding in zip and directory loaders

This commit is contained in:
Sergey Lavrinenko 2015-02-27 00:34:13 +03:00
parent c4c8a80aec
commit 70aa73715c
9 changed files with 165 additions and 72 deletions

View File

@ -1,12 +1,12 @@
# encoding: utf-8
import os
import os.path
from emails.loader.helpers import guess_charset
from emails.compat import to_unicode
from emails.compat import urlparse
from emails import Message
from emails.utils import fetch_url
from emails.loader import local_store
from emails.loader.helpers import guess_charset
def from_url(url, message_params=None, requests_params=None, **kwargs):
@ -43,7 +43,7 @@ def from_directory(directory, index_file=None, message_params=None, **kwargs):
store.base_path = dirname
message_params = message_params or {}
message = Message(html=store[index_file_name], **message_params)
message = Message(html=store.content(index_file_name, is_html=True, guess_charset=True), **message_params)
message.create_transformer(local_loader=store, requests_params=kwargs.pop('requests_params', None))
message.transformer.load_and_transform(**kwargs)
message.transformer.save()
@ -62,7 +62,7 @@ def from_zip(zip_file, message_params=None, **kwargs):
store.base_path = dirname
message_params = message_params or {}
message = Message(html=store[index_file_name], **message_params)
message = Message(html=store.content(index_file_name, is_html=True, guess_charset=True), **message_params)
message.create_transformer(local_loader=store, requests_params=kwargs.pop('requests_params', None))
message.transformer.load_and_transform(**kwargs)
message.transformer.save()
@ -83,8 +83,8 @@ from_string = from_html
def from_rfc822(msg, message_params=None, **kw):
store = local_store.MsgLoader(msg=msg)
text = store.get_source('__index.txt')
html = store.get_source('__index.html')
text = store['__index.txt']
html = store['__index.html']
message_params = message_params or {}
message = Message(html=html, text=text, **message_params)

View File

@ -4,19 +4,8 @@ __all__ = ['guess_charset', 'fix_content_type']
import re
import cgi
import chardet
from emails.compat import to_unicode, is_py3, is_py2
import logging
# HTML page charset stuff
RE_CHARSET_U = re.compile(u"charset=\"?'?(.+)\"?'?", re.I + re.S + re.M)
RE_META_U = re.compile(u"<meta.*?http-equiv=\"?'?content-type\"?'?.*?>", re.I + re.S + re.M)
RE_INSIDE_META_U = re.compile(u"content=\"?'?([^\"'>]+)", re.I + re.S + re.M)
RE_CHARSET_B = re.compile(b"charset=\"?'?(.+)\"?'?", re.I + re.S + re.M)
RE_META_B = re.compile(b"<meta.*?http-equiv=\"?'?content-type\"?'?.*?>", re.I + re.S + re.M)
RE_INSIDE_META_B = re.compile(b"content=\"?'?([^\"'>]+)", re.I + re.S + re.M)
import charade
from emails.compat import to_unicode, to_native
def fix_content_type(content_type, t='image'):
@ -26,6 +15,45 @@ def fix_content_type(content_type, t='image'):
return content_type
# HTML page charset stuff
class ReRules:
    """Compiled regular expressions for finding a charset declaration in HTML.

    Every class attribute whose name starts with ``re_`` holds a pattern
    source as bytes.  On instantiation each source is passed through
    ``conv`` and compiled; the compiled pattern is stored on the instance
    under the same name.

    :param conv: optional callable applied to each pattern source before
        compiling (for example a bytes-to-text converter, so the rules can
        be applied to text input).  Defaults to the identity function.
    """

    # Everything between "<meta" and the next ">".
    re_meta = br"(?i)(?<=<meta).*?(?=>)"
    # Detects a http-equiv=content-type attribute inside a meta tag.
    re_is_http_equiv = br"(?i)http-equiv=\"?'?content-type\"?'?"
    # Captures the value of the content=... attribute.  This used to be a
    # copy of re_is_http_equiv, which meant the charset inside the content
    # attribute could never be reached.
    re_parse_http_equiv = br"(?i)content=\"?'?([^\"'>]+)"
    # Captures the charset name from "charset=..." (quotes optional).
    re_charset = br"(?i)charset=\"?'?([\w-]+)\"?'?"

    def __init__(self, conv=None):
        if conv is None:
            conv = lambda x: x
        for k in dir(self):
            if k.startswith('re_'):
                setattr(self, k, re.compile(conv(getattr(self, k))))
RULES_U = ReRules(conv=to_unicode)
RULES_B = ReRules()
def guess_text_charset(text, is_html=False):
    """Guess the charset of ``text``.

    For HTML input the ``<meta>`` tags are inspected first: a tag carrying
    ``http-equiv=content-type`` is searched through its ``content``
    attribute, any other meta tag is searched directly for ``charset=``.
    If nothing is declared and ``text`` is bytes, the charset detector
    (``charade``) is consulted.

    :param text: bytes or text to inspect.
    :param is_html: when true, look for a charset declared in meta tags.
    :returns: the charset name converted with ``to_native``, or ``None``
        when no charset could be determined.
    """
    if is_html:
        rules = RULES_B if isinstance(text, bytes) else RULES_U
        for meta in rules.re_meta.findall(text):
            if rules.re_is_http_equiv.findall(meta):
                for content in rules.re_parse_http_equiv.findall(meta):
                    for charset in rules.re_charset.findall(content):
                        return to_native(charset)
            else:
                for charset in rules.re_charset.findall(meta):
                    return to_native(charset)
    # guess by chardet
    if isinstance(text, bytes):
        encoding = charade.detect(text)['encoding']
        # The detector reports None when it cannot decide; do not hand
        # that to to_native (its handling of None is not visible here).
        if encoding:
            return to_native(encoding)
    return None
def guess_html_charset(html):
    """Guess the charset of an HTML document, honouring its meta tags."""
    charset = guess_text_charset(text=html, is_html=True)
    return charset
def guess_charset(headers, html):
# guess by http headers
@ -37,20 +65,43 @@ def guess_charset(headers, html):
if r:
return r
# guess by html meta
if isinstance(html, bytes):
RE_META, RE_INSIDE_META, RE_CHARSET = RE_META_B, RE_INSIDE_META_B, RE_CHARSET_B
else:
# Should we guess encoding for unicode html ?
RE_META, RE_INSIDE_META, RE_CHARSET = RE_META_U, RE_INSIDE_META_U, RE_CHARSET_U
# guess by html content
charset = guess_html_charset(html)
if charset:
return to_unicode(charset)
for s in RE_META.findall(html):
for x in RE_INSIDE_META.findall(s):
for charset in RE_CHARSET.findall(x):
return to_unicode(charset)
COMMON_CHARSETS = ('ascii', 'utf-8', 'utf-16', 'windows-1251', 'windows-1252', 'cp850')
if isinstance(html, bytes):
# guess by chardet
return chardet.detect(html)['encoding']
def decode_text(text,
                is_html=False,
                guess_charset=True,
                try_common_charsets=True,
                charsets=None,
                fallback_charset='utf-8'):
    """Decode ``text`` to unicode, trying several candidate charsets in order.

    Candidates are tried in this order: the guessed charset (when
    ``guess_charset`` is true), the caller-supplied ``charsets``,
    ``COMMON_CHARSETS`` (when ``try_common_charsets`` is true) and finally
    ``fallback_charset``.

    :param text: bytes to decode; any other value is returned untouched.
    :param is_html: passed to the charset guesser so meta tags are inspected.
    :param guess_charset: whether to try a guessed charset first.
    :param try_common_charsets: whether to try ``COMMON_CHARSETS``.
    :param charsets: optional iterable of extra charset names to try.
    :param fallback_charset: last charset to try; falsy disables it.
    :returns: a ``(decoded_text, charset)`` tuple.  For non-bytes input the
        result is ``(text, None)`` so callers can always unpack two values.
    :raises UnicodeDecodeError, LookupError: the last failure, when no
        candidate charset could decode the data.
    :raises ValueError: when there is no candidate charset at all.
    """
    if not isinstance(text, bytes):
        # Nothing to decode; keep the two-tuple shape of the bytes path.
        return text, None

    candidates = []
    if guess_charset:
        guessed = guess_text_charset(text, is_html=is_html)
        if guessed:
            candidates.append(guessed)
    if charsets:
        candidates.extend(charsets)
    if try_common_charsets:
        candidates.extend(COMMON_CHARSETS)
    if fallback_charset:
        candidates.append(fallback_charset)

    last_exc = None
    for enc in candidates:
        try:
            return to_unicode(text, charset=enc), enc
        except (UnicodeDecodeError, LookupError) as exc:
            # LookupError: a charset name the codec registry does not know
            # (e.g. a bogus name declared in a meta tag); presumably raised
            # by to_unicode when it decodes.  Move on to the next candidate.
            last_exc = exc

    if last_exc is None:
        raise ValueError('decode_text: no charset to try')
    raise last_exc

View File

@ -9,6 +9,7 @@ from zipfile import ZipFile
import email
from emails.compat import to_unicode, string_types
from emails.loader.helpers import guess_html_charset, decode_text
class FileNotFound(Exception):
@ -42,13 +43,27 @@ def open_if_exists(filename, mode='rb'):
class BaseLoader(object):
def __getitem__(self, filename):
try:
contents, _ = self.get_source(filename)
contents, _ = self.get_file(filename)
return contents
except FileNotFound:
return None
def get_file(self, name):
    """Return the file named ``name``; concrete loaders must override this."""
    raise NotImplementedError()
def content(self, filename, is_html=False, decode=True, guess_charset=False, charset='utf-8'):
    """Return the content stored under ``filename``, decoded to text by default.

    :param filename: key looked up through ``self[filename]``.
    :param is_html: passed to ``decode_text`` so meta tags can be inspected.
    :param decode: when false, the stored value is returned as is.
    :param guess_charset: whether ``decode_text`` may guess the charset.
    :param charset: fallback charset handed to ``decode_text``.
    :returns: decoded text for bytes content; otherwise whatever
        ``self[filename]`` returned (including ``None``).
    """
    data = self[filename]
    # Only bytes need decoding.  A missing file (None) or content that is
    # already text is passed through instead of being unpacked as a tuple.
    if decode and isinstance(data, bytes):
        data, _ = decode_text(data,
                              is_html=is_html,
                              guess_charset=guess_charset,
                              try_common_charsets=False,
                              fallback_charset=charset)
    return data
def find_index_file(self, filename=None):
if filename:
if self[filename]:
@ -98,25 +113,21 @@ class FileSystemLoader(BaseLoader):
self.encoding = encoding
self.base_path = base_path
def get_source(self, template):
def get_file(self, filename):
if self.base_path:
name = path.join(self.base_path, template)
pieces = split_template_path(template)
filename = path.join(self.base_path, filename)
pieces = split_template_path(filename)
for searchpath in self.searchpath:
filename = path.join(searchpath, *pieces)
f = open_if_exists(filename)
if f is None:
continue
try:
contents = f.read().decode(self.encoding)
contents = f.read()
finally:
f.close()
return contents, filename
raise FileNotFound(template)
raise FileNotFound(filename)
def list_files(self):
found = set()
@ -133,11 +144,11 @@ class FileSystemLoader(BaseLoader):
class ZipLoader(BaseLoader):
def __init__(self, file, encoding='utf-8', base_path=None):
def __init__(self, file, encoding='utf-8', base_path=None, guess_encoding=True):
self.zipfile = ZipFile(file, 'r')
self.encoding = encoding
self.base_path = base_path
self.mapping = {}
self.guess_encoding = guess_encoding
self._filenames = None
def _decode_zip_filename(self, name):
@ -155,7 +166,7 @@ class ZipLoader(BaseLoader):
decoded_name = self._decode_zip_filename(name)
self._filenames[decoded_name] = name
def get_source(self, name):
def get_file(self, name):
if self.base_path:
name = path.join(self.base_path, name)
@ -165,19 +176,12 @@ class ZipLoader(BaseLoader):
if isinstance(name, str):
name = to_unicode(name, 'utf-8')
data = self.mapping.get(name, None)
if data is not None:
return data, name
original_name = self._filenames.get(name)
if original_name is None:
raise FileNotFound(name)
data = self.zipfile.read(original_name).decode(self.encoding)
return data, name
return self.zipfile.read(original_name), name
def list_files(self):
self._unpack_zip()
@ -277,12 +281,12 @@ class MsgLoader(BaseLoader):
self.add_another_part(part)
def get_source(self, name):
def get_file(self, name):
self._parse_msg()
f = self._files.get(name)
if f:
return f['data'], name
return None, name
raise FileNotFound(name)
def list_files(self):
return self._files

View File

@ -68,7 +68,7 @@ class SecureSMTPDServer(object):
self.port = 25127
else:
self.port = 25125
cmd = '/bin/sh ./run.sh'.split(' ')
cmd = '/bin/sh ./run-smtpd.sh'.split(' ')
if argv:
cmd.extend(argv)
self._process = subprocess.Popen(cmd, shell=False, cwd=self._cwd)

View File

@ -10,5 +10,5 @@ else
PYTHON=python
fi
echo "$PYTHON run.py $@"
$PYTHON run.py $@
echo "use python $PYTHON"
$PYTHON run-smtpd.py $@

View File

@ -0,0 +1,46 @@
# encoding: utf-8
from __future__ import unicode_literals, print_function
from emails.store.file import fix_content_type
from emails.loader.helpers import guess_charset, decode_text
def test_guess_charset():
    """guess_charset reads the charset from headers, meta tags or raw bytes."""
    # Charset announced by the HTTP headers.
    from_headers = guess_charset(headers={'content-type': 'text/html; charset=utf-8'}, html='')
    assert from_headers == 'utf-8'

    # Charset announced by a plain <meta charset=...> tag.
    from_meta = guess_charset(headers=None, html='<meta charset="xxx-N" >')
    assert from_meta == 'xxx-N'

    # Charset announced by a http-equiv meta tag.
    html = """<html><meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />"""
    from_http_equiv = guess_charset(headers=None, html=html)
    assert from_http_equiv == 'UTF-8'

    # No declaration at all: the encoding is detected from the bytes.
    html = """Шла Саша по шоссе и сосала сушку"""
    detected = guess_charset(headers=None, html=html.encode('utf-8'))
    assert detected == 'utf-8'
def test_fix_content_type():
    """fix_content_type keeps a given value and fills in an empty one."""
    expectations = (
        ('x', 'x'),
        ('', 'image/unknown'),
    )
    for given, expected in expectations:
        assert fix_content_type(given) == expected
def test_decode_text():
    """decode_text decodes bytes and reports the charset it used."""
    import codecs

    def norma_enc(enc):
        # Canonical codec name via the public codec registry, instead of
        # the private encodings._aliases table; unknown names raise
        # LookupError.
        enc_ = codecs.lookup(enc).name
        assert enc_
        return enc_

    assert decode_text(u'A')[0] == u'A'
    assert decode_text(b'A') == (u'A', 'ascii')

    for enc in ['utf-8', 'windows-1251', 'cp866']:
        # Plain text: the encoding has to be detected from the bytes.
        t = u'Шла Саша по шоссе и сосала сушку. В огороде бузина, в Киеве дядька.'
        text, guessed_encoding = decode_text(t.encode(enc))
        print(text, norma_enc(guessed_encoding))
        assert (text, norma_enc(guessed_encoding)) == (t, norma_enc(enc))

        # HTML: the charset declared in the meta tag is the one reported.
        html = u"""<html><meta http-equiv="Content-Type" content="text/html; charset=%s" />""" % enc
        text, guessed_encoding = decode_text(html.encode('utf-8'), is_html=True)
        print(text, norma_enc(guessed_encoding))
        assert (text, norma_enc(guessed_encoding)) == (html, norma_enc(enc))

View File

@ -9,8 +9,9 @@ import emails
import emails.loader
import emails.transformer
from emails.loader.local_store import MsgLoader, FileSystemLoader, FileNotFound, ZipLoader
from emails.loader import guess_charset
from emails.compat import text_type
from emails.loader.helpers import guess_charset
ROOT = os.path.dirname(__file__)
BASE_URL = 'http://lavr.github.io/python-emails/tests/campaignmonitor-samples/oldornament'
@ -138,16 +139,6 @@ def _test_mass_msgloader():
msgloader._parse_msg()
def test_guess_charset():
html = """<html><meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />"""
assert guess_charset(headers=None, html=html) == 'UTF-8'
html = """Шла Саша по шоссе и сосала сушку"""
assert guess_charset(headers=None, html=html.encode('utf-8')) == 'utf-8'
assert guess_charset(headers={'content-type': 'text/html; charset=utf-8'}, html='') == 'utf-8'
def _get_loaders():
# All loaders loads same data
yield FileSystemLoader(os.path.join(ROOT, "data/html_import/oldornament/"))
@ -158,10 +149,11 @@ def test_local_store1():
for loader in _get_loaders():
print(loader)
print(type(loader['index.html']))
assert isinstance(loader['index.html'], text_type)
assert '<table' in loader['index.html']
assert isinstance(loader.content('index.html'), text_type)
assert isinstance(loader['index.html'], bytes)
assert '<table' in loader.content('index.html')
with pytest.raises(FileNotFound):
loader.get_source('nofile.ext')
loader.get_file('nofile.ext')
files_list = list(loader.list_files())
assert 'images/arrow.png' in files_list
assert len(files_list) in [15, 16]

View File

@ -1,6 +1,6 @@
cssutils
lxml
chardet
charade
python-dateutil
requests
premailer>=2.8.3