wcs/wcs/qommon/emails.py

368 lines
14 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import os
import re
from email import encoders
from email.header import Header
from email.mime.application import MIMEApplication
from email.mime.audio import MIMEAudio
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.nonmultipart import MIMENonMultipart
from email.mime.text import MIMEText
from email.utils import formataddr
import smtplib
import socket
try:
import docutils
import docutils.core
import docutils.io
import docutils.parsers.rst
import docutils.parsers.rst.states
except ImportError:
docutils = None
from django.template.loader import render_to_string
from django.utils.safestring import mark_safe
from django.utils.six import StringIO
from quixote import get_request, get_response, get_publisher
from .publisher import get_cfg, get_logger
from . import force_str
from . import errors
from . import tokens
from .admin.emails import EmailsDirectory
from .template import Template
try:
import email.Charset as Charset
Charset.add_charset('utf-8', Charset.QP, Charset.QP, 'utf-8')
except ImportError:
pass
if docutils:
    # The customised parser is only defined when docutils could be imported.

    class Body(docutils.parsers.rst.states.Body):
        """Body state restricting period-terminated enumerations to arabic.

        This prevents the rst parser from considering text such as "M." as
        the start of an (upper alpha / roman) enumerated sequence.
        """

        def is_enumerated_list_item(self, ordinal, sequence, format):
            """Reject 'period' enumerators unless the sequence is arabic.

            Every other combination is delegated to the stock docutils
            implementation.
            """
            period_but_not_arabic = (format == 'period') and (sequence != 'arabic')
            if period_but_not_arabic:
                return False
            stock_check = docutils.parsers.rst.states.Body.is_enumerated_list_item
            return stock_check(self, ordinal, sequence, format)

    class CustomRstParser(docutils.parsers.rst.Parser):
        """reStructuredText parser whose first state class is the local Body."""

        def __init__(self, *args, **kwargs):
            docutils.parsers.rst.Parser.__init__(self, *args, **kwargs)
            # Swap the first state class for the restricted Body and keep
            # the remaining ones in their original order.
            # NOTE(review): assumes the first entry is the stock Body state.
            other_states = tuple(self.state_classes[1:])
            self.state_classes = (Body,) + other_states
def custom_template_email(key, mail_body_data, email_rcpt, **kwargs):
    """Send the configurable email identified by ``key``, if it is enabled.

    The subject and body templates are fetched from ``EmailsDirectory`` and
    handed over to ``template_email``.

    :param key: identifier of the email in ``EmailsDirectory``; it is also
        forwarded as ``email_type``.
    :param mail_body_data: mapping of extra template variables (may be
        ``None`` or empty).  It is never modified: a shallow copy is used.
    :param email_rcpt: recipient(s), forwarded untouched.
    :param kwargs: additional keyword arguments forwarded to
        ``template_email``.
    :returns: ``None`` when the email is disabled, otherwise whatever
        ``template_email`` returns.
    """
    if not EmailsDirectory.is_enabled(key):
        return
    mail_subject = EmailsDirectory.get_subject(key)
    mail_body = EmailsDirectory.get_body(key)
    # Work on a copy so that adding the 'sitename' fallback below does not
    # leak into the dictionary owned by the caller.
    mail_body_data = dict(mail_body_data) if mail_body_data else {}
    if not mail_body_data.get('sitename'):
        mail_body_data['sitename'] = get_cfg('misc', {}).get('sitename')
    return template_email(mail_subject, mail_body, mail_body_data, email_type=key,
                          email_rcpt=email_rcpt, **kwargs)
def template_email(subject, mail_body, mail_body_data, email_rcpt, email_type=None, **kwargs):
    """Render subject and body templates, then send them with ``email``.

    The rendering context is the publisher's substitution context variables,
    updated with ``mail_body_data`` when it is not empty.  Both templates are
    rendered with autoescaping disabled.  Remaining keyword arguments are
    forwarded to ``email``.
    """
    context = get_publisher().substitutions.get_context_variables()
    if mail_body_data:
        context.update(mail_body_data)

    def render(source):
        return Template(source, autoescape=False).render(context)

    # subject first, then body, as both share the same context
    rendered_subject = render(subject)
    rendered_body = render(mail_body)
    return email(
        rendered_subject,
        rendered_body,
        email_rcpt=email_rcpt,
        email_type=email_type,
        **kwargs)
def data_as_octet_stream(data, filename, **kwargs):
    """Wrap ``data`` in a ``MIMEApplication`` attachment named ``filename``.

    Extra keyword arguments are passed to the ``MIMEApplication``
    constructor.
    """
    attachment_part = MIMEApplication(data, **kwargs)
    attachment_part.add_header(
        'Content-Disposition',
        'attachment',
        filename=filename)
    return attachment_part
def data_as_text(data, filename, **kwargs):
    """Wrap ``data`` in a ``MIMEText`` attachment named ``filename``.

    Extra keyword arguments are passed to the ``MIMEText`` constructor.
    """
    text_part = MIMEText(data, **kwargs)
    text_part.add_header(
        'Content-Disposition',
        'attachment',
        filename=filename)
    return text_part
def convert_to_mime(attachment):
    """Build a MIME part from an upload-like object.

    :param attachment: object exposing ``get_file_pointer()`` (such as a
        qommon.form.PicklableUpload-like object) and, optionally,
        ``content_type``, ``charset`` and ``base_filename`` attributes.
    :returns: the MIME part, or ``None`` (after logging a warning) when the
        object has no ``get_file_pointer`` attribute.

    A missing content type, or one that is not of the form ``main/sub``,
    is handled as ``application/octet-stream``.  When the attachment has no
    charset the publisher's site charset is used.
    """
    if not hasattr(attachment, 'get_file_pointer'):
        get_logger().warning('Failed to build MIME part from %r', attachment)
        return None

    # qommon.form.PicklableUpload-like object
    file_pointer = attachment.get_file_pointer()
    file_pointer.seek(0)
    content = file_pointer.read()

    content_type = getattr(attachment, 'content_type', None) or 'application/octet-stream'
    if '/' not in content_type:
        # malformed value, it cannot be split into main type and subtype
        content_type = 'application/octet-stream'
    maintype, subtype = content_type.split('/', 1)
    charset = getattr(attachment, 'charset', None) or get_publisher().site_charset

    if maintype == 'application':
        part = MIMEApplication(content, subtype)
    elif maintype == 'image':
        part = MIMEImage(content, subtype)
    elif maintype == 'text':
        part = MIMEText(content, subtype, _charset=charset)
    elif maintype == 'audio':
        part = MIMEAudio(content, subtype)
    else:
        # no dedicated MIME class: build a generic part and base64 it
        part = MIMENonMultipart(maintype, subtype)
        part.set_payload(content, charset=charset)
        encoders.encode_base64(part)

    if getattr(attachment, 'base_filename', None):
        part.add_header('Content-Disposition', 'attachment',
                        filename=attachment.base_filename)
    return part
def email(subject, mail_body, email_rcpt, replyto=None, bcc=None,
          email_from=None, exclude_current_user=False, email_type=None,
          want_html=True, hide_recipients=False, fire_and_forget=False,
          smtp_timeout=None, attachments=(),
          extra_headers=None):
    """Build a MIME message and send it, now or as an after-job.

    :param subject: subject, rendered through 'qommon/email_subject.txt'.
    :param mail_body: message body.  A body starting with '<' is used as
        native HTML; otherwise it is sent as text and, when ``want_html`` is
        true, a conversion from reStructuredText to HTML is attempted.
    :param email_rcpt: a recipient address, a list/tuple of addresses, or
        ``None``.
    :param replyto: Reply-To value, overriding the configured 'reply_to'.
    :param bcc: extra envelope recipients, not written to any header.
    :param email_from: sender address; defaults to the 'from' entry of the
        emails configuration.
    :param exclude_current_user: drop the email address of the current
        request's user from the recipients.
    :param email_type: accepted for API compatibility, unused here.
    :param want_html: attempt to produce an HTML alternative.
    :param hide_recipients: use 'Undisclosed recipients:;' as To header.
    :param fire_and_forget: queue the sending as an after-job of the
        response; ignored when no request is being processed.
    :param smtp_timeout: timeout given to ``create_smtp_server``.
    :param attachments: iterable of ``MIMEBase`` parts or of objects
        ``convert_to_mime`` can handle; unconvertible ones are skipped.
    :param extra_headers: mapping of additional headers set on the message.
    :returns: ``None``.  Nothing is sent when no recipient remains.
    """
    if not get_request():
        # we are not processing a request, no sense delaying the handling
        # (for example when running a cronjob)
        fire_and_forget = False

    emails_cfg = get_cfg('emails', {})
    footer = emails_cfg.get('footer') or ''

    encoding = get_publisher().site_charset

    # in restructuredtext lines starting with a pipe were used to give
    # appropriate multiline formatting, remove them.
    footer = re.sub(r'^\|\s+', '', footer, flags=re.DOTALL | re.MULTILINE)

    text_body = str(mail_body)
    html_body = None

    if text_body.startswith('<'):
        # native HTML, keep it that way
        html_body = text_body
        text_body = None
    elif want_html:
        # body may be reStructuredText, try converting.
        try:
            htmlmail, pub = docutils.core.publish_programmatically(
                source_class=docutils.io.StringInput,
                source=mail_body,
                source_path=None,
                destination_class=docutils.io.StringOutput,
                destination=None,
                destination_path=None,
                reader=None, reader_name='standalone',
                parser=CustomRstParser(), parser_name=None,
                writer=None, writer_name='html',
                settings=None, settings_spec=None,
                settings_overrides={'input_encoding': encoding,
                                    'output_encoding': encoding,
                                    'embed_stylesheet': False,
                                    'stylesheet': None,
                                    'stylesheet_path': None,
                                    'file_insertion_enabled': 0,
                                    'xml_declaration': 0,
                                    'report_level': 5},
                config_section=None,
                enable_exit_status=None)
            # change paragraphs so manual newlines are considered.
            htmlmail = force_str(htmlmail).replace('<p>', '<p style="white-space: pre-line;">')
        except Exception:
            # best effort: any conversion failure (including docutils not
            # being installed) falls back to a text-only message, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            htmlmail = None

        try:
            html_body = re.findall('<body>(.*)</body>', htmlmail or '', re.DOTALL)[0]
        except IndexError:
            pass

    context = get_publisher().get_substitution_variables()
    context['email_signature'] = footer

    context['subject'] = mark_safe(subject)
    subject = render_to_string('qommon/email_subject.txt', context).strip()

    # handle action links/buttons
    button_re = re.compile(r'---===BUTTON:(?P<token>[a-zA-Z0-9]*):(?P<label>.*?)===---')

    def get_action_url(match):
        return '%s/actions/%s/' % (
            get_publisher().get_frontoffice_url(),
            match.group('token'))

    def text_button(match):
        return '[%s] %s' % (match.group('label'), get_action_url(match))

    def html_button(match):
        context = {
            'label': match.group('label'),
            'url': get_action_url(match),
        }
        return force_str(render_to_string('qommon/email_button_link.html', context))

    text_body = button_re.sub(text_button, text_body) if text_body else None
    html_body = button_re.sub(html_button, html_body) if html_body else None

    if text_body:
        context['content'] = mark_safe(text_body)
        text_body = render_to_string('qommon/email_body.txt', context)

    if html_body:
        context['content'] = mark_safe(html_body)
        html_body = render_to_string('qommon/email_body.html', context)

    if text_body and html_body:
        msg_body = MIMEMultipart(_charset=encoding, _subtype='alternative')
        msg_body.attach(MIMEText(text_body, _charset=encoding))
        msg_body.attach(MIMEText(html_body, _subtype='html', _charset=encoding))
    elif text_body:
        msg_body = MIMEText(text_body, _charset=encoding)
    else:
        msg_body = MIMEText(html_body, _subtype='html', _charset=encoding)

    attachments_parts = []
    for attachment in attachments or []:
        if not isinstance(attachment, MIMEBase):
            attachment = convert_to_mime(attachment)
        if attachment:
            attachments_parts.append(attachment)

    if not attachments_parts:
        msg = msg_body
    else:
        msg = MIMEMultipart(_charset=encoding, _subtype='mixed')
        msg.attach(msg_body)
        for attachment in attachments_parts:
            msg.attach(attachment)

    # recipients may come as a single address or as a sequence of them
    multiple_rcpts = isinstance(email_rcpt, (list, tuple))

    msg['Subject'] = Header(subject, encoding)
    if hide_recipients or email_rcpt is None:
        msg['To'] = 'Undisclosed recipients:;'
    elif multiple_rcpts:
        msg['To'] = ', '.join(email_rcpt)
    else:
        msg['To'] = email_rcpt

    if not email_from:
        email_from = emails_cfg.get('from', 'noreply@entrouvert.com')

    sitename = get_publisher().get_site_option('global_title', 'variables')
    if sitename:
        msg['From'] = formataddr((str(Header(sitename, encoding)), email_from))
    else:
        msg['From'] = email_from

    if emails_cfg.get('reply_to'):
        msg['Reply-To'] = emails_cfg.get('reply_to')
    if replyto:
        msg['Reply-To'] = replyto

    msg['X-Qommon-Id'] = os.path.basename(get_publisher().app_dir)

    if extra_headers:
        for key, value in extra_headers.items():
            msg[key] = value

    # envelope recipients; always a fresh list so the caller's is untouched
    if multiple_rcpts:
        rcpts = list(email_rcpt)
    else:
        rcpts = [email_rcpt]

    if bcc:
        rcpts += bcc

    if exclude_current_user:
        # there may be no request at all (for example in a cronjob)
        request = get_request()
        user = request.user if request else None
        if user and user.email and user.email in rcpts:
            rcpts.remove(user.email)

    rcpts = [x for x in rcpts if x]
    if not rcpts:
        return

    mail_redirection = get_cfg('debug', {}).get('mail_redirection')
    if mail_redirection:
        rcpts = [mail_redirection]

    if os.environ.get('QOMMON_MAIL_REDIRECTION'):
        # if QOMMON_MAIL_REDIRECTION is set in the environment, send all emails
        # to that address instead of the real recipients.
        rcpts = [os.environ.get('QOMMON_MAIL_REDIRECTION')]

    if not fire_and_forget:
        smtp_server = create_smtp_server(emails_cfg, smtp_timeout=smtp_timeout)
        try:
            smtp_server.sendmail(email_from, rcpts, msg.as_string())
        except smtplib.SMTPRecipientsRefused:
            get_logger().error('Failed to send mail to %r', rcpts)
        finally:
            # release the connection even when sendmail raises another error
            smtp_server.close()
    else:
        get_response().add_after_job('sending email',
                                     EmailToSend(email_from, rcpts, msg.as_string()),
                                     fire_and_forget=True)
def create_smtp_server(emails_cfg, smtp_timeout=None):
    """Open an SMTP connection ready to send messages.

    :param emails_cfg: emails configuration mapping; the 'smtp_server',
        'smtp_login' and 'smtp_password' keys are read.  The server defaults
        to 'localhost'.
    :param smtp_timeout: timeout passed to ``smtplib.SMTP``.
    :returns: the connected ``smtplib.SMTP`` object, after EHLO, STARTTLS
        (when the server advertises it) and login (when 'smtp_login' is
        set).
    :raises errors.EmailError: when connecting, EHLO, STARTTLS or
        authentication fails; the failure is also logged.
    """
    try:
        s = smtplib.SMTP(emails_cfg.get('smtp_server', None) or 'localhost',
                         timeout=smtp_timeout)
    except socket.timeout:
        get_logger().error('Failed to connect to SMTP server (timeout)')
        raise errors.EmailError('Failed to connect to SMTP server (timeout)')
    except socket.error:
        # XXX: write message in a queue somewhere?
        get_logger().error('Failed to connect to SMTP server')
        raise errors.EmailError('Failed to connect to SMTP server')
    if not s.sock:
        get_logger().error('Failed to connect to SMTP server')
        raise errors.EmailError('Failed to connect to SMTP server')

    try:
        rc_code, ehlo_answer = s.ehlo()
        if rc_code != 250:
            get_logger().error('Failed to EHLO to SMTP server (%s)', rc_code)
            raise errors.EmailError('Failed to EHLO to SMTP server (%s)' % rc_code)
        # has_extn() looks at the extensions parsed from the EHLO answer; a
        # substring test on the raw answer fails on Python 3 where it is
        # bytes.
        if s.has_extn('starttls'):
            rc_code, starttls_answer = s.starttls()
            if rc_code != 220:
                get_logger().error('Failed to STARTTLS to SMTP server (%s)', rc_code)
                raise errors.EmailError('Failed to STARTTLS to SMTP server (%s)' % rc_code)
        if emails_cfg.get('smtp_login'):
            try:
                s.login(emails_cfg.get('smtp_login') or '',
                        emails_cfg.get('smtp_password') or '')
            except smtplib.SMTPAuthenticationError:
                get_logger().error('Failed to authenticate to SMTP server')
                raise errors.EmailError('Failed to authenticate to SMTP server')
            except smtplib.SMTPException:
                get_logger().error('Failed to authenticate to SMTP server, unknown error.')
                raise errors.EmailError('Failed to authenticate to SMTP server, unknown error.')
    except Exception:
        # do not leave the connection open when the setup fails
        s.close()
        raise

    return s
class EmailToSend(object):
    """Callable holding an already serialized email, to be sent later.

    ``email()`` registers instances as after-jobs on the response when
    called with ``fire_and_forget``.
    """

    def __init__(self, msg_from, rcpts, msg_as_string):
        """Store the envelope sender, the envelope recipients and the
        message serialized as a string."""
        self.msg_from = msg_from
        self.rcpts = rcpts
        self.msg_as_string = msg_as_string

    def __call__(self, job=None):
        """Send the stored message over a new SMTP connection.

        :param job: unused, accepted so the object can be run as a job.
        :raises errors.EmailError: propagated from ``create_smtp_server``.

        Refused recipients are logged, not raised.
        """
        emails_cfg = get_cfg('emails', {})
        s = create_smtp_server(emails_cfg)
        try:
            s.sendmail(self.msg_from, self.rcpts, self.msg_as_string)
        except smtplib.SMTPRecipientsRefused:
            # same reporting as the synchronous path in email()
            get_logger().error('Failed to send mail to %r', self.rcpts)
        finally:
            # release the connection whatever the outcome of sendmail
            s.close()