misc: use Django's EmailBackend (#36977)
gitea-wip/wcs/pipeline/head Build started... Details

This commit is contained in:
Lauréline Guérin 2021-09-21 15:22:16 +02:00
parent e116639413
commit c25e7e6074
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
7 changed files with 105 additions and 105 deletions

View File

@ -1388,8 +1388,8 @@ def test_form_submit_with_user(pub, emails):
assert formdef.data_class().count() == 1
assert '<div class="section foldable folded" id="summary">' in next_page.text
# check the user received a copy by email
assert emails.emails.get('New form (test)')
assert emails.emails.get('New form (test)')['email_rcpt'] == ['foo@localhost']
assert emails.get('New form (test)')
assert emails.get('New form (test)')['email_rcpt'] == ['foo@localhost']
def test_form_submit_with_just_disabled_user(pub, emails):
@ -1918,8 +1918,8 @@ def test_form_tracking_code_email(pub, emails, nocache):
assert '<h2>Keep your tracking code</h2>' in resp.text
resp.forms[0]['email'] = 'foo@localhost'
resp = resp.forms[0].submit()
assert emails.emails.get('Tracking Code reminder')
assert tracking_code in list(emails.emails.values())[0]['payload']
assert emails.get('Tracking Code reminder')
assert tracking_code in emails.get('Tracking Code reminder')['payload']
assert resp.location == 'http://example.net/test/code/%s/load' % tracking_code
resp = resp.follow()
resp = resp.follow()
@ -1948,7 +1948,7 @@ def test_form_tracking_code_email_antibot(pub, emails, nocache):
resp.forms[0]['email'] = 'foo@localhost'
resp.forms[0]['validation'].checked = True # stupit bot will do that
resp = resp.forms[0].submit()
assert not emails.emails.values()
assert not emails.count()
def test_form_tracking_code_remove_draft(pub, nocache):
@ -3408,7 +3408,7 @@ def test_form_table_field_submit(pub, emails):
resp = resp.follow()
assert 'The form has been recorded' in resp.text
# check rst2html didn't fail
assert b'ee' in emails.emails['New form (test)']['msg'].get_payload()[1].get_payload(decode=True)
assert b'ee' in emails.get('New form (test)')['msg'].get_payload()[1].get_payload(decode=True)
def test_form_table_rows_field_submit(pub, emails):
@ -3480,7 +3480,7 @@ def test_form_table_rows_field_submit(pub, emails):
resp = resp.form.submit('submit')
resp = resp.follow()
assert 'The form has been recorded' in resp.text
assert b'ee' in emails.emails['New form (test)']['msg'].get_payload()[1].get_payload(decode=True)
assert b'ee' in emails.get('New form (test)')['msg'].get_payload()[1].get_payload(decode=True)
def test_form_new_table_rows_field_draft_recall(pub):

View File

@ -4,13 +4,12 @@ import pwd
import socket
import pytest
from quixote import cleanup
from wcs.qommon.emails import docutils # noqa pylint: disable=unused-import
from wcs.qommon.emails import email as send_email
from wcs.qommon.upload_storage import PicklableUpload
from .utilities import clean_temporary_pub, create_temporary_pub
from .utilities import clean_temporary_pub, cleanup, create_temporary_pub
def setup_module(module):
@ -36,12 +35,14 @@ def test_email_from(emails):
assert emails.count() == 1
assert emails.emails['test']['from'] == '%s@%s' % (pwd.getpwuid(os.getuid())[0], socket.getfqdn())
emails.empty()
pub.cfg['emails'] = {'from': 'foo@localhost'}
send_email('test', mail_body='Hello', email_rcpt='test@localhost', want_html=False)
assert emails.count() == 1
assert emails.emails['test']['from'] == 'foo@localhost'
assert emails.emails['test']['msg']['From'] == 'foo@localhost'
emails.empty()
if not pub.site_options.has_section('variables'):
pub.site_options.add_section('variables')
pub.site_options.set('variables', 'global_title', 'HELLO')
@ -125,7 +126,6 @@ def test_email_signature_rst_pipes(emails):
def test_email_plain_with_attachments(emails):
create_temporary_pub()
jpg = PicklableUpload('test.jpeg', 'image/jpeg')
with open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg'), 'rb') as fd:
jpg_content = fd.read()

View File

@ -1,10 +1,8 @@
import base64
import io
import os
import xml.etree.ElementTree as ET
import pytest
from django.utils.encoding import force_bytes
from quixote import cleanup
from webtest import Upload
@ -244,9 +242,7 @@ def test_workflow_send_mail_template_with_sql(superuser, mail_templates_option,
pub.get_request().response.process_after_jobs()
assert emails.count() == 1
assert emails.get('test subject')['email_rcpt'] == ['xyz@localhost']
assert b'test body' in base64.decodebytes(
force_bytes(emails.get('test subject')['msg'].get_payload(0).get_payload())
)
assert 'test body' in emails.get('test subject')['msg'].get_payload(0).get_payload()
# check nothing is sent and an error is logged if the mail template is
# missing

View File

@ -144,7 +144,7 @@ def test_admin_notification(pub, emails):
do_user_registration(pub)
assert emails.get('New Registration')
assert emails.get('New Registration').get('email_rcpt') == ['admin@localhost']
assert emails.get('New Registration')['email_rcpt'] == ['admin@localhost']
def test_user_notification(pub, emails):
@ -164,8 +164,8 @@ def test_user_notification(pub, emails):
account = PasswordAccount.get('foo@localhost')
assert emails.get('Welcome to example.net')
assert emails.get('Welcome to example.net').get('to') == 'foo@localhost'
assert account.password in emails.get('Welcome to example.net').get('payload')
assert emails.get('Welcome to example.net')['to'] == 'foo@localhost'
assert account.password in emails.get('Welcome to example.net')['payload']
def test_user_login(pub):
@ -241,6 +241,7 @@ def test_forgotten(pub, emails):
assert 'The token you submitted does not exist' in resp.text
# new forgotten request
emails.empty()
resp = app.get('/ident/password/forgotten')
resp.forms[0]['username'] = 'foo'
resp = resp.forms[0].submit()
@ -267,6 +268,7 @@ def test_forgotten(pub, emails):
pub.cfg['passwords'] = {'generate': False, 'can_change': True}
pub.write_cfg()
emails.empty()
resp = app.get('/ident/password/forgotten')
resp.forms[0]['username'] = 'foo'
resp = resp.forms[0].submit()

View File

@ -1,17 +1,15 @@
import email.header
import email.parser
import http.cookies
import json
import os
import random
import shutil
import sys
import tempfile
import urllib.parse
import psycopg2
from django.conf import settings
from django.utils.encoding import force_bytes, force_text
from django.core import mail
from django.utils.encoding import force_bytes
from quixote import cleanup, get_publisher
from webtest import TestApp
@ -245,65 +243,82 @@ def login(app, username='admin', password='admin'):
return app
class Email:
def __init__(self, email):
self.email = email
@property
def msg(self):
return self.email.message()
@property
def email_rcpt(self):
return self.email.recipients()
@property
def payload(self):
return force_str(self.payloads[0])
@property
def payloads(self):
if self.msg.is_multipart():
return [x.get_payload(decode=True) for x in self.msg.get_payload()]
return [self.msg.get_payload(decode=True)]
@property
def to(self):
return self.email.message()['To']
def get(self, key):
return getattr(self.email, key)
def __getitem__(self, key):
if key in ['msg', 'email_rcpt', 'payload', 'payloads', 'to']:
return getattr(self, key)
if key == 'from':
key = 'from_email'
return getattr(self.email, key)
class Emails:
def __contains__(self, value):
return self[value] is not None
def __getitem__(self, key):
for em in mail.outbox:
if em.subject == key:
return Email(em)
class EmailsMocking:
def create_smtp_server(self, *args, **kwargs):
class MockSmtplibSMTP:
def __init__(self, mocking):
self.mocking = mocking
def send_message(self, msg, msg_from, rcpts):
return self.sendmail(msg_from, rcpts, msg.as_string())
def sendmail(self, msg_from, rcpts, msg):
msg = email.parser.Parser().parsestr(msg)
subject = email.header.decode_header(msg['Subject'])[0][0]
if msg.is_multipart():
payloads = [x.get_payload(decode=True) for x in msg.get_payload()]
payload = payloads[0]
else:
payload = msg.get_payload(decode=True)
payloads = [payload]
self.mocking.emails[force_text(subject)] = {
'from': msg_from,
'to': email.header.decode_header(msg['To'])[0][0],
'payload': force_str(payload if payload else ''),
'payloads': payloads,
'msg': msg,
'subject': force_text(subject),
}
self.mocking.emails[force_text(subject)]['email_rcpt'] = rcpts
self.mocking.latest_subject = force_text(subject)
def quit(self):
pass
return MockSmtplibSMTP(self)
def get(self, subject):
return self.emails.get(subject)
return self.emails[subject]
def get_latest(self, part=None):
email = self.emails.get(self.latest_subject, {})
email = Email(mail.outbox[-1])
if part:
return email.get(part) if email else None
return email
def empty(self):
self.emails.clear()
mail.outbox = []
def count(self):
return len(self.emails)
return len(mail.outbox)
@property
def latest_subject(self):
return mail.outbox[-1].subject
@property
def emails(self):
return Emails()
def __enter__(self):
self.wcs_create_smtp_server = sys.modules['wcs.qommon.emails'].create_smtp_server
sys.modules['wcs.qommon.emails'].create_smtp_server = self.create_smtp_server
self.emails = {}
self.latest_subject = None
return self
def __exit__(self, exc_type, exc_value, tb):
del self.emails
sys.modules['wcs.qommon.emails'].create_smtp_server = self.wcs_create_smtp_server
pass
class MockSubstitutionVariables:

View File

@ -1685,7 +1685,7 @@ def test_email(pub, emails):
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar').get('from') == 'foobar@localhost'
assert emails.get('foobar')['from'] == 'foobar@localhost'
# custom from email (computed)
emails.empty()
@ -1694,7 +1694,7 @@ def test_email(pub, emails):
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar').get('from') == 'foobar@localhost'
assert emails.get('foobar')['from'] == 'foobar@localhost'
# custom sender name defined from site-options variable
pub.load_site_options()

View File

@ -38,7 +38,7 @@ try:
except ImportError:
docutils = None
from django.core.mail import EmailMessage, EmailMultiAlternatives
from django.core.mail import EmailMessage, EmailMultiAlternatives, get_connection
from django.template.loader import render_to_string
from django.utils.safestring import mark_safe
from quixote import get_publisher, get_request, get_response
@ -341,51 +341,38 @@ def email(
get_response().add_after_job('sending email', email_to_send, fire_and_forget=True)
def create_smtp_server(emails_cfg, smtp_timeout=None):
publisher = get_publisher()
try:
s = smtplib.SMTP(emails_cfg.get('smtp_server', None) or 'localhost', timeout=smtp_timeout)
except socket.timeout as e:
publisher.record_error(_('Failed to connect to SMTP server (timeout)'), exception=e)
raise errors.EmailError('Failed to connect to SMTP server (timeout)')
except OSError as e:
publisher.record_error(_('Failed to connect to SMTP server'), exception=e)
raise errors.EmailError('Failed to connect to SMTP server')
if not s.sock:
publisher.record_error(_('Failed to connect to SMTP server'))
raise errors.EmailError('Failed to connect to SMTP server')
rc_code, ehlo_answer = s.ehlo()
if rc_code != 250:
publisher.record_error(_('Failed to EHLO to SMTP server (%s)') % rc_code)
raise errors.EmailError('Failed to EHLO to SMTP server (%s)' % rc_code)
if b'STARTTLS' in ehlo_answer:
rc_code = s.starttls()[0]
if rc_code != 220:
publisher.record_error(_('Failed to STARTTLS to SMTP server (%s)') % rc_code)
raise errors.EmailError('Failed to STARTTLS to SMTP server (%s)' % rc_code)
if emails_cfg.get('smtp_login'):
try:
s.login(emails_cfg.get('smtp_login') or '', emails_cfg.get('smtp_password') or '')
except smtplib.SMTPAuthenticationError as e:
publisher.record_error(_('Failed to authenticate to SMTP server'), exception=e)
raise errors.EmailError('Failed to authenticate to SMTP server')
except smtplib.SMTPException as e:
publisher.record_error(_('Failed to authenticate to SMTP server, unknown error.'), exception=e)
raise errors.EmailError('Failed to authenticate to SMTP server, unknown error.')
return s
class EmailToSend:
def __init__(self, email_msg, smtp_timeout):
self.email_msg = email_msg
self.smtp_timeout = smtp_timeout
def __call__(self, job=None):
publisher = get_publisher()
emails_cfg = get_cfg('emails', {})
s = create_smtp_server(emails_cfg, self.smtp_timeout)
try:
s.send_message(self.email_msg.message(), self.email_msg.from_email, self.email_msg.recipients())
if emails_cfg.get('smtp_server', None):
kwargs = {
'host': emails_cfg['smtp_server'],
'username': emails_cfg.get('smtp_login') or '',
'password': emails_cfg.get('smtp_password') or '',
'timeout': self.smtp_timeout,
}
backend = get_connection(backend='django.core.mail.backends.smtp.EmailBackend', **kwargs)
self.email_msg.connection = backend
self.email_msg.send()
except socket.timeout as e:
publisher.record_error(_('Failed to connect to SMTP server (timeout)'), exception=e)
raise errors.EmailError('Failed to connect to SMTP server (timeout)')
except smtplib.SMTPAuthenticationError as e:
publisher.record_error(_('Failed to authenticate to SMTP server'), exception=e)
raise errors.EmailError('Failed to authenticate to SMTP server')
except (smtplib.SMTPRecipientsRefused, smtplib.SMTPNotSupportedError, smtplib.SMTPDataError):
pass
s.quit()
except smtplib.SMTPException as e:
publisher.record_error(_('Failed to authenticate to SMTP server, unknown error.'), exception=e)
raise errors.EmailError('Failed to authenticate to SMTP server, unknown error.')
except OSError as e:
publisher.record_error(_('Failed to connect to SMTP server'), exception=e)
raise errors.EmailError('Failed to connect to SMTP server')