467 lines
17 KiB
Python
467 lines
17 KiB
Python
import json
|
|
|
|
import mock
|
|
import pytest
|
|
from decimal import Decimal
|
|
from datetime import timedelta as timedelta
|
|
|
|
from django.contrib.auth.models import User
|
|
from django.test.client import RequestFactory
|
|
from django.utils.timezone import timedelta, now
|
|
from django.core.urlresolvers import reverse
|
|
|
|
from django.test import Client
|
|
|
|
from combo.data.models import Page
|
|
from combo.apps.notifications.models import Notification, NotificationsCell
|
|
from combo.apps.lingo.models import Regie, ActiveItems
|
|
|
|
# Module-level ``pytestmark``: pytest applies the ``django_db`` marker to every
# test defined in this module.
pytestmark = pytest.mark.django_db

# Single Django test client shared by login() and the web-service tests below.
client = Client()
|
|
|
|
|
|
def login(user):
    """Log *user* in through the module-level test client.

    The username is sent as the password too; the login view is expected to
    answer with a redirect (HTTP 302).
    """
    credentials = {'username': user.username, 'password': user.username}
    response = client.post('/login/', credentials)
    assert response.status_code == 302
|
|
|
|
|
|
@pytest.fixture
def regie():
    """Return the regie whose slug is ``remote``, creating and saving it if missing."""
    try:
        return Regie.objects.get(slug='remote')
    except Regie.DoesNotExist:
        pass

    # No such regie yet: build one attribute by attribute, then persist it.
    remote_regie = Regie()
    attributes = (
        ('label', 'Remote'),
        ('slug', 'remote'),
        ('description', 'remote'),
        ('payment_min_amount', Decimal(2.0)),
        ('service', 'dummy'),
    )
    for attr_name, attr_value in attributes:
        setattr(remote_regie, attr_name, attr_value)
    remote_regie.save()
    return remote_regie
|
|
|
|
|
|
def test_notification_api(john_doe, jane_doe):
    """Exercise Notification.notify(), ack() and forget() through the model API."""
    created = Notification.notify(john_doe, 'notifoo')
    assert Notification.objects.count() == 1
    assert created.summary == 'notifoo'
    # text fields that were not supplied come back empty
    for field_name in ('body', 'url', 'origin'):
        assert getattr(created, field_name) == ''
    assert created.external_id is None
    lifetime = created.end_timestamp - created.start_timestamp
    assert lifetime == timedelta(days=3)
    assert created.acked is False
    Notification.objects.visible(john_doe).ack()
    assert Notification.objects.get().acked is True

    # notifying again with the id of the existing notification must not add a row
    created_id = str(created.pk)
    Notification.notify(john_doe, 'notirefoo', id=created_id, acked=False)
    assert Notification.objects.count() == 1
    assert Notification.objects.get().summary == 'notirefoo'
    # we updated the notification, it's un-acked
    assert Notification.objects.get().acked is False

    # an explicit duration of 3600 yields a one-hour start/end span
    Notification.notify(john_doe, 'notirefoo', id=created_id, duration=3600)
    refreshed = Notification.objects.get()
    assert refreshed.end_timestamp - refreshed.start_timestamp == timedelta(seconds=3600)

    # the first call with a namespaced id adds a second row, the next one reuses it
    for summary in ('notibar', 'notirebar'):
        Notification.notify(john_doe, summary, id='ns:notibar')
        assert Notification.objects.count() == 2

    # a forgotten notification is kept, but acked and already ended
    Notification.notify(jane_doe, 'notiother').forget()
    janes_notifications = Notification.objects.filter(user=jane_doe)
    assert janes_notifications.count() == 1
    forgotten = janes_notifications.get()
    assert forgotten.end_timestamp < now()
    assert forgotten.acked is True
|
|
|
|
|
|
def test_notification_cell(john_doe, jane_doe):
    """Check NotificationsCell visibility, rendered content and badge per user."""
    page = Page(title='notif', slug='test_notification_cell', template_name='standard')
    page.save()
    cell = NotificationsCell(page=page, placeholder='content', order=0)

    request = RequestFactory().get('/')
    context = {
        'request': request,
        'synchronous': True,  # to get fresh content
    }

    def render():
        return cell.render(context)

    def badge():
        return cell.get_badge(context)

    # the cell is hidden when there is no user, shown (with no badge) for john_doe
    request.user = None
    assert cell.is_visible(request.user) is False
    request.user = john_doe
    assert cell.is_visible(request.user) is True
    assert badge() is None

    # two fresh notifications: both rendered, badge counts them
    bar_notification = Notification.notify(john_doe, 'notibar')
    foo_notification = Notification.notify(john_doe, 'notifoo')
    bar_id = str(bar_notification.pk)
    rendered = render()
    assert 'notibar' in rendered
    assert 'notifoo' in rendered
    assert badge() == {'badge': '2'}

    # forgetting one removes it from the rendering and from the badge
    foo_notification.forget()
    rendered = render()
    assert 'notibar' in rendered
    assert 'notifoo' not in rendered
    assert badge() == {'badge': '1'}

    # re-notifying with the same id replaces the summary
    Notification.notify(john_doe, 'notirebar', id=bar_id)
    rendered = render()
    assert 'notirebar' in rendered
    assert 'notibar' not in rendered

    # ... and can attach an url, which shows up in the rendering
    Notification.notify(john_doe, 'notiurl', id=bar_id, url='https://www.example.net/')
    rendered = render()
    assert 'notiurl' in rendered
    assert 'https://www.example.net/' in rendered

    # an acked notification is still rendered but not counted in the badge
    acked_notification = Notification.notify(john_doe, 'ackme')
    acked_notification.ack()
    rendered = render()
    assert 'acked' in rendered
    assert badge() == {'badge': '1'}

    # once everything is acked the badge disappears
    bar_notification.ack()
    rendered = render()
    assert badge() is None

    # notifications of another user never leak into john_doe's cell
    Notification.notify(jane_doe, 'notiother')
    rendered = render()
    assert 'notiurl' in rendered
    assert 'notiother' not in rendered
    assert badge() is None

    # same cell, seen by jane_doe
    request.user = jane_doe
    rendered = render()
    assert 'notiurl' not in rendered
    assert 'notiother' in rendered
    assert badge() == {'badge': '1'}
|
|
|
|
|
|
def test_notification_ws(john_doe):
    """Drive the add / ack / forget notification web services as john_doe."""

    def notify(data, check_id, count):
        # POST *data* as JSON, check the returned id and john_doe's notification
        # count, then hand back the stored notification.
        payload = json.dumps(data)
        response = client.post(reverse('api-notification-add'), payload,
                               content_type='application/json')
        assert response.status_code == 200
        assert json.loads(response.content) == {'data': {'id': check_id}, 'err': 0}
        assert Notification.objects.filter(user=john_doe).count() == count
        return Notification.objects.find(john_doe, check_id).get()

    def seconds_iso(moment):
        # ISO representation truncated to the seconds
        return moment.isoformat()[:19]

    login(john_doe)
    notify({'summary': 'foo'}, '1', 1)
    notify({'summary': 'bar'}, '2', 2)
    notify({'summary': 'bar', 'id': 'ns:noti3'}, 'ns:noti3', 3)

    # fully specified notification with explicit start and end timestamps
    notif = {
        'summary': 'bar',
        'url': 'http://www.example.net',
        'body': 'foobar',
        'origin': 'blah',
        'start_timestamp': '2016-11-11T11:11',
        'end_timestamp': '2016-12-12T12:12',
    }
    stored = notify(notif, '4', 4)
    for key in ('summary', 'url', 'body', 'origin'):
        assert getattr(stored, key) == notif[key]
    assert seconds_iso(stored.start_timestamp) == '2016-11-11T11:11:00'
    assert seconds_iso(stored.end_timestamp) == '2016-12-12T12:12:00'

    # a duration replaces the end timestamp: start + 3600 seconds
    del notif['end_timestamp']
    notif['duration'] = 3600
    stored = notify(notif, '5', 5)
    assert seconds_iso(stored.end_timestamp) == '2016-11-11T12:11:00'

    # the duration may also be given as a string
    notif['duration'] = '3600'
    stored = notify(notif, '6', 6)
    assert seconds_iso(stored.end_timestamp) == '2016-11-11T12:11:00'

    # ack through the web service
    ack_url = reverse('api-notification-ack', kwargs={'notification_id': '6'})
    response = client.get(ack_url)
    assert response.status_code == 200
    assert Notification.objects.filter(acked=True).count() == 1
    assert Notification.objects.filter(acked=True).get().public_id == '6'

    # forget through the web service: acked and already ended
    forget_url = reverse('api-notification-forget', kwargs={'notification_id': '5'})
    response = client.get(forget_url)
    assert response.status_code == 200
    assert Notification.objects.filter(acked=True).count() == 2
    forgotten = Notification.objects.find(john_doe, '5').get()
    assert forgotten.public_id == '5'
    assert forgotten.acked is True
    assert forgotten.end_timestamp < now()
|
|
|
|
|
|
def test_notification_ws_badrequest(john_doe):
    """Check that invalid payloads sent to the add web service are rejected.

    Every case must produce an HTTP 400 answer whose JSON body has ``err``
    set to 1 and whose first ``err_desc`` entry contains the expected
    message fragment.
    """

    def check_error(data, message):
        # A falsy *data* (None here) is posted as-is instead of being JSON-encoded.
        resp = client.post(reverse('api-notification-add'),
                           json.dumps(data) if data else None,
                           content_type='application/json')
        assert resp.status_code == 400
        result = json.loads(resp.content)
        assert result['err'] == 1
        # dict.values() is a non-indexable view on Python 3, so materialise it
        # before picking the first list of error messages.
        first_field_errors = list(result['err_desc'].values())[0]
        assert message in first_field_errors[0]

    login(john_doe)
    check_error(None, 'required')
    check_error('blahblah', 'Invalid data')
    check_error({'summary': ''}, 'may not be blank')
    check_error({'summary': 'x'*1000}, 'no more than 140 char')
    check_error({'summary': 'ok', 'url': 'xx'}, 'valid URL')
    check_error({'summary': 'ok', 'start_timestamp': 'foo'}, 'wrong format')
    check_error({'summary': 'ok', 'end_timestamp': 'bar'}, 'wrong format')
    check_error({'summary': 'ok', 'duration': 'xx'}, 'valid integer is required')
    check_error({'summary': 'ok', 'duration': 4.01}, 'valid integer is required')
    check_error({'summary': 'ok', 'duration': -4}, 'greater than')
|
|
|
|
|
|
def test_notification_ws_deny():
    """Without a login() call in this test, every notification endpoint answers 403."""
    add_response = client.post(reverse('api-notification-add'),
                               json.dumps({'summary': 'ok'}),
                               content_type='application/json')
    assert add_response.status_code == 403

    for url_name in ('api-notification-ack', 'api-notification-forget'):
        url = reverse(url_name, kwargs={'notification_id': '1'})
        assert client.get(url).status_code == 403
|
|
|
|
|
|
def test_notification_ws_check_urls():
    """Pin the paths the notification URL names resolve to."""
    assert reverse('api-notification-add') == '/api/notification/add/'

    per_notification_paths = (
        ('api-notification-ack', '/api/notification/ack/noti1/'),
        ('api-notification-forget', '/api/notification/forget/noti1/'),
    )
    for url_name, expected_path in per_notification_paths:
        resolved = reverse(url_name, kwargs={'notification_id': 'noti1'})
        assert resolved == expected_path
|
|
|
|
|
|
def test_notification_id_and_origin(john_doe):
    """Check which ``id`` values the add web service accepts or refuses."""
    login(john_doe)

    def notify(data):
        # POST *data* as JSON and return the decoded answer, whatever the status
        response = client.post(reverse('api-notification-add'), json.dumps(data),
                               content_type='application/json')
        return json.loads(response.content)

    # id '1' sent before any notification was created in this test: refused
    answer = notify({'summary': 'foo', 'id': '1'})
    assert answer['err'] == 1

    # the id of a notification that was just created: accepted
    existing = Notification.notify(john_doe, 'foo')
    answer = notify({'summary': 'foo', 'id': str(existing.id)})
    assert answer['err'] == 0

    # a bare word as id: refused
    answer = notify({'summary': 'foo', 'id': 'foo'})
    assert answer['err'] == 1

    # an id containing a colon, sent along with an origin: accepted
    answer = notify({'summary': 'foo', 'id': 'foo:foo', 'origin': 'bar'})
    assert answer['err'] == 0
|
|
|
|
|
|
@mock.patch('combo.utils.requests_wrapper.RequestsSession.request')
@pytest.mark.freeze_time('2016-01-02')
def test_notify_remote_items(mock_get, app, john_doe, jane_doe, regie, monkeypatch, freezer):
    """Check the notifications created by Regie.notify_new_remote_invoices().

    The outgoing HTTP request is mocked to return a fixed set of pending
    invoices for john_doe, jane_doe and an unknown ``foo`` user.  The test
    checks in turn that:

    * no request is made while the regie has no ``webservice_url``;
    * no further request is made once ``UserSAMLIdentifier`` is set to None;
    * with ``UserSAMLIdentifier`` mocked, two invoice notifications without
      url are created;
    * 16 days later two reminder notifications (with the ActiveItems page
      url) replace them, and are not duplicated by another run;
    * 21 days later no notification is visible any more.
    """
    # reference instant used for the later time travel
    invoice_now = freezer()
    FAKE_PENDING_INVOICES = {
        "data":
        {
            john_doe.username: {
                "invoices": [
                    {
                        'id': '01',
                        'label': '010101',
                        'total_amount': '10',
                        'amount': '10',
                        'created': '2016-01-01',
                        'pay_limit_date': '2016-01-20',
                        'has_pdf': False,
                    },
                    {
                        'id': '011',
                        'label': '0101011',
                        'total_amount': '1.5',
                        'amount': '1.5',
                        'created': '2016-01-01',
                        'pay_limit_date': '2016-01-20',
                        'has_pdf': False,
                    }
                ]
            },
            jane_doe.username: {
                'invoices': [
                    {
                        'id': '02',
                        'label': '020202',
                        'total_amount': '2.0',
                        'amount': '2.0',
                        'created': '2016-01-01',
                        'pay_limit_date': '2016-01-20',
                        'has_pdf': False,
                    },
                    {
                        'id': 'O5',
                        'label': '050505',
                        'total_amount': '24',
                        'amount': '24',
                        'created': '2015-12-05',
                        'pay_limit_date': '2015-12-30',
                        'has_pdf': False,
                    }
                ]
            },
            'foo': {
                'invoices': [
                    {
                        'id': 'O3',
                        'label': '030303',
                        'total_amount': '42',
                        'amount': '42',
                        'created': '2016-01-01',
                        'pay_limit_date': '2016-01-20',
                        'has_pdf': False,
                    }
                ]
            }
        }
    }
    mock_response = mock.Mock(status_code=200, content=json.dumps(FAKE_PENDING_INVOICES))
    mock_response.json.return_value = FAKE_PENDING_INVOICES
    mock_get.return_value = mock_response

    # make sure the regie is not remote
    regie.webservice_url = ''
    regie.save()

    regie.notify_new_remote_invoices()
    assert mock_get.call_count == 0
    regie.webservice_url = 'http://example.org/regie'  # is_remote
    regie.save()
    regie.notify_new_remote_invoices()
    mock_get.assert_called_once()

    from combo.apps.lingo import models
    monkeypatch.setattr(models, 'UserSAMLIdentifier', None)
    regie.notify_new_remote_invoices()

    # make sure no other requests calls are made
    assert mock_get.call_count == 1

    # everything below runs with UserSAMLIdentifier replaced by a mock
    with mock.patch('combo.apps.lingo.models.UserSAMLIdentifier') as user_saml:
        # simulate class exception
        class DoesNotExist(Exception):
            pass
        user_saml.DoesNotExist = DoesNotExist

        def side_effect(*args, **kwargs):
            # resolve a name_id to the user with that username; 'foo' is unknown
            name_id = kwargs['name_id']
            if name_id == 'foo':
                raise user_saml.DoesNotExist
            return mock.Mock(user=User.objects.get(username=name_id))

        mocked_objects = mock.Mock()
        mocked_objects.get = mock.Mock(side_effect=side_effect)
        user_saml.objects = mocked_objects

        assert Notification.objects.all().count() == 0

        regie.notify_new_remote_invoices()
        assert 'NameID=' not in mock_get.call_args[0][1]
        assert 'email=' not in mock_get.call_args[0][1]
        assert Notification.objects.filter(external_id__startswith='invoice-%s:' % regie.slug).visible().new().count() == 2
        assert Notification.objects.filter(external_id__startswith='invoice-%s:reminder-' % regie.slug).count() == 0
        assert Notification.objects.count() == 2
        for notif in Notification.objects.all():
            assert notif.url == '', notif.id

        page = Page(title='Active Items', slug='active_items', template_name='standard')
        page.save()
        cell = ActiveItems(page=page, placeholder='content', order=0)
        cell.save()

        freezer.move_to(invoice_now + timedelta(days=16))

        # create remind notifications
        regie.notify_new_remote_invoices()
        assert Notification.objects.exclude(external_id__startswith='invoice-%s:reminder-' % regie.slug) \
            .visible().count() == 0
        assert Notification.objects.filter(external_id__startswith='invoice-%s:reminder-' % regie.slug) \
            .visible().new().count() == 2
        assert Notification.objects.count() == 4

        # url appeared on new reminder notifications
        assert len([notif for notif in Notification.objects.all() if notif.url == page.get_online_url()]) == 2

        # be sure there are no more reminders created
        regie.notify_new_remote_invoices()
        assert Notification.objects.count() == 4

        freezer.move_to(invoice_now + timedelta(days=21))
        regie.notify_new_remote_invoices()
        assert Notification.objects.visible().count() == 0
|
|
|
|
|
|
def test_notification_never_expire(app, freezer, rf, john_doe):
    """Check that a notification added with ``duration`` 0 stays visible.

    Two notifications are added through the web service: ``notibar`` with a
    duration of 0 and ``notifoo`` with a duration of two days.  Time is then
    moved around with ``freezer`` and, at each step, both the visible
    queryset and the rendered NotificationsCell are checked.
    """
    # reference instant; every later move is relative to it
    start = freezer()

    # authenticate with HTTP basic auth, the username doubling as password
    app.authorization = ('Basic', (john_doe.username, john_doe.username))
    app.post_json(reverse('api-notification-add'), params={
        'summary': 'notibar',
        'url': 'http://www.example.net',
        'body': 'foobar',
        'origin': 'blah',
        'duration': 0,
    })
    app.post_json(reverse('api-notification-add'), params={
        'summary': 'notifoo',
        'url': 'http://www.example.net',
        'body': 'foobar',
        'origin': 'blah',
        'duration': 86400 * 2, # 2 days
    })
    page = Page.objects.create(title='notif', slug='test_notification_cell', template_name='standard')
    cell = NotificationsCell(page=page, placeholder='content', order=0)

    request = rf.get('/')
    request.user = john_doe
    context = {'request': request}

    # ten seconds before the notifications were added: nothing is visible yet
    freezer.move_to(start - timedelta(seconds=10))
    assert Notification.objects.visible(john_doe).count() == 0
    content = cell.render(context)
    assert 'notibar' not in content
    assert 'notifoo' not in content

    # shortly after: both are visible
    freezer.move_to(start + timedelta(seconds=10))
    assert Notification.objects.visible(john_doe).count() == 2
    content = cell.render(context)
    assert 'notibar' in content
    assert 'notifoo' in content

    # one day later, still inside notifoo's two-day duration: both are visible
    freezer.move_to(start + timedelta(days=1))
    content = cell.render(context)
    assert Notification.objects.visible(john_doe).count() == 2
    assert 'notibar' in content
    assert 'notifoo' in content

    # three days later: notifoo is gone, notibar remains
    freezer.move_to(start + timedelta(days=3))
    content = cell.render(context)
    assert Notification.objects.visible(john_doe).count() == 1
    assert 'notibar' in content
    assert 'notifoo' not in content

    # a year later: notibar is still there
    freezer.move_to(start + timedelta(days=365))
    content = cell.render(context)
    assert Notification.objects.visible(john_doe).count() == 1
    assert 'notibar' in content
    assert 'notifoo' not in content

    Notification.objects.visible(john_doe).ack()

    # acking a notification without an end_timestamp, still visible
    freezer.move_to(start + timedelta(days=365, seconds=1))
    content = cell.render(context)
    assert Notification.objects.visible(john_doe).count() == 1
    assert 'notibar' in content
    assert 'notifoo' not in content
|