398 lines
15 KiB
Python
398 lines
15 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2010 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import os
|
|
import re
|
|
|
|
from email import encoders
|
|
from email.header import Header
|
|
from email.mime.application import MIMEApplication
|
|
from email.mime.audio import MIMEAudio
|
|
from email.mime.base import MIMEBase
|
|
from email.mime.image import MIMEImage
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.nonmultipart import MIMENonMultipart
|
|
from email.mime.text import MIMEText
|
|
from email.utils import formataddr
|
|
import smtplib
|
|
import socket
|
|
|
|
try:
|
|
import docutils
|
|
import docutils.core
|
|
import docutils.io
|
|
import docutils.parsers.rst
|
|
import docutils.parsers.rst.states
|
|
except ImportError:
|
|
docutils = None
|
|
|
|
from django.template.loader import render_to_string
|
|
from django.utils.safestring import mark_safe
|
|
from django.utils.six import StringIO
|
|
|
|
from quixote import get_request, get_response, get_publisher
|
|
|
|
from .publisher import get_cfg, get_logger
|
|
from . import force_str
|
|
from . import errors
|
|
from . import tokens
|
|
from .admin.emails import EmailsDirectory
|
|
from .template import Template
|
|
|
|
# Register utf-8 with quoted-printable as both the header and the body
# encoding (second and third arguments of add_charset) and utf-8 as the
# output charset.
# NOTE(review): the capitalised `email.Charset` module name presumably only
# exists on Python 2; where the import fails the registration is skipped and
# the email package defaults for utf-8 stay in effect -- confirm this is the
# intended behaviour on Python 3.
try:
    import email.Charset as Charset
    Charset.add_charset('utf-8', Charset.QP, Charset.QP, 'utf-8')
except ImportError:
    pass
|
|
|
|
|
|
if docutils:
    from docutils import statemachine

    class Body(docutils.parsers.rst.states.Body):
        """reStructuredText ``Body`` state tolerant of free-form email text."""

        def is_enumerated_list_item(self, ordinal, sequence, format):
            # Only arabic numbering may open a "period" style enumerated
            # list; otherwise the parser would read "M." as the start of an
            # (upper alpha / roman) sequence.
            parent_check = docutils.parsers.rst.states.Body.is_enumerated_list_item
            non_arabic_period = (format == 'period' and sequence != 'arabic')
            if non_arabic_period:
                return False
            return parent_check(self, ordinal, sequence, format)

        def line(self, match, context, next_state):
            # Ignore unexpected overlines or transitions (caused for example
            # by a field filled with question marks): unless titles are
            # being matched, the line is always handed back as ordinary text.
            if self.state_machine.match_titles:
                return [match.string], 'Line', []
            raise statemachine.TransitionCorrection('text')

    class CustomRstParser(docutils.parsers.rst.Parser):
        """RST parser whose first state class is the customised ``Body``."""

        def __init__(self, *args, **kwargs):
            docutils.parsers.rst.Parser.__init__(self, *args, **kwargs)
            # Swap the first state class for ours, keep the others as they are.
            self.state_classes = (Body,) + tuple(self.state_classes[1:])
            # The module-level list is overwritten too, so the replacement is
            # visible outside this parser instance.
            docutils.parsers.rst.states.state_classes = self.state_classes

    def custom_rststate_init(self, state_machine, debug=False):
        # Replacement for RSTState.__init__: nested state machines are
        # configured with the customised Body as first state class and as
        # initial state.
        nested_classes = (Body,) + tuple(docutils.parsers.rst.states.state_classes[1:])
        self.nested_sm_kwargs = {
            'state_classes': nested_classes,
            'initial_state': 'Body',
        }
        docutils.parsers.rst.states.StateWS.__init__(self, state_machine, debug)

    # Monkeypatch docutils so every RSTState is initialised by the function above.
    docutils.parsers.rst.states.RSTState.__init__ = custom_rststate_init
|
|
|
|
|
|
def custom_template_email(key, mail_body_data, email_rcpt, **kwargs):
    """Send the configurable email registered under *key*, if it is enabled.

    The subject and body templates are read from ``EmailsDirectory``.  When
    *mail_body_data* has no truthy ``'sitename'`` entry, one is filled in
    from the ``misc`` configuration section; a non-empty dict passed by the
    caller is updated in place.

    Returns None without sending anything when the email is disabled,
    otherwise the result of ``template_email``.  Extra keyword arguments are
    forwarded to ``template_email``.
    """
    if not EmailsDirectory.is_enabled(key):
        return None

    subject_template = EmailsDirectory.get_subject(key)
    body_template = EmailsDirectory.get_body(key)

    mail_body_data = mail_body_data or {}
    if not mail_body_data.get('sitename'):
        mail_body_data['sitename'] = get_cfg('misc', {}).get('sitename')

    return template_email(
        subject_template,
        body_template,
        mail_body_data,
        email_type=key,
        email_rcpt=email_rcpt,
        **kwargs)
|
|
|
|
|
|
def template_email(subject, mail_body, mail_body_data, email_rcpt, email_type=None, **kwargs):
    """Render *subject* and *mail_body* as templates, then send the result.

    The rendering context is the publisher's substitution context variables,
    updated with *mail_body_data* when it is not empty.  Both templates are
    rendered with autoescaping disabled.  Extra keyword arguments are
    forwarded to ``email``, whose result is returned.
    """
    context = get_publisher().substitutions.get_context_variables()
    if mail_body_data:
        context.update(mail_body_data)

    # Subject first, then body: same order as two explicit render calls.
    rendered_subject, rendered_body = [
        Template(source, autoescape=False).render(context)
        for source in (subject, mail_body)
    ]

    return email(
        rendered_subject,
        rendered_body,
        email_rcpt=email_rcpt,
        email_type=email_type,
        **kwargs)
|
|
|
|
|
|
def data_as_octet_stream(data, filename, **kwargs):
    """Return *data* as a ``MIMEApplication`` attachment named *filename*.

    Extra keyword arguments are passed to the ``MIMEApplication`` constructor.
    """
    part = MIMEApplication(data, **kwargs)
    disposition_params = {'filename': filename}
    part.add_header('Content-Disposition', 'attachment', **disposition_params)
    return part
|
|
|
|
|
|
def data_as_text(data, filename, **kwargs):
    """Return *data* as a ``MIMEText`` attachment named *filename*.

    Extra keyword arguments are passed to the ``MIMEText`` constructor.
    """
    part = MIMEText(data, **kwargs)
    disposition_params = {'filename': filename}
    part.add_header('Content-Disposition', 'attachment', **disposition_params)
    return part
|
|
|
|
|
|
def convert_to_mime(attachment):
    """Build a MIME part from an upload-like object.

    *attachment* must expose ``get_file_pointer()`` (like a
    qommon.form.PicklableUpload); its optional ``content_type``, ``charset``
    and ``base_filename`` attributes select the MIME type, the text charset
    and the attachment filename.  A missing content type, or one that is not
    of the ``maintype/subtype`` form, falls back to
    ``application/octet-stream``; parameters after a ``;`` are ignored.

    Returns the MIME part, or None (after logging a warning) when the object
    has no ``get_file_pointer`` method.
    """
    if not hasattr(attachment, 'get_file_pointer'):
        get_logger().warning('Failed to build MIME part from %r', attachment)
        return None

    # Rewind and read through the same handle.
    fp = attachment.get_file_pointer()
    fp.seek(0)
    content = fp.read()

    content_type = getattr(attachment, 'content_type', None) or 'application/octet-stream'
    # Keep only "maintype/subtype"; parameters would end up in the subtype.
    content_type = content_type.split(';', 1)[0].strip()
    maintype, separator, subtype = content_type.partition('/')
    if not (maintype and separator and subtype):
        # Malformed value (no slash, or an empty half): treat as opaque data
        # instead of failing on it.
        maintype, subtype = 'application', 'octet-stream'

    charset = getattr(attachment, 'charset', None) or get_publisher().site_charset

    if maintype == 'application':
        part = MIMEApplication(content, subtype)
    elif maintype == 'image':
        part = MIMEImage(content, subtype)
    elif maintype == 'text':
        part = MIMEText(content, subtype, _charset=charset)
    elif maintype == 'audio':
        part = MIMEAudio(content, subtype)
    else:
        # No dedicated class for this main type: build a generic part and
        # base64-encode its payload explicitly.
        part = MIMENonMultipart(maintype, subtype)
        part.set_payload(content, charset=charset)
        encoders.encode_base64(part)

    if getattr(attachment, 'base_filename', None):
        part.add_header('Content-Disposition', 'attachment',
                        filename=attachment.base_filename)
    return part
|
|
|
|
|
|
def _rst_to_html_body(mail_body, encoding):
    """Return *mail_body* converted from reStructuredText to an HTML fragment.

    The fragment is the content of the ``<body>`` element produced by
    docutils.  Returns None when the conversion fails for any reason
    (including docutils not being available) or yields no ``<body>``.
    """
    try:
        htmlmail, _pub = docutils.core.publish_programmatically(
            source_class=docutils.io.StringInput,
            source=mail_body,
            source_path=None,
            destination_class=docutils.io.StringOutput,
            destination=None,
            destination_path=None,
            reader=None, reader_name='standalone',
            parser=CustomRstParser(), parser_name=None,
            writer=None, writer_name='html',
            settings=None, settings_spec=None,
            settings_overrides={'input_encoding': encoding,
                                'output_encoding': encoding,
                                'embed_stylesheet': False,
                                'stylesheet': None,
                                'stylesheet_path': None,
                                'file_insertion_enabled': 0,
                                'xml_declaration': 0,
                                'report_level': 5},
            config_section=None,
            enable_exit_status=None)
        # change paragraphs so manual newlines are considered.
        htmlmail = force_str(htmlmail).replace('<p>', '<p style="white-space: pre-line;">')
    except Exception:
        # Best effort: the body may simply not be valid reStructuredText, in
        # which case the message goes out as plain text only.
        return None

    found = re.search('<body>(.*)</body>', htmlmail, re.DOTALL)
    return found.group(1) if found else None


def email(subject, mail_body, email_rcpt, replyto=None, bcc=None,
          email_from=None, exclude_current_user=False, email_type=None,
          want_html=True, hide_recipients=False, fire_and_forget=False,
          smtp_timeout=None, attachments=(),
          extra_headers=None):
    """Build an email message and send it, immediately or as an after-job.

    :param subject: subject text, passed through the
        ``qommon/email_subject.txt`` template.
    :param mail_body: message body.  A body starting with ``<`` is sent as
        HTML only; otherwise it is sent as plain text, with an additional
        HTML alternative converted from reStructuredText when *want_html*
        is true and the conversion succeeds.
    :param email_rcpt: recipient address, list/tuple of addresses, or None.
    :param replyto: Reply-To address; overrides the ``reply_to`` value of
        the ``emails`` configuration.
    :param bcc: iterable of extra envelope recipients (not added to headers).
    :param email_from: sender address; defaults to the ``from`` value of the
        ``emails`` configuration, then to ``noreply@entrouvert.com``.
    :param exclude_current_user: drop the email address of the user of the
        current request from the recipients.
    :param email_type: accepted for API compatibility; not used here.
    :param want_html: attempt the reStructuredText to HTML conversion.
    :param hide_recipients: write ``Undisclosed recipients:;`` as To header.
    :param fire_and_forget: queue the sending as an after-job of the current
        response instead of sending now; ignored when no request is being
        processed.
    :param smtp_timeout: timeout handed to ``create_smtp_server`` (immediate
        sending only).
    :param attachments: iterable of ``MIMEBase`` parts or of objects that
        ``convert_to_mime`` can handle; unconvertible ones are skipped.
    :param extra_headers: mapping of additional headers to set.

    Returns None.  Nothing is sent when no recipient is left.  Errors raised
    by ``create_smtp_server`` propagate to the caller.
    """
    if not get_request():
        # we are not processing a request, no sense delaying the handling
        # (for example when running a cronjob)
        fire_and_forget = False

    emails_cfg = get_cfg('emails', {})
    footer = emails_cfg.get('footer') or ''

    encoding = get_publisher().site_charset

    # in restructuredtext lines starting with a pipe were used to give
    # appropriate multiline formatting, remove them.
    footer = re.sub(r'^\|\s+', '', footer, flags=re.DOTALL | re.MULTILINE)

    text_body = str(mail_body)
    html_body = None

    if text_body.startswith('<'):
        # native HTML, keep it that way
        html_body = text_body
        text_body = None
    elif want_html:
        # body may be reStructuredText, try converting.
        html_body = _rst_to_html_body(mail_body, encoding)

    context = get_publisher().get_substitution_variables()
    context['email_signature'] = footer
    context['subject'] = mark_safe(subject)

    subject = render_to_string('qommon/email_subject.txt', context).strip()

    # handle action links/buttons
    button_re = re.compile(r'---===BUTTON:(?P<token>[a-zA-Z0-9]*):(?P<label>.*?)===---')

    def get_action_url(match):
        return '%s/actions/%s/' % (
            get_publisher().get_frontoffice_url(),
            match.group('token'))

    def text_button(match):
        return '[%s] %s' % (match.group('label'), get_action_url(match))

    def html_button(match):
        button_context = {
            'label': match.group('label'),
            'url': get_action_url(match),
        }
        return force_str(render_to_string('qommon/email_button_link.html', button_context))

    text_body = button_re.sub(text_button, text_body) if text_body else None
    html_body = button_re.sub(html_button, html_body) if html_body else None

    if text_body:
        context['content'] = mark_safe(text_body)
        text_body = render_to_string('qommon/email_body.txt', context)

    if html_body:
        context['content'] = mark_safe(html_body)
        html_body = render_to_string('qommon/email_body.html', context)

    if text_body and html_body:
        msg_body = MIMEMultipart(_charset=encoding, _subtype='alternative')
        msg_body.attach(MIMEText(text_body, _charset=encoding))
        msg_body.attach(MIMEText(html_body, _subtype='html', _charset=encoding))
    elif text_body:
        msg_body = MIMEText(text_body, _charset=encoding)
    else:
        msg_body = MIMEText(html_body, _subtype='html', _charset=encoding)

    attachments_parts = []
    for attachment in attachments or []:
        if not isinstance(attachment, MIMEBase):
            attachment = convert_to_mime(attachment)
        if attachment:
            attachments_parts.append(attachment)

    if not attachments_parts:
        msg = msg_body
    else:
        msg = MIMEMultipart(_charset=encoding, _subtype='mixed')
        msg.attach(msg_body)
        for attachment in attachments_parts:
            msg.attach(attachment)

    several_rcpts = isinstance(email_rcpt, (list, tuple))

    msg['Subject'] = Header(subject, encoding)
    if hide_recipients or email_rcpt is None:
        msg['To'] = 'Undisclosed recipients:;'
    elif several_rcpts:
        msg['To'] = ', '.join(email_rcpt)
    else:
        msg['To'] = email_rcpt

    if not email_from:
        email_from = emails_cfg.get('from', 'noreply@entrouvert.com')

    sitename = get_publisher().get_site_option('global_title', 'variables')
    if sitename:
        msg['From'] = formataddr((str(Header(sitename, encoding)), email_from))
    else:
        msg['From'] = email_from

    if emails_cfg.get('reply_to'):
        msg['Reply-To'] = emails_cfg.get('reply_to')
    if replyto:
        # NOTE(review): when a configured reply_to was set just above this
        # adds a second Reply-To header (message item assignment appends) --
        # confirm whether a replacement was intended.
        msg['Reply-To'] = replyto

    msg['X-Qommon-Id'] = os.path.basename(get_publisher().app_dir)
    if extra_headers:
        for key, value in extra_headers.items():
            msg[key] = value

    # Envelope recipients; always a fresh list so the caller's is untouched.
    if several_rcpts:
        rcpts = list(email_rcpt)
    else:
        rcpts = [email_rcpt]
    if bcc:
        rcpts.extend(bcc)

    if exclude_current_user:
        # There may be no request at all (cron job, script): nobody to exclude.
        request = get_request()
        user = request.user if request else None
        if user and user.email:
            rcpts = [x for x in rcpts if x != user.email]

    rcpts = [x for x in rcpts if x]
    if not rcpts:
        return

    mail_redirection = get_cfg('debug', {}).get('mail_redirection')
    if mail_redirection:
        rcpts = [mail_redirection]
    if os.environ.get('QOMMON_MAIL_REDIRECTION'):
        # if QOMMON_MAIL_REDIRECTION is set in the environment, send all emails
        # to that address instead of the real recipients.
        rcpts = [os.environ.get('QOMMON_MAIL_REDIRECTION')]

    if not fire_and_forget:
        s = create_smtp_server(emails_cfg, smtp_timeout=smtp_timeout)
        try:
            s.sendmail(email_from, rcpts, msg.as_string())
        except smtplib.SMTPRecipientsRefused:
            get_logger().error('Failed to send mail to %r', rcpts)
        finally:
            # Release the connection whatever happened while sending.
            s.close()
    else:
        get_response().add_after_job(
            'sending email',
            EmailToSend(email_from, rcpts, msg.as_string()),
            fire_and_forget=True)
|
|
|
|
|
|
def create_smtp_server(emails_cfg, smtp_timeout=None):
    """Open, and if configured authenticate, a connection to the SMTP server.

    :param emails_cfg: ``emails`` configuration mapping; the keys read are
        ``smtp_server`` (defaults to ``localhost``), ``smtp_login`` and
        ``smtp_password``.
    :param smtp_timeout: timeout passed to ``smtplib.SMTP``.

    STARTTLS is negotiated when the server advertises it in its EHLO answer.

    Returns the connected ``smtplib.SMTP`` object; closing it is up to the
    caller.  Raises ``errors.EmailError`` (after logging) when connecting,
    EHLO, STARTTLS or authentication fails; in that case the connection is
    closed before the error propagates.
    """
    try:
        s = smtplib.SMTP(emails_cfg.get('smtp_server', None) or 'localhost',
                         timeout=smtp_timeout)
    except socket.timeout:
        get_logger().error('Failed to connect to SMTP server (timeout)')
        raise errors.EmailError('Failed to connect to SMTP server (timeout)')
    except socket.error:
        # XXX: write message in a queue somewhere?
        get_logger().error('Failed to connect to SMTP server')
        raise errors.EmailError('Failed to connect to SMTP server')

    try:
        if not s.sock:
            get_logger().error('Failed to connect to SMTP server')
            raise errors.EmailError('Failed to connect to SMTP server')

        rc_code, ehlo_answer = s.ehlo()
        if rc_code != 250:
            get_logger().error('Failed to EHLO to SMTP server (%s)', rc_code)
            raise errors.EmailError('Failed to EHLO to SMTP server (%s)' % rc_code)

        if b'STARTTLS' in ehlo_answer:
            rc_code, starttls_answer = s.starttls()
            if rc_code != 220:
                get_logger().error('Failed to STARTTLS to SMTP server (%s)', rc_code)
                raise errors.EmailError('Failed to STARTTLS to SMTP server (%s)' % rc_code)

        if emails_cfg.get('smtp_login'):
            try:
                s.login(emails_cfg.get('smtp_login') or '',
                        emails_cfg.get('smtp_password') or '')
            except smtplib.SMTPAuthenticationError:
                get_logger().error('Failed to authenticate to SMTP server')
                raise errors.EmailError('Failed to authenticate to SMTP server')
            except smtplib.SMTPException:
                get_logger().error('Failed to authenticate to SMTP server, unknown error.')
                raise errors.EmailError('Failed to authenticate to SMTP server, unknown error.')
    except Exception:
        # The caller never receives the connection on failure, so it has to
        # be released here or the socket would stay open.
        s.close()
        raise

    return s
|
|
|
|
|
|
class EmailToSend(object):
    """Callable job sending an already serialised message over SMTP.

    ``email()`` registers instances as after-jobs when it is called with
    ``fire_and_forget``.
    """

    def __init__(self, msg_from, rcpts, msg_as_string):
        # envelope sender address
        self.msg_from = msg_from
        # list of envelope recipient addresses
        self.rcpts = rcpts
        # complete message, headers included, as a string
        self.msg_as_string = msg_as_string

    def __call__(self, job=None):
        """Send the message using the current ``emails`` configuration.

        *job* is accepted but not used.  Refused recipients are logged
        rather than raised; errors from ``create_smtp_server`` propagate.
        """
        emails_cfg = get_cfg('emails', {})

        s = create_smtp_server(emails_cfg)
        try:
            s.sendmail(self.msg_from, self.rcpts, self.msg_as_string)
        except smtplib.SMTPRecipientsRefused:
            # Same reporting as the immediate sending path in email().
            get_logger().error('Failed to send mail to %r', self.rcpts)
        finally:
            # Release the connection whatever happened while sending.
            s.close()
|