passerelle/tests/test_proxylogger.py

191 lines
6.9 KiB
Python

# -*- coding: utf-8 -*-
import logging
import datetime
import itertools
import pytest
from httmock import HTTMock
from django.core.exceptions import ValidationError
from passerelle.base.models import ProxyLogger, ResourceLog
from passerelle.apps.feeds.models import Feed
from .test_availability import down_mock, up_mock
@pytest.fixture
def connector():
connector, created = Feed.objects.get_or_create(slug='some-slug')
connector.set_log_level('DEBUG')
connector.url = 'http://example.net/'
connector.save()
return connector
def test_proxy_logger_basic(db, connector):
pr = ProxyLogger(connector)
pr.debug(u'some message')
rl_query = ResourceLog.objects.all()
assert len(rl_query) == 1
rl = rl_query.first()
assert rl.message == u'some message'
assert rl.levelno == logging.DEBUG
assert rl.appname == u'feeds'
assert rl.slug == u'some-slug'
def test_proxy_logger_std_interpolation(db, connector):
ResourceLog.objects.all().delete()
pr = ProxyLogger(connector)
pr.debug(u'some message %s', u'some var')
rl_query = ResourceLog.objects.all()
rl = rl_query.first()
assert rl.message == u'some message some var'
def test_proxy_logger_dict_interpolation(db, connector):
ResourceLog.objects.all().delete()
pr = ProxyLogger(connector)
pr.debug(u'some message %(var_name)s', {u'var_name': u'some var'})
rl_query = ResourceLog.objects.all()
rl = rl_query.first()
assert rl.message == u'some message some var'
def test_proxy_logger_ignore(db, connector):
ResourceLog.objects.all().delete()
connector.set_log_level('INFO')
pr = ProxyLogger(connector)
pr.debug(u'some message')
assert len(ResourceLog.objects.all()) == 0
def test_proxy_logger_ignore_when_down(db, connector):
with HTTMock(down_mock): # set connector as down
connector.availability()
assert connector.down() is True
ResourceLog.objects.all().delete()
pr = ProxyLogger(connector)
pr.debug(u'some message')
assert len(ResourceLog.objects.all()) == 0
def test_validate_notification_delays(db, connector):
def take(iterator, count):
return list(itertools.compress(iterator, range(1, count + 1)))
availability_parameters = connector.availability_parameters
assert availability_parameters.notification_delays == '0'
availability_parameters.full_clean()
assert availability_parameters.has_zero_delay()
assert list(availability_parameters.notification_delays_generator()) == [0]
availability_parameters.notification_delays = '0,5,100,1000'
availability_parameters.full_clean()
assert availability_parameters.has_zero_delay()
assert take(availability_parameters.notification_delays_generator(), 6) == [0, 5, 100, 1000, 2000, 3000]
availability_parameters.notification_delays = '5,100,1000'
availability_parameters.full_clean()
assert not availability_parameters.has_zero_delay()
assert take(availability_parameters.notification_delays_generator(), 6) == [5, 100, 1000, 2000, 3000, 4000]
availability_parameters.notification_delays = '5'
availability_parameters.full_clean()
assert not availability_parameters.has_zero_delay()
assert take(availability_parameters.notification_delays_generator(), 3) == [5, 10, 15]
# validation errors
availability_parameters.notification_delays = '5,100,100'
with pytest.raises(ValidationError):
availability_parameters.full_clean()
availability_parameters.notification_delays = ''
with pytest.raises(ValidationError):
availability_parameters.full_clean()
availability_parameters.notification_delays = '1,x'
with pytest.raises(ValidationError):
availability_parameters.full_clean()
availability_parameters.notification_delays = '10,1'
with pytest.raises(ValidationError):
availability_parameters.full_clean()
@pytest.mark.parametrize('notification_delays', ['0', '0,5,100', '5,100'])
def test_log_on_connector_availability_change(db, connector, freezer, notification_delays):
connector.title = u'éléphant'
availability_parameters = connector.availability_parameters
availability_parameters.notification_delays = notification_delays
availability_parameters.save()
with HTTMock(up_mock): # set connector as up
connector.availability()
assert ResourceLog.objects.count() == 1
assert ResourceLog.objects.latest('id').level == 'info'
assert ResourceLog.objects.latest('id').message == 'GET http://example.net/ (=> 200)'
ResourceLog.objects.all().delete()
# move 5 minutes in the future
freezer.move_to(datetime.timedelta(seconds=60 * 5 + 1))
with HTTMock(down_mock): # set connector as down
connector.availability()
assert connector.down()
last_count1 = ResourceLog.objects.count()
assert last_count1 == 2
assert ResourceLog.objects.all()[0].message == 'GET http://example.net/ (=> 404)'
assert ResourceLog.objects.all()[1].level == 'error' if notification_delays.startswith('0') else 'warning'
assert (u'connector "éléphant" (Feed) is now down: 404 Client'
in ResourceLog.objects.all()[1].message)
# move 5 minutes in the future
freezer.move_to(datetime.timedelta(seconds=60 * 5 + 1))
# second time log as error
with HTTMock(down_mock): # connector is still down
connector.availability()
assert connector.down()
last_count2 = ResourceLog.objects.count()
if notification_delays == '0':
assert last_count2 == last_count1
else:
assert last_count2 == last_count1 + 1
assert ResourceLog.objects.all()[2].level == 'error' if notification_delays != '0' else 'warning'
assert (u'connector "éléphant" (Feed) has been down for 5 minutes: 404'
in ResourceLog.objects.all()[2].message)
# move 3 minutes in the future
freezer.move_to(datetime.timedelta(seconds=60 * 3 + 1))
# third time no log
with HTTMock(down_mock): # connector is still down
connector.availability()
assert ResourceLog.objects.count() == last_count2
# move 3 minutes in the future
freezer.move_to(datetime.timedelta(seconds=60 * 3 + 1))
with HTTMock(up_mock): # set connector as up
connector.availability()
assert not connector.down()
last_count3 = ResourceLog.objects.count()
assert last_count3 == last_count2 + 1
assert ResourceLog.objects.latest('id').level == 'info'
assert ResourceLog.objects.latest('id').message == u'connector "éléphant" (Feed) is back up'
# move 3 minutes in the future
freezer.move_to(datetime.timedelta(seconds=60 * 3 + 1))
# last message is a GET
with HTTMock(up_mock): # set connector as up
connector.availability()
assert not connector.down()
last_count4 = ResourceLog.objects.count()
assert last_count4 == last_count3 + 1
assert ResourceLog.objects.latest('id').level == 'info'
assert ResourceLog.objects.latest('id').message == 'GET http://example.net/ (=> 200)'