tests: revamp email mocking to have more of wcs code exposed

This commit is contained in:
Frédéric Péters 2015-03-20 16:30:35 +01:00
parent 9e75fe6793
commit 86691534a6
4 changed files with 72 additions and 26 deletions

View File

@ -189,7 +189,7 @@ def test_form_submit_with_user(pub):
assert formdef.data_class().count() == 1
# check the user received a copy by email
assert emails.emails.get('New form (test)')
assert emails.emails.get('New form (test)')['kwargs']['email_rcpt'] == ['foo@localhost']
assert emails.emails.get('New form (test)')['email_rcpt'] == ['foo@localhost']
def test_form_visit_existing(pub):
user = create_user(pub)
@ -415,7 +415,7 @@ def test_form_tracking_code_email(pub):
resp.forms[0]['email'] = 'foo@localhost'
resp = resp.forms[0].submit()
assert emails.emails.get('Tracking Code reminder')
assert 'ABCDEF' in emails.emails.values()[0]['args'][0]
assert 'ABCDEF' in emails.emails.values()[0]['payload']
assert resp.location == 'http://example.net/test/code/ABCDEF/load'
def test_form_invalid_tracking_code(pub):

View File

@ -115,8 +115,7 @@ def test_admin_notification():
do_user_registration()
assert emails.get('New Registration')
assert emails.get('New Registration').get('kwargs').get('email_rcpt') == ['admin@localhost']
assert emails.get('New Registration').get('kwargs').get('fire_and_forget') == True
assert emails.get('New Registration').get('email_rcpt') == ['admin@localhost']
def test_user_notification():
pub.cfg['identities'] = {'creation': 'self', 'notify-on-register': False,
@ -136,8 +135,8 @@ def test_user_notification():
account = PasswordAccount.get('foo@localhost')
assert emails.get('Welcome to example.net')
assert emails.get('Welcome to example.net').get('kwargs').get('email_rcpt') == 'foo@localhost'
assert account.password in emails.get('Welcome to example.net').get('args')[0]
assert emails.get('Welcome to example.net').get('to') == 'foo@localhost'
assert account.password in emails.get('Welcome to example.net').get('payload')
def test_user_login():
pub.cfg['identities'] = {'creation': 'self', 'notify-on-register': False}
@ -195,8 +194,8 @@ def test_forgotten():
assert 'A token for changing your password has been emailed to you.' in resp.body
assert emails.get('Change Password Request')
assert emails.get('Change Password Request')['kwargs']['email_rcpt'] == 'foo@localhost'
body = emails.get('Change Password Request')['args'][0]
assert emails.get('Change Password Request')['to'] == 'foo@localhost'
body = emails.get('Change Password Request')['payload']
confirm_urls = re.findall(r'http://.*\w', body)
assert 'a=cfmpw' in confirm_urls[0]
@ -215,7 +214,7 @@ def test_forgotten():
resp = resp.forms[0].submit()
assert 'A token for changing your password has been emailed to you.' in resp.body
body = emails.get('Change Password Request')['args'][0]
body = emails.get('Change Password Request')['payload']
confirm_urls = re.findall(r'http://.*\w', body)
assert 'a=cfmpw' in confirm_urls[0]
assert 'a=cxlpw' in confirm_urls[1]
@ -225,7 +224,7 @@ def test_forgotten():
assert emails.get('Your new password')
# check new password is working
new_password = re.findall('password: (.*)\n', emails.get('Your new password')['args'][0])[0]
new_password = re.findall('password: (.*)\n', emails.get('Your new password')['payload'])[0]
resp = app.get('/login/')
resp.forms[0]['username'] = 'foo'
resp.forms[0]['password'] = new_password
@ -241,7 +240,7 @@ def test_forgotten():
resp = resp.forms[0].submit()
assert 'A token for changing your password has been emailed to you.' in resp.body
body = emails.get('Change Password Request')['args'][0]
body = emails.get('Change Password Request')['payload']
confirm_urls = re.findall(r'http://.*\w', body)
assert 'a=cfmpw' in confirm_urls[0]
assert 'a=cxlpw' in confirm_urls[1]

View File

@ -3,8 +3,8 @@ import pytest
import shutil
import time
from quixote import cleanup
from quixote.http_request import HTTPRequest
from quixote import cleanup, get_response
from wcs.qommon.http_request import HTTPRequest
from wcs.formdef import FormDef
from wcs import sessions
@ -282,36 +282,42 @@ def test_email():
# send using an uncompleted element
item = SendmailWorkflowStatusItem()
item.perform(formdata) # nothing
get_response().process_after_jobs()
assert emails.count() == 0
item.to = [role1.id]
item.perform(formdata) # no subject nor body
get_response().process_after_jobs()
assert emails.count() == 0
item.subject = 'foobar'
item.perform(formdata) # no body
get_response().process_after_jobs()
assert emails.count() == 0
# send for real
item.body = 'baz'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')
assert emails.get('foobar')['kwargs']['email_rcpt'] == ['foo@localhost']
assert emails.get('foobar')['email_rcpt'] == ['foo@localhost']
# two recipients
emails.empty()
item.to = [role1.id, role2.id]
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['kwargs']['email_rcpt'] is None
assert emails.get('foobar')['kwargs']['bcc'] == ['foo@localhost',
assert emails.get('foobar')['to'] == 'Undisclosed recipients:;'
assert emails.get('foobar')['email_rcpt'] == ['foo@localhost',
'bar@localhost', 'baz@localhost']
# submitter as recipient, no known email address
emails.empty()
item.to = ['_submitter']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 0
# submitter as recipient, known email address
@ -319,22 +325,25 @@ def test_email():
formdata.user_id = user.id
formdata.store()
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['kwargs']['email_rcpt'] == ['zorg@localhost']
assert emails.get('foobar')['email_rcpt'] == ['zorg@localhost']
# computed recipient
emails.empty()
item.to = ['=email']
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['kwargs']['email_rcpt'] == ['sub@localhost']
assert emails.get('foobar')['email_rcpt'] == ['sub@localhost']
# string as recipient
emails.empty()
item.to = 'xyz@localhost'
item.perform(formdata)
get_response().process_after_jobs()
assert emails.count() == 1
assert emails.get('foobar')['kwargs']['email_rcpt'] == ['xyz@localhost']
assert emails.get('foobar')['email_rcpt'] == ['xyz@localhost']
def test_webservice_call():
pub.substitutions.feed(MockSubstitutionVariables())

View File

@ -1,4 +1,6 @@
import cPickle
import email.header
import email.parser
import os
import tempfile
import random
@ -11,7 +13,6 @@ from wcs import sql
from webtest import TestApp
from quixote import cleanup, get_publisher
from quixote.wsgi import QWIP
import wcs
from wcs import publisher
@ -19,7 +20,27 @@ from wcs.qommon.http_request import HTTPRequest
from wcs.users import User
from wcs.tracking_code import TrackingCode
QWIP.request_class = HTTPRequest
class QWIP:
# copy of quixote original QWIP code, adapted to use our own HTTPRequest
# object and to process after jobs.
request_class = HTTPRequest
def __init__(self, publisher):
self.publisher = publisher
def __call__(self, env, start_response):
"""I am called for each request."""
if 'REQUEST_URI' not in env:
env['REQUEST_URI'] = env['SCRIPT_NAME'] + env['PATH_INFO']
input = env['wsgi.input']
request = self.request_class(input, env)
response = self.publisher.process_request(request)
status = "%03d %s" % (response.status_code, response.reason_phrase)
headers = response.generate_headers()
start_response(status, headers)
result = list(response.generate_body_chunks()) # Iterable object.
response.process_after_jobs()
return result
class KnownElements(object):
pickle_app_dir = None
@ -130,17 +151,34 @@ class EmailsMocking(object):
self.emails = {}
import wcs.qommon.emails
import qommon.emails
wcs.qommon.emails.email = self.sendmail
qommon.emails.email = self.sendmail
wcs.qommon.emails.create_smtp_server = self.create_smtp_server
qommon.emails.create_smtp_server = self.create_smtp_server
def sendmail(self, subject, *args, **kwargs):
self.emails[subject] = {'args': args, 'kwargs': kwargs}
def create_smtp_server(self, *args, **kwargs):
class MockSmtplibSMTP(object):
def __init__(self, emails):
self.emails = emails
def sendmail(self, msg_from, rcpts, msg):
msg = email.parser.Parser().parsestr(msg)
subject = email.header.decode_header(msg['Subject'])[0][0]
self.emails[subject] = {
'from': msg_from,
'to': email.header.decode_header(msg['To'])[0][0],
'payload': msg.get_payload(decode=True),
}
self.emails[subject]['email_rcpt'] = rcpts
def close(self):
pass
return MockSmtplibSMTP(self.emails)
def get(self, subject):
return self.emails.get(subject)
def empty(self):
self.emails = {}
self.emails.clear()
def count(self):
return len(self.emails)