wcs/wcs/qommon/emails.py

302 lines
11 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import os
from email.mime.application import MIMEApplication
from email.MIMEText import MIMEText
from email.MIMEMultipart import MIMEMultipart
from email.Header import Header
import email.Charset as Charset
import smtplib
import socket
from cStringIO import StringIO
import ezt
try:
import docutils
import docutils.core
import docutils.io
except ImportError:
docutils = None
from quixote import get_request, get_response, get_publisher
from publisher import get_cfg, get_logger
import errors
import tokens
from admin.emails import EmailsDirectory
# Register the utf-8 charset with quoted-printable for both the header
# encoding and the body encoding (second and third arguments).
Charset.add_charset('utf-8', Charset.QP, Charset.QP, 'utf-8')
def custom_ezt_email(key, mail_body_data, email_rcpt, **kwargs):
    """Send the configurable email registered under ``key``.

    Nothing is sent, and None is returned, when EmailsDirectory reports the
    email as disabled.  Otherwise the subject and body templates are fetched
    from EmailsDirectory and handed to ezt_email(), with ``key`` used as the
    email type.

    A 'sitename' entry is added to ``mail_body_data`` (read from the 'misc'
    configuration section) when that entry is missing or empty.  Extra
    keyword arguments are forwarded unchanged to ezt_email().
    """
    if EmailsDirectory.is_enabled(key):
        subject_template = EmailsDirectory.get_subject(key)
        body_template = EmailsDirectory.get_body(key)
        # a falsy value (None, empty dict) is replaced by a fresh dictionary
        variables = mail_body_data or {}
        if not variables.get('sitename'):
            variables['sitename'] = get_cfg('misc', {}).get('sitename')
        return ezt_email(subject_template, body_template, variables,
                         email_rcpt=email_rcpt, email_type=key, **kwargs)
    return None
def ezt_email(subject, mail_body, mail_body_data, email_rcpt, email_type=None, **kwargs):
    """Expand ``subject`` and ``mail_body`` as ezt templates and send them.

    The template context is made of the publisher substitution variables,
    updated with ``mail_body_data`` when it is not empty.  The rendered
    subject and body are passed to email() together with ``email_rcpt``,
    ``email_type`` and any extra keyword arguments; its result is returned.
    """
    def _parse(source):
        # whitespace is kept as written in the template
        template = ezt.Template(compress_whitespace=False)
        template.parse(source)
        return template

    def _render(template, context):
        output = StringIO()
        template.generate(output, context)
        return output.getvalue()

    subject_template = _parse(subject)

    context = get_publisher().substitutions.get_context_variables()
    if mail_body_data:
        context.update(mail_body_data)

    rendered_subject = _render(subject_template, context)
    rendered_body = _render(_parse(mail_body), context)

    return email(rendered_subject, rendered_body, email_rcpt=email_rcpt,
                 email_type=email_type, **kwargs)
def data_as_octet_stream(data, filename, **kwargs):
    """Return ``data`` wrapped in a MIMEApplication attachment part.

    ``filename`` is advertised in the Content-Disposition header; extra
    keyword arguments are forwarded to the MIMEApplication constructor.
    """
    attachment = MIMEApplication(data, **kwargs)
    attachment.add_header('Content-Disposition', 'attachment', filename=filename)
    return attachment
def data_as_text(data, filename, **kwargs):
    """Return ``data`` wrapped in a MIMEText attachment part.

    ``filename`` is advertised in the Content-Disposition header; extra
    keyword arguments are forwarded to the MIMEText constructor.
    """
    attachment = MIMEText(data, **kwargs)
    attachment.add_header('Content-Disposition', 'attachment', filename=filename)
    return attachment
def email(subject, mail_body, email_rcpt, replyto = None, bcc = None,
        email_from=None, exclude_current_user=False, email_type=None,
        want_html=True, hide_recipients = False, fire_and_forget = False,
        smtp_timeout = None, attachments = ()):
    """Build an email message and send it, or queue it as an after job.

    Parameters:
      subject -- subject line, encoded with the site charset.
      mail_body -- body text (converted with str()).  A body starting with
          '<' is sent as the HTML part, without a plain text part.
      email_rcpt -- a single address, a list/tuple of addresses, or None.
      replyto -- Reply-To address; takes precedence over the 'reply_to'
          entry of the 'emails' configuration section.
      bcc -- iterable of additional envelope recipients, not written to
          the headers.
      email_from -- From address; defaults to the 'from' entry of the
          'emails' configuration section.
      exclude_current_user -- remove the email address of the user
          attached to the current request from the recipients.
      email_type -- stored on the bounce token when bounce handling is
          enabled.
      want_html -- also render the body from reStructuredText to HTML with
          docutils (skipped when docutils is not installed or fails).
      hide_recipients -- write 'Undisclosed recipients:;' in the To header.
      fire_and_forget -- queue the sending as an after job of the current
          response instead of sending now; ignored outside of a request.
      smtp_timeout -- timeout passed to create_smtp_server().
      attachments -- MIME parts attached to the message.

    Returns None.  Nothing is sent when no recipient remains after
    filtering.  Refused recipients are logged, not raised; errors raised by
    create_smtp_server() are propagated.
    """
    from email.utils import parseaddr

    request = get_request()
    if not request:
        # we are not processing a request, no sense delaying the handling
        # (for example when running a cronjob)
        fire_and_forget = False

    emails_cfg = get_cfg('emails', {})
    footer = emails_cfg.get('footer')
    encoding = get_publisher().site_charset

    mail_body = str(mail_body)

    htmlmail = None
    if want_html and docutils is not None:
        try:
            if footer:
                rst_footer = footer
                if not rst_footer.startswith('|') and '\n' in rst_footer:
                    # unless the footer text is already formatted like a block
                    # of lines, add pipes to give it appropriate multilines
                    # formatting.
                    rst_footer = '\n'.join(['| ' + x for x in rst_footer.splitlines()])
                rst_mail_body = mail_body + '\n\n--------\n\n' + rst_footer
            else:
                rst_mail_body = mail_body
            # TODO: docutils problems (reporter.max_level >= 3) are not
            # reported to the user.
            htmlmail, _pub = docutils.core.publish_programmatically(
                    source_class = docutils.io.StringInput,
                    source = rst_mail_body,
                    source_path = None,
                    destination_class = docutils.io.StringOutput,
                    destination = None,
                    destination_path = None,
                    reader = None, reader_name = 'standalone',
                    parser = None, parser_name = 'restructuredtext',
                    writer = None, writer_name = 'html',
                    settings = None, settings_spec = None,
                    settings_overrides = {'input_encoding': encoding,
                                          'output_encoding': encoding,
                                          'embed_stylesheet': False,
                                          'stylesheet': None,
                                          'stylesheet_path': None,
                                          'file_insertion_enabled': 0,
                                          'xml_declaration': 0,
                                          'report_level': 5},
                    config_section = None,
                    enable_exit_status = None)
        except Exception:
            # HTML rendering is best effort: fall back to a text-only mail.
            htmlmail = None

    if mail_body.startswith('<'):
        htmlmail = mail_body
        mail_body = None
    elif footer:
        mail_body += '\n-- \n' + footer

    # If the mail contains only one part, make it the main content of the mail
    # and do not use the multipart encoding
    multipart = bool(attachments) or bool(mail_body and htmlmail)

    if multipart:
        # NOTE(review): attachments end up inside a multipart/alternative
        # container; multipart/mixed looks more appropriate -- confirm
        # before changing the structure of generated messages.
        msg = MIMEMultipart(_charset = encoding, _subtype = 'alternative')
    elif mail_body or not htmlmail:
        # an empty body without HTML rendering gives an empty text message
        msg = MIMEText(mail_body or '', _charset = encoding)
    else:
        msg = MIMEText(htmlmail, _subtype = 'html', _charset = encoding)

    msg['Subject'] = Header(subject, encoding)
    if hide_recipients or email_rcpt is None:
        msg['To'] = 'Undisclosed recipients:;'
    elif isinstance(email_rcpt, (list, tuple)):
        # empty entries are dropped, as they are from the envelope below
        msg['To'] = ', '.join([x for x in email_rcpt if x])
    else:
        msg['To'] = email_rcpt

    msg['From'] = email_from or emails_cfg.get('from', 'noreply@entrouvert.com')

    # Assigning a header on a message appends to it, set Reply-To only once
    # to avoid a duplicated header when both values are defined.
    reply_to = replyto or emails_cfg.get('reply_to')
    if reply_to:
        msg['Reply-To'] = reply_to

    msg['X-Qommon-Id'] = os.path.basename(get_publisher().app_dir)

    msg.preamble = ''
    msg.epilogue = ''

    if multipart:
        if mail_body:
            msg.attach(MIMEText(mail_body, _charset = encoding))
        if htmlmail:
            msg.attach(MIMEText(htmlmail, _subtype = 'html', _charset = encoding))
        for attachment in attachments:
            msg.attach(attachment)

    # envelope recipients
    if isinstance(email_rcpt, (list, tuple)):
        rcpts = list(email_rcpt)
    else:
        rcpts = [email_rcpt]
    if bcc:
        rcpts.extend(bcc)

    if exclude_current_user and request:
        user = request.user
        if user and user.email and user.email in rcpts:
            rcpts.remove(user.email)

    rcpts = [x for x in rcpts if x]
    if not rcpts:
        return

    mail_redirection = get_cfg('debug', {}).get('mail_redirection')
    if mail_redirection:
        rcpts = [mail_redirection]

    if os.environ.get('QOMMON_MAIL_REDIRECTION'):
        # if QOMMON_MAIL_REDIRECTION is set in the environment, send all emails
        # to that address instead of the real recipients.
        rcpts = [os.environ.get('QOMMON_MAIL_REDIRECTION')]

    message_text = msg.as_string()

    if emails_cfg.get('bounce_handler'):
        if request:
            server_name = request.get_server().split(':')[0]
        else:
            # parse the address so a "Name <user@domain>" From value gives a
            # clean domain
            server_name = parseaddr(msg['From'])[1].split('@')[-1]
        token = tokens.Token(7 * 86400)
        token.type = 'email-bounce'
        token.email_rcpts = [str(x) for x in rcpts]
        token.email_message = message_text
        token.email_type = str(email_type)
        token.store()
        msg_from = '%s-bounces+%s@%s' % (get_publisher().APP_NAME, token.id, server_name)
    else:
        msg_from = msg['From']

    if fire_and_forget:
        get_response().add_after_job('sending email',
                EmailToSend(msg_from, rcpts, message_text),
                fire_and_forget = True)
        return

    s = create_smtp_server(emails_cfg, smtp_timeout=smtp_timeout)
    try:
        s.sendmail(msg_from, rcpts, message_text)
    except smtplib.SMTPRecipientsRefused:
        get_logger().error('Failed to send mail to %s' % ', '.join(rcpts))
    finally:
        # always release the connection, even on unexpected SMTP errors
        s.close()
def create_smtp_server(emails_cfg, smtp_timeout=None):
    """Open a ready-to-use connection to the configured SMTP server.

    ``emails_cfg`` is the 'emails' configuration dictionary; its
    'smtp_server' entry gives the host (default: 'localhost') and its
    'smtp_login' / 'smtp_password' entries enable authentication.
    ``smtp_timeout`` is passed as the timeout of the connection.

    The session is greeted with EHLO, switched to TLS when the server
    advertises STARTTLS, and authenticated when a login is configured.

    Returns the connected smtplib.SMTP object; the caller has to close it.
    Raises errors.EmailError (after logging the problem) on connection,
    EHLO, STARTTLS or authentication failure.  The connection is closed
    before any error raised after the connection step is propagated.
    """
    try:
        s = smtplib.SMTP(emails_cfg.get('smtp_server', None) or 'localhost',
                timeout=smtp_timeout)
    except socket.timeout:
        get_logger().error('Failed to connect to SMTP server (timeout)')
        raise errors.EmailError('Failed to connect to SMTP server (timeout)')
    except socket.error:
        # XXX: write message in a queue somewhere?
        get_logger().error('Failed to connect to SMTP server')
        raise errors.EmailError('Failed to connect to SMTP server')

    try:
        if not s.sock:
            get_logger().error('Failed to connect to SMTP server')
            raise errors.EmailError('Failed to connect to SMTP server')

        rc_code, ehlo_answer = s.ehlo()
        if rc_code != 250:
            get_logger().error('Failed to EHLO to SMTP server (%s)', rc_code)
            raise errors.EmailError('Failed to EHLO to SMTP server (%s)' % rc_code)

        # rely on the extensions parsed by smtplib from the EHLO answer
        # rather than on a substring search in the raw answer
        if s.has_extn('starttls'):
            rc_code, starttls_answer = s.starttls()
            if rc_code != 220:
                get_logger().error('Failed to STARTTLS to SMTP server (%s)', rc_code)
                raise errors.EmailError('Failed to STARTTLS to SMTP server (%s)' % rc_code)

        if emails_cfg.get('smtp_login'):
            try:
                s.login(emails_cfg.get('smtp_login') or '',
                        emails_cfg.get('smtp_password') or '')
            except smtplib.SMTPAuthenticationError:
                get_logger().error('Failed to authenticate to SMTP server')
                raise errors.EmailError('Failed to authenticate to SMTP server')
            except smtplib.SMTPException:
                get_logger().error('Failed to authenticate to SMTP server, unknown error.')
                raise errors.EmailError('Failed to authenticate to SMTP server, unknown error.')
    except Exception:
        # the caller never gets the connection object in that case, close
        # it here so the socket is not leaked
        s.close()
        raise

    return s
class EmailToSend(object):
    """Callable sending an already serialized message over SMTP.

    email() registers instances of this class as after jobs when it is
    called with fire_and_forget.
    """

    def __init__(self, msg_from, rcpts, msg_as_string):
        # envelope sender address
        self.msg_from = msg_from
        # list of envelope recipient addresses
        self.rcpts = rcpts
        # complete message (headers and body) as a string
        self.msg_as_string = msg_as_string

    def __call__(self, job=None):
        """Send the stored message; ``job`` is accepted but not used.

        Refused recipients are logged, not raised.  Errors raised by
        create_smtp_server() are propagated.
        """
        emails_cfg = get_cfg('emails', {})
        s = create_smtp_server(emails_cfg)
        try:
            s.sendmail(self.msg_from, self.rcpts, self.msg_as_string)
        except smtplib.SMTPRecipientsRefused:
            # same reporting as the synchronous path in email()
            get_logger().error('Failed to send mail to %s' % ', '.join(self.rcpts))
        finally:
            # always release the connection, even on unexpected SMTP errors
            s.close()