407 lines
15 KiB
Python
407 lines
15 KiB
Python
import json
|
|
|
|
import mock
|
|
import pytest
|
|
from decimal import Decimal
|
|
|
|
from django.contrib.auth.models import User
|
|
from django.test.client import RequestFactory
|
|
from django.utils.timezone import timedelta, now
|
|
from django.core.urlresolvers import reverse
|
|
|
|
from django.test import Client
|
|
|
|
from combo.data.models import Page
|
|
from combo.apps.notifications.models import Notification, NotificationsCell
|
|
from combo.apps.lingo.models import Regie, ActiveItems
|
|
|
|
# Module-level pytest marker: pytest.mark.django_db is applied to the tests
# of this module.
pytestmark = pytest.mark.django_db
|
|
|
|
# Single Django test client shared by login() and the web-service tests below.
client = Client()
|
|
|
|
|
|
@pytest.fixture
def user():
    """Return the ``admin`` user, creating it when it does not exist yet.

    The account is created with password ``admin``; its e-mail address is
    set to ``admin@example.net`` and saved on every call.
    """
    try:
        account = User.objects.get(username='admin')
    except User.DoesNotExist:
        account = User.objects.create_user(username='admin', email=None, password='admin')
    account.email = 'admin@example.net'
    account.save()
    return account
|
|
|
|
|
|
@pytest.fixture
def user2():
    """Return the ``admin2`` user, creating it (password ``admin2``) if missing."""
    try:
        return User.objects.get(username='admin2')
    except User.DoesNotExist:
        return User.objects.create_user(username='admin2', email=None, password='admin2')
|
|
|
|
|
|
def login(username='admin', password='admin'):
    """Post credentials to ``/login/`` with the shared module-level client.

    Asserts that the view answers with a redirect (HTTP 302).
    """
    credentials = {'username': username, 'password': password}
    response = client.post('/login/', credentials)
    assert response.status_code == 302
|
|
|
|
|
|
@pytest.fixture
def regie():
    """Return the regie whose slug is ``remote``, creating and saving it if missing.

    A newly created regie uses the ``dummy`` service and a minimal payment
    amount of 2; no ``webservice_url`` is set here.
    """
    try:
        return Regie.objects.get(slug='remote')
    except Regie.DoesNotExist:
        remote = Regie(
            label='Remote',
            slug='remote',
            description='remote',
            payment_min_amount=Decimal(2.0),
            service='dummy',
        )
        remote.save()
        return remote
|
|
|
|
|
|
def test_notification_api(user, user2):
    """Exercise ``Notification.notify`` together with ack, update and forget."""
    # a bare notification: optional text fields empty, lifetime of three days
    first = Notification.notify(user, 'notifoo')
    assert Notification.objects.count() == 1
    assert first.summary == 'notifoo'
    for attribute in ('body', 'url', 'origin'):
        assert getattr(first, attribute) == ''
    assert first.external_id is None
    lifetime = first.end_timestamp - first.start_timestamp
    assert lifetime == timedelta(3)
    assert first.acked is False
    Notification.objects.visible(user).ack()
    assert Notification.objects.get().acked is True

    # notifying with the id of an existing notification updates it in place
    first_id = str(first.pk)
    Notification.notify(user, 'notirefoo', id=first_id, acked=False)
    assert Notification.objects.count() == 1
    updated = Notification.objects.get()
    assert updated.summary == 'notirefoo'
    # we updated the notification, it's un-acked
    assert updated.acked is False

    # an explicit duration (in seconds) drives the end timestamp
    Notification.notify(user, 'notirefoo', id=first_id, duration=3600)
    timed = Notification.objects.get()
    assert timed.end_timestamp - timed.start_timestamp == timedelta(seconds=3600)

    # reusing a namespaced id does not add a second row for it
    Notification.notify(user, 'notibar', id='ns:notibar')
    assert Notification.objects.count() == 2
    Notification.notify(user, 'notirebar', id='ns:notibar')
    assert Notification.objects.count() == 2

    # forget() keeps the row but marks it acked with an end timestamp in the past
    other = Notification.notify(user2, 'notiother')
    other.forget()
    of_user2 = Notification.objects.filter(user=user2)
    assert of_user2.count() == 1
    forgotten = of_user2.get()
    assert forgotten.end_timestamp < now()
    assert forgotten.acked is True
|
|
|
|
|
|
def test_notification_cell(user, user2):
    """Render a ``NotificationsCell`` for two users and check content and badge."""
    page = Page(title='notif', slug='test_notification_cell', template_name='standard')
    page.save()
    cell = NotificationsCell(page=page, placeholder='content', order=0)

    request = RequestFactory().get('/')
    # 'synchronous' is set to get fresh content
    context = {'request': request, 'synchronous': True}

    # not visible without a user, visible (with no badge yet) for a real one
    request.user = None
    assert cell.is_visible(request.user) is False
    request.user = user
    assert cell.is_visible(request.user) is True
    assert cell.get_badge(context) is None

    # two notifications: both rendered, badge counts two
    bar = Notification.notify(user, 'notibar')
    foo = Notification.notify(user, 'notifoo')
    rendered = cell.render(context)
    assert 'notibar' in rendered
    assert 'notifoo' in rendered
    assert cell.get_badge(context) == {'badge': '2'}

    # a forgotten notification is no longer rendered nor counted
    foo.forget()
    rendered = cell.render(context)
    assert 'notibar' in rendered
    assert 'notifoo' not in rendered
    assert cell.get_badge(context) == {'badge': '1'}

    # updating through the id replaces the summary
    bar_id = str(bar.pk)
    Notification.notify(user, 'notirebar', id=bar_id)
    rendered = cell.render(context)
    assert 'notirebar' in rendered
    assert 'notibar' not in rendered

    # a url given on update shows up in the rendering
    Notification.notify(user, 'notiurl', id=bar_id, url='https://www.example.net/')
    rendered = cell.render(context)
    assert 'notiurl' in rendered
    assert 'https://www.example.net/' in rendered

    # an acked notification is rendered with 'acked' and is not counted
    to_ack = Notification.notify(user, 'ackme')
    to_ack.ack()
    rendered = cell.render(context)
    assert 'acked' in rendered
    assert cell.get_badge(context) == {'badge': '1'}

    # once everything is acked the badge is gone
    bar.ack()
    rendered = cell.render(context)
    assert cell.get_badge(context) is None

    # notifications of another user are not shown to the current one
    Notification.notify(user2, 'notiother')
    rendered = cell.render(context)
    assert 'notiurl' in rendered
    assert 'notiother' not in rendered
    assert cell.get_badge(context) is None

    # switching the request user swaps what is rendered
    request.user = user2
    rendered = cell.render(context)
    assert 'notiurl' not in rendered
    assert 'notiother' in rendered
    assert cell.get_badge(context) == {'badge': '1'}
|
|
|
|
|
|
def test_notification_ws(user):
    """Exercise the add / ack / forget notification web services when logged in."""

    def post_notification(payload, expected_id, expected_count):
        # POST the payload as JSON, check the answer and the number of
        # notifications of `user`, then return the stored notification.
        response = client.post(
            reverse('api-notification-add'),
            json.dumps(payload),
            content_type='application/json',
        )
        assert response.status_code == 200
        answer = json.loads(response.content)
        assert answer == {'data': {'id': expected_id}, 'err': 0}
        assert Notification.objects.filter(user=user).count() == expected_count
        return Notification.objects.find(user, expected_id).get()

    login()
    post_notification({'summary': 'foo'}, '1', 1)
    post_notification({'summary': 'bar'}, '2', 2)
    post_notification({'summary': 'bar', 'id': 'ns:noti3'}, 'ns:noti3', 3)

    # every optional field given explicitly
    payload = {
        'summary': 'bar',
        'url': 'http://www.example.net',
        'body': 'foobar',
        'origin': 'blah',
        'start_timestamp': '2016-11-11T11:11',
        'end_timestamp': '2016-12-12T12:12',
    }
    stored = post_notification(payload, '4', 4)
    for field in ('summary', 'url', 'body', 'origin'):
        assert getattr(stored, field) == payload[field]
    assert stored.start_timestamp.isoformat()[:19] == '2016-11-11T11:11:00'
    assert stored.end_timestamp.isoformat()[:19] == '2016-12-12T12:12:00'

    # a duration (seconds) instead of an end timestamp: end is start + duration
    del payload['end_timestamp']
    payload['duration'] = 3600
    stored = post_notification(payload, '5', 5)
    assert stored.end_timestamp.isoformat()[:19] == '2016-11-11T12:11:00'

    # the duration is also accepted as a string
    payload['duration'] = '3600'
    stored = post_notification(payload, '6', 6)
    assert stored.end_timestamp.isoformat()[:19] == '2016-11-11T12:11:00'

    # ack one notification
    ack_url = reverse('api-notification-ack', kwargs={'notification_id': '6'})
    assert client.get(ack_url).status_code == 200
    acked = Notification.objects.filter(acked=True)
    assert acked.count() == 1
    assert acked.get().public_id == '6'

    # forget another one: it ends up acked with an end timestamp in the past
    forget_url = reverse('api-notification-forget', kwargs={'notification_id': '5'})
    assert client.get(forget_url).status_code == 200
    assert Notification.objects.filter(acked=True).count() == 2
    forgotten = Notification.objects.find(user, '5').get()
    assert forgotten.public_id == '5'
    assert forgotten.acked is True
    assert forgotten.end_timestamp < now()
|
|
|
|
|
|
def test_notification_ws_badrequest(user):
    """Check that invalid payloads sent to the add web service are rejected.

    Each case posts a payload and expects an HTTP 400 answer whose JSON body
    has ``err`` set to 1 and whose first ``err_desc`` message contains the
    given text.
    """

    def check_error(data, message):
        # A falsy payload (None) is posted as-is, without a JSON body.
        resp = client.post(reverse('api-notification-add'),
                           json.dumps(data) if data else None,
                           content_type='application/json')
        assert resp.status_code == 400
        result = json.loads(resp.content)
        assert result['err'] == 1
        # dict.values() is a non-indexable view on Python 3; going through
        # list() keeps the lookup working on both Python 2 and Python 3.
        first_field_errors = list(result['err_desc'].values())[0]
        assert message in first_field_errors[0]

    login()
    check_error(None, 'required')
    check_error('blahblah', 'Invalid data')
    check_error({'summary': ''}, 'may not be blank')
    check_error({'summary': 'x'*1000}, 'no more than 140 char')
    check_error({'summary': 'ok', 'url': 'xx'}, 'valid URL')
    check_error({'summary': 'ok', 'start_timestamp': 'foo'}, 'wrong format')
    check_error({'summary': 'ok', 'end_timestamp': 'bar'}, 'wrong format')
    check_error({'summary': 'ok', 'duration': 'xx'}, 'valid integer is required')
    check_error({'summary': 'ok', 'duration': 4.01}, 'valid integer is required')
    check_error({'summary': 'ok', 'duration': -4}, 'greater than')
|
|
|
|
|
|
def test_notification_ws_deny():
    """Check that add, ack and forget all answer HTTP 403.

    No login is performed in this test before the endpoints are called.
    """
    add_response = client.post(
        reverse('api-notification-add'),
        json.dumps({'summary': 'ok'}),
        content_type='application/json',
    )
    assert add_response.status_code == 403

    for url_name in ('api-notification-ack', 'api-notification-forget'):
        url = reverse(url_name, kwargs={'notification_id': '1'})
        assert client.get(url).status_code == 403
|
|
|
|
|
|
def test_notification_ws_check_urls():
    """Pin the public URLs of the notification web services."""
    assert reverse('api-notification-add') == '/api/notification/add/'

    # (url name, expected path) for the endpoints taking a notification id
    with_id = (
        ('api-notification-ack', '/api/notification/ack/noti1/'),
        ('api-notification-forget', '/api/notification/forget/noti1/'),
    )
    for url_name, expected_path in with_id:
        assert reverse(url_name, kwargs={'notification_id': 'noti1'}) == expected_path
|
|
|
|
|
|
def test_notification_id_and_origin(user):
    """Check which ``id`` values the add web service accepts or refuses."""
    login()

    def add(payload):
        # POST the payload as JSON and return the decoded answer.
        response = client.post(
            reverse('api-notification-add'),
            json.dumps(payload),
            content_type='application/json',
        )
        return json.loads(response.content)

    # id '1' is refused before any notification has been created in this test
    assert add({'summary': 'foo', 'id': '1'})['err'] == 1

    # the id of an existing notification is accepted
    existing = Notification.notify(user, 'foo')
    assert add({'summary': 'foo', 'id': str(existing.id)})['err'] == 0

    # 'foo' is refused, while 'foo:foo' sent along with an origin is accepted
    assert add({'summary': 'foo', 'id': 'foo'})['err'] == 1
    assert add({'summary': 'foo', 'id': 'foo:foo', 'origin': 'bar'})['err'] == 0
|
|
|
|
|
|
@mock.patch('combo.utils.requests_wrapper.RequestsSession.request')
def test_notify_remote_items(mock_get, app, user, user2, regie):
    """Check notifications and reminders created by ``notify_new_remote_invoices``.

    The HTTP layer is mocked to return a fixed set of pending invoices for
    the name ids ``admin``, ``admin2`` and ``foo``; the ``UserSAMLIdentifier``
    lookup is mocked so that ``foo`` raises ``DoesNotExist`` and the other
    name ids resolve to the Django user with the same username.
    """
    reference = now()

    def iso_date(days):
        # ISO date lying `days` days after the reference time (negative: before)
        return (reference + timedelta(days=days)).date().isoformat()

    creation_date = iso_date(-1)
    pay_limit_date = iso_date(20)
    new_pay_limit_date = iso_date(5)

    def invoice(invoice_id, label, amount, created=creation_date, limit=pay_limit_date):
        # one remote invoice entry; total and remaining amounts are the same
        return {
            'id': invoice_id,
            'label': label,
            'total_amount': amount,
            'amount': amount,
            'created': created,
            'pay_limit_date': limit,
            'has_pdf': False,
        }

    pending_invoices = {
        "data": {
            "admin": {
                "invoices": [
                    invoice('01', '010101', '10'),
                    invoice('011', '0101011', '1.5'),
                ]
            },
            'admin2': {
                'invoices': [
                    invoice('02', '020202', '2.0'),
                    # created 25 days ago, pay limit 10 days in the past
                    invoice('O5', '050505', '24',
                            created=iso_date(-25), limit=iso_date(-10)),
                ]
            },
            'foo': {
                'invoices': [
                    invoice('O3', '030303', '42'),
                ]
            },
        }
    }

    # `content` is serialized once here; `json()` returns the live dict, so
    # the pay limit changes made further down are seen through `json()` only.
    mock_response = mock.Mock(status_code=200, content=json.dumps(pending_invoices))
    mock_response.json.return_value = pending_invoices
    mock_get.return_value = mock_response

    # the fixture regie has no webservice_url: no request is made
    regie.notify_new_remote_invoices()
    assert mock_get.call_count == 0
    regie.webservice_url = 'http://example.org/regie'  # is_remote
    regie.save()
    regie.notify_new_remote_invoices()
    mock_get.assert_not_called()

    with mock.patch('combo.apps.lingo.models.UserSAMLIdentifier') as user_saml:
        # simulate class exception
        class DoesNotExist(Exception):
            pass

        user_saml.DoesNotExist = DoesNotExist

        def lookup(*args, **kwargs):
            name_id = kwargs['name_id']
            if name_id == 'foo':
                raise user_saml.DoesNotExist
            return mock.Mock(user=User.objects.get(username=name_id))

        user_saml.objects = mock.Mock(get=mock.Mock(side_effect=lookup))

        assert Notification.objects.all().count() == 0

        # first run: two new invoice notifications, no reminder, no url
        regie.notify_new_remote_invoices()
        requested_url = mock_get.call_args[0][1]
        assert 'NameID=' not in requested_url
        assert 'email=' not in requested_url
        invoice_prefix = 'invoice-%s:' % regie.slug
        reminder_prefix = 'invoice-%s:reminder-' % regie.slug
        assert (Notification.objects.filter(external_id__startswith=invoice_prefix)
                .visible().new().count()) == 2
        assert Notification.objects.filter(external_id__startswith=reminder_prefix).count() == 0
        assert Notification.objects.count() == 2
        for notif in Notification.objects.all():
            assert notif.url == '', notif.id

        page = Page(title='Active Items', slug='active_items', template_name='standard')
        page.save()
        cell = ActiveItems(page=page, placeholder='content', order=0)
        cell.save()

        # bring every pay limit date closer
        for account in pending_invoices['data'].values():
            for pending in account['invoices']:
                pending['pay_limit_date'] = new_pay_limit_date

        # create remind notifications
        regie.notify_new_remote_invoices()
        assert (Notification.objects.exclude(external_id__startswith=reminder_prefix)
                .visible().count()) == 0
        assert (Notification.objects.filter(external_id__startswith=reminder_prefix)
                .visible().new().count()) == 3
        assert Notification.objects.count() == 5

        # url appeared on new reminder notifications
        with_page_url = [notif for notif in Notification.objects.all()
                         if notif.url == page.get_online_url()]
        assert len(with_page_url) == 3

        # be sure there are no more reminders created
        regie.notify_new_remote_invoices()
        assert Notification.objects.count() == 5
|