This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
corbo/tests/test_broadcasting.py

247 lines
11 KiB
Python

import pytest
from uuid import uuid4
import os
import re
import logging
import mock
import random
import requests
from django.core.urlresolvers import reverse
from django.core import mail, signing
from django.utils import timezone
from django.core.files.storage import DefaultStorage
from django.utils.six.moves.urllib import parse as urllib
from django.utils.text import slugify
from django.test import override_settings
from corbo.models import Category, Announce, Subscription, Broadcast
from corbo.models import channel_choices
pytestmark = pytest.mark.django_db
CATEGORIES = (u'Alerts', u'News')
def get_random_number():
number_generator = list(range(10))
random.shuffle(number_generator)
number = ''.join(map(str, number_generator))
return number
@pytest.fixture
def categories():
categories = []
for category in CATEGORIES:
c, created = Category.objects.get_or_create(name=category, slug=slugify(category))
categories.append(c)
return categories
@pytest.fixture
def announces():
announces = []
for category in Category.objects.all():
a = Announce.objects.create(category=category, title='Announce 1',
publication_time=timezone.now(),
text='<h2>Announce 1</h2>')
Broadcast.objects.create(announce=a)
announces.append(a)
a = Announce.objects.create(category=category, title='Announce 2',
publication_time=timezone.now(),
text='<h2>Announce 2</h2>')
Broadcast.objects.create(announce=a)
announces.append(a)
return announces
def test_emailing_with_no_subscriptions(app, categories, announces, mailoutbox):
for announce in announces:
broadcast = Broadcast.objects.get(announce=announce)
broadcast.send()
assert not broadcast.delivery_count
assert not len(mailoutbox)
def test_send_email(app, categories, announces, mailoutbox):
for category in categories:
uuid = uuid4()
Subscription.objects.create(category=category,
identifier='mailto:%s@example.net' % uuid, uuid=uuid)
for i, announce in enumerate(announces):
broadcast = Broadcast.objects.get(announce=announce)
broadcast.send()
assert broadcast.delivery_count
assert len(mailoutbox) == i+1
def test_check_inline_css(app, categories, announces, mailoutbox):
total_sent = 0
for i, announce in enumerate(announces):
announce.text = '<style type="text/css">h2 {color: #F00}</style>' + announce.text
announce.save()
uuid = uuid4()
Subscription.objects.create(category=announce.category,
identifier='mailto:%s@example.net' % uuid, uuid=uuid)
broadcast = Broadcast.objects.get(announce=announce)
broadcast.send()
assert broadcast.delivery_count
assert len(mailoutbox) == total_sent + broadcast.delivery_count
total_sent += broadcast.delivery_count
assert 'h2 style="color:#F00"' in mailoutbox[i].html
@mock.patch('emails.utils.requests.get')
def test_check_inline_images(mocked_get, app, categories, announces, mailoutbox):
storage = DefaultStorage()
media_path = os.path.join(os.path.dirname(__file__), 'media')
image_name = 'logo.png'
image_name = storage.save(image_name, open(os.path.join(media_path, image_name), 'rb'))
total_sent = 0
for i, announce in enumerate(announces):
img_src = "/media/%s" % image_name
announce.text = announce.text + '<img src="%s" />' % img_src
announce.save()
uuid = uuid4()
Subscription.objects.create(category=announce.category,
identifier='mailto:%s@example.net' % uuid, uuid=uuid)
broadcast = Broadcast.objects.get(announce=announce)
mocked_get.return_value = mock.Mock(status_code=200,
headers={'content-type': 'image/png'},
content=storage.open(image_name).read())
broadcast.send()
assert broadcast.delivery_count
assert len(mailoutbox) == total_sent + broadcast.delivery_count
attachments = [a['filename'] for a in mailoutbox[0].attachments.as_dict()]
assert image_name in attachments
assert 'cid:%s' % image_name in mail.outbox[0].html_body
assert 'cid:%s' % image_name in mail.outbox[0].text_body
total_sent += broadcast.delivery_count
storage.delete(image_name)
def test_unsubscription_link(app, categories, announces, custom_mailoutbox):
unsubscription_link_sentinel = ''
subscriptions_number = 3
scheme = 'mailto:'
for category in categories:
for i in range(subscriptions_number):
uuid = uuid4()
uri = scheme + '%s@example.com' % uuid
Subscription.objects.create(category=category, identifier=uri, uuid=str(uuid))
for i, announce in enumerate(announces):
if announce.category != category:
continue
broadcast = Broadcast.objects.get(announce=announce)
broadcast.send()
assert broadcast.delivery_count
assert len(mail.outbox) == (i+1)*subscriptions_number
assert mail.outbox[i*subscriptions_number].subject == announce.title
for counter, destination in enumerate(category.subscription_set.all()):
index = i*subscriptions_number+counter
signature = urllib.unquote(re.findall('/unsubscribe/(.*)"', mail.outbox[index].html)[0])
unsubscription_link = reverse('unsubscribe', kwargs={'unsubscription_token': signature})
assert mail.outbox[index]._headers['List-Unsubscribe'] == '<http://localhost%s>' % unsubscription_link
assert unsubscription_link in mail.outbox[index].html
assert unsubscription_link in mail.outbox[index].text
assert unsubscription_link_sentinel != unsubscription_link
assert signing.loads(signature) == {
'category': announce.category.pk, 'identifier': destination.identifier}
unsubscription_link_sentinel = unsubscription_link
# refuse altered signature
resp = app.get(unsubscription_link + 'altered', status=404)
# make sure the uri schema is not in the page
resp = app.get(unsubscription_link)
assert scheme not in resp
def test_send_sms_with_no_gateway_defined(app, categories, announces, caplog):
for category in categories:
uuid = uuid4()
Subscription.objects.create(category=category,
identifier='sms:%s' % get_random_number(), uuid=uuid)
for i, announce in enumerate(announces):
broadcast = Broadcast.objects.get(announce=announce)
broadcast.send()
assert broadcast.delivery_count == 0
for record in caplog.records:
assert record.name == 'corbo.utils'
assert record.levelno == logging.ERROR
assert record.getMessage() == 'SMS send requested but no SMS gateway defined.'
@mock.patch('corbo.utils.requests.post')
def test_send_sms_with_gateway_api_error(mocked_post, app, categories, announces, caplog):
for category in categories:
for i in range(3):
uuid = uuid4()
Subscription.objects.create(category=category,
identifier='sms:%s' % get_random_number(), uuid=uuid)
for i, announce in enumerate(announces):
broadcast = Broadcast.objects.get(announce=announce)
with override_settings(SMS_GATEWAY_URL='http://sms.gateway'):
mocked_response = mock.Mock()
mocked_response.json.return_value = {'err': 1, 'data': None,
'err_desc': 'Payload error: missing "message" in JSON payload'}
mocked_post.return_value = mocked_response
broadcast.send()
assert broadcast.delivery_count == 0
records = caplog.records
assert len(records) == 1 + i
for record in records:
assert record.name == 'corbo.utils'
assert record.levelno == logging.WARNING
assert record.getMessage() == 'Error occured while sending sms: Payload error: missing "message" in JSON payload'
@mock.patch('corbo.utils.requests.post')
def test_send_sms_with_gateway_connection_error(mocked_post, app, categories, announces, caplog):
for category in categories:
for i in range(3):
uuid = uuid4()
Subscription.objects.create(category=category,
identifier='sms:%s' % get_random_number(), uuid=uuid)
for i, announce in enumerate(announces):
broadcast = Broadcast.objects.get(announce=announce)
with override_settings(SMS_GATEWAY_URL='http://sms.gateway'):
mocked_response = mock.Mock()
def mocked_requests_connection_error(*args, **kwargs):
raise requests.ConnectionError('unreachable')
mocked_post.side_effect = mocked_requests_connection_error
mocked_post.return_value = mocked_response
broadcast.send()
assert broadcast.delivery_count == 0
records = caplog.records
assert len(records) == 1 + i
for record in records:
assert record.name == 'corbo.utils'
assert record.levelno == logging.WARNING
assert record.getMessage() == 'Failed to reach SMS gateway: unreachable'
@mock.patch('corbo.utils.requests.post')
def test_send_sms(mocked_post, app, categories, announces):
for category in categories:
for i in range(3):
uuid = uuid4()
Subscription.objects.create(category=category,
identifier='sms:%s' % get_random_number(), uuid=uuid)
for announce in announces:
broadcast = Broadcast.objects.get(announce=announce)
with override_settings(SMS_GATEWAY_URL='http://sms.gateway'):
mocked_response = mock.Mock()
mocked_response.json.return_value = {'err': 0, 'err_desc': None, 'data': 'gateway response'}
for announce in announces:
broadcast = Broadcast.objects.get(announce=announce)
with override_settings(SMS_GATEWAY_URL='http://sms.gateway'):
mocked_response = mock.Mock()
mocked_response.json.return_value = {'err': 0, 'err_desc': None, 'data': 'gateway response'}
mocked_post.return_value = mocked_response
broadcast.send()
assert mocked_post.call_args[0][0] == 'http://sms.gateway'
assert mocked_post.call_args[1]['json']['from'] == 'Corbo'
assert isinstance(mocked_post.call_args[1]['json']['to'], list)
assert broadcast.delivery_count == 3