247 lines
11 KiB
Python
247 lines
11 KiB
Python
import pytest
|
|
from uuid import uuid4
|
|
import os
|
|
import re
|
|
import urllib
|
|
import logging
|
|
import mock
|
|
import random
|
|
import requests
|
|
|
|
from django.core.urlresolvers import reverse
|
|
from django.core import mail, signing
|
|
from django.utils import timezone
|
|
from django.core.files.storage import DefaultStorage
|
|
from django.utils.text import slugify
|
|
from django.test import override_settings
|
|
|
|
from corbo.models import Category, Announce, Subscription, Broadcast
|
|
from corbo.models import channel_choices
|
|
|
|
pytestmark = pytest.mark.django_db
|
|
|
|
CATEGORIES = (u'Alerts', u'News')
|
|
|
|
def get_random_number():
|
|
number_generator = range(10)
|
|
random.shuffle(number_generator)
|
|
number = ''.join(map(str, number_generator))
|
|
return number
|
|
|
|
|
|
@pytest.fixture
|
|
def categories():
|
|
categories = []
|
|
for category in CATEGORIES:
|
|
c, created = Category.objects.get_or_create(name=category, slug=slugify(category))
|
|
categories.append(c)
|
|
return categories
|
|
|
|
|
|
@pytest.fixture
|
|
def announces():
|
|
announces = []
|
|
for category in Category.objects.all():
|
|
a = Announce.objects.create(category=category, title='Announce 1',
|
|
publication_time=timezone.now(),
|
|
text='<h2>Announce 1</h2>')
|
|
Broadcast.objects.create(announce=a)
|
|
announces.append(a)
|
|
a = Announce.objects.create(category=category, title='Announce 2',
|
|
publication_time=timezone.now(),
|
|
text='<h2>Announce 2</h2>')
|
|
Broadcast.objects.create(announce=a)
|
|
announces.append(a)
|
|
return announces
|
|
|
|
|
|
def test_emailing_with_no_subscriptions(app, categories, announces, mailoutbox):
|
|
for announce in announces:
|
|
broadcast = Broadcast.objects.get(announce=announce)
|
|
broadcast.send()
|
|
assert not broadcast.delivery_count
|
|
assert not len(mailoutbox)
|
|
|
|
|
|
def test_send_email(app, categories, announces, mailoutbox):
|
|
for category in categories:
|
|
uuid = uuid4()
|
|
Subscription.objects.create(category=category,
|
|
identifier='mailto:%s@example.net' % uuid, uuid=uuid)
|
|
for i, announce in enumerate(announces):
|
|
broadcast = Broadcast.objects.get(announce=announce)
|
|
broadcast.send()
|
|
assert broadcast.delivery_count
|
|
assert len(mailoutbox) == i+1
|
|
|
|
|
|
def test_check_inline_css(app, categories, announces, mailoutbox):
|
|
total_sent = 0
|
|
for i, announce in enumerate(announces):
|
|
announce.text = '<style type="text/css">h2 {color: #F00}</style>' + announce.text
|
|
announce.save()
|
|
uuid = uuid4()
|
|
Subscription.objects.create(category=announce.category,
|
|
identifier='mailto:%s@example.net' % uuid, uuid=uuid)
|
|
broadcast = Broadcast.objects.get(announce=announce)
|
|
broadcast.send()
|
|
assert broadcast.delivery_count
|
|
assert len(mailoutbox) == total_sent + broadcast.delivery_count
|
|
total_sent += broadcast.delivery_count
|
|
assert 'h2 style="color:#F00"' in mailoutbox[i].html
|
|
|
|
|
|
@mock.patch('emails.utils.requests.get')
|
|
def test_check_inline_images(mocked_get, app, categories, announces, mailoutbox):
|
|
storage = DefaultStorage()
|
|
media_path = os.path.join(os.path.dirname(__file__), 'media')
|
|
image_name = 'logo.png'
|
|
image_name = storage.save(image_name, file(os.path.join(media_path, image_name)))
|
|
total_sent = 0
|
|
for i, announce in enumerate(announces):
|
|
img_src = "/media/%s" % image_name
|
|
announce.text = announce.text + '<img src="%s" />' % img_src
|
|
announce.save()
|
|
uuid = uuid4()
|
|
Subscription.objects.create(category=announce.category,
|
|
identifier='mailto:%s@example.net' % uuid, uuid=uuid)
|
|
broadcast = Broadcast.objects.get(announce=announce)
|
|
mocked_get.return_value = mock.Mock(status_code=200,
|
|
headers={'content-type': 'image/png'},
|
|
content=storage.open(image_name).read())
|
|
broadcast.send()
|
|
assert broadcast.delivery_count
|
|
|
|
assert len(mailoutbox) == total_sent + broadcast.delivery_count
|
|
attachments = [a['filename'] for a in mailoutbox[0].attachments.as_dict()]
|
|
assert image_name in attachments
|
|
assert 'cid:%s' % image_name in mail.outbox[0].html_body
|
|
assert 'cid:%s' % image_name in mail.outbox[0].text_body
|
|
total_sent += broadcast.delivery_count
|
|
storage.delete(image_name)
|
|
|
|
|
|
def test_unsubscription_link(app, categories, announces, custom_mailoutbox):
|
|
unsubscription_link_sentinel = ''
|
|
subscriptions_number = 3
|
|
scheme = 'mailto:'
|
|
for category in categories:
|
|
for i in xrange(subscriptions_number):
|
|
uuid = uuid4()
|
|
uri = scheme + '%s@example.com' % uuid
|
|
Subscription.objects.create(category=category, identifier=uri, uuid=str(uuid))
|
|
|
|
for i, announce in enumerate(announces):
|
|
if announce.category != category:
|
|
continue
|
|
broadcast = Broadcast.objects.get(announce=announce)
|
|
broadcast.send()
|
|
assert broadcast.delivery_count
|
|
assert len(mail.outbox) == (i+1)*subscriptions_number
|
|
assert mail.outbox[i*subscriptions_number].subject == announce.title
|
|
|
|
for counter, destination in enumerate(category.subscription_set.all()):
|
|
index = i*subscriptions_number+counter
|
|
signature = urllib.unquote(re.findall('/unsubscribe/(.*)"', mail.outbox[index].html)[0])
|
|
unsubscription_link = reverse('unsubscribe', kwargs={'unsubscription_token': signature})
|
|
assert mail.outbox[index]._headers['List-Unsubscribe'] == '<http://localhost%s>' % unsubscription_link
|
|
assert unsubscription_link in mail.outbox[index].html
|
|
assert unsubscription_link in mail.outbox[index].text
|
|
assert unsubscription_link_sentinel != unsubscription_link
|
|
assert signing.loads(signature) == {
|
|
'category': announce.category.pk, 'identifier': destination.identifier}
|
|
unsubscription_link_sentinel = unsubscription_link
|
|
|
|
# refuse altered signature
|
|
resp = app.get(unsubscription_link + 'altered', status=404)
|
|
|
|
# make sure the uri schema is not in the page
|
|
resp = app.get(unsubscription_link)
|
|
assert scheme not in resp.content
|
|
|
|
def test_send_sms_with_no_gateway_defined(app, categories, announces, caplog):
|
|
for category in categories:
|
|
uuid = uuid4()
|
|
Subscription.objects.create(category=category,
|
|
identifier='sms:%s' % get_random_number(), uuid=uuid)
|
|
for i, announce in enumerate(announces):
|
|
broadcast = Broadcast.objects.get(announce=announce)
|
|
broadcast.send()
|
|
assert broadcast.delivery_count == 0
|
|
for record in caplog.records:
|
|
assert record.name == 'corbo.utils'
|
|
assert record.levelno == logging.ERROR
|
|
assert record.getMessage() == 'SMS send requested but no SMS gateway defined.'
|
|
|
|
@mock.patch('corbo.utils.requests.post')
|
|
def test_send_sms_with_gateway_api_error(mocked_post, app, categories, announces, caplog):
|
|
for category in categories:
|
|
for i in range(3):
|
|
uuid = uuid4()
|
|
Subscription.objects.create(category=category,
|
|
identifier='sms:%s' % get_random_number(), uuid=uuid)
|
|
for i, announce in enumerate(announces):
|
|
broadcast = Broadcast.objects.get(announce=announce)
|
|
with override_settings(SMS_GATEWAY_URL='http://sms.gateway'):
|
|
mocked_response = mock.Mock()
|
|
mocked_response.json.return_value = {'err': 1, 'data': None,
|
|
'err_desc': 'Payload error: missing "message" in JSON payload'}
|
|
mocked_post.return_value = mocked_response
|
|
broadcast.send()
|
|
assert broadcast.delivery_count == 0
|
|
records = caplog.records
|
|
assert len(records) == 1 + i
|
|
for record in records:
|
|
assert record.name == 'corbo.utils'
|
|
assert record.levelno == logging.WARNING
|
|
assert record.getMessage() == 'Error occured while sending sms: Payload error: missing "message" in JSON payload'
|
|
|
|
@mock.patch('corbo.utils.requests.post')
|
|
def test_send_sms_with_gateway_connection_error(mocked_post, app, categories, announces, caplog):
|
|
for category in categories:
|
|
for i in range(3):
|
|
uuid = uuid4()
|
|
Subscription.objects.create(category=category,
|
|
identifier='sms:%s' % get_random_number(), uuid=uuid)
|
|
for i, announce in enumerate(announces):
|
|
broadcast = Broadcast.objects.get(announce=announce)
|
|
with override_settings(SMS_GATEWAY_URL='http://sms.gateway'):
|
|
mocked_response = mock.Mock()
|
|
def mocked_requests_connection_error(*args, **kwargs):
|
|
raise requests.ConnectionError('unreachable')
|
|
mocked_post.side_effect = mocked_requests_connection_error
|
|
mocked_post.return_value = mocked_response
|
|
broadcast.send()
|
|
assert broadcast.delivery_count == 0
|
|
records = caplog.records
|
|
assert len(records) == 1 + i
|
|
for record in records:
|
|
assert record.name == 'corbo.utils'
|
|
assert record.levelno == logging.WARNING
|
|
assert record.getMessage() == 'Failed to reach SMS gateway: unreachable'
|
|
|
|
@mock.patch('corbo.utils.requests.post')
|
|
def test_send_sms(mocked_post, app, categories, announces):
|
|
for category in categories:
|
|
for i in range(3):
|
|
uuid = uuid4()
|
|
Subscription.objects.create(category=category,
|
|
identifier='sms:%s' % get_random_number(), uuid=uuid)
|
|
for announce in announces:
|
|
broadcast = Broadcast.objects.get(announce=announce)
|
|
with override_settings(SMS_GATEWAY_URL='http://sms.gateway'):
|
|
mocked_response = mock.Mock()
|
|
mocked_response.json.return_value = {'err': 0, 'err_desc': None, 'data': 'gateway response'}
|
|
|
|
for announce in announces:
|
|
broadcast = Broadcast.objects.get(announce=announce)
|
|
with override_settings(SMS_GATEWAY_URL='http://sms.gateway'):
|
|
mocked_response = mock.Mock()
|
|
mocked_response.json.return_value = {'err': 0, 'err_desc': None, 'data': 'gateway response'}
|
|
mocked_post.return_value = mocked_response
|
|
broadcast.send()
|
|
assert mocked_post.call_args[0][0] == 'http://sms.gateway'
|
|
assert mocked_post.call_args[1]['json']['from'] == 'Corbo'
|
|
assert isinstance(mocked_post.call_args[1]['json']['to'], list)
|
|
assert broadcast.delivery_count == 3
|