147 lines
5.0 KiB
Python
147 lines
5.0 KiB
Python
import datetime
|
|
from unittest import mock
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
from requests.exceptions import ReadTimeout
|
|
|
|
from passerelle.apps.base_adresse.models import BaseAdresse
|
|
from passerelle.apps.clicrdv.models import ClicRdv
|
|
from passerelle.apps.opengis.models import OpenGIS
|
|
from passerelle.base.models import ResourceLog
|
|
from tests.test_manager import login
|
|
|
|
|
|
def test_get_description_url_fields(db):
    """Check that credentials embedded in the WMS URL are masked in description fields."""
    # (configured URL, value expected among the description field values)
    cases = (
        ('http://www.example.net', 'http://www.example.net'),
        ('http://username:secret@www.example.net', 'http://***:***@www.example.net'),
        # the username part itself contains an "@"
        ('http://username@example.net:secret@www.example.net', 'http://***:***@www.example.net'),
    )
    for service_url, displayed in cases:
        connector = OpenGIS(slug='plop', wms_service_url=service_url)
        field_values = [field[1] for field in connector.get_description_fields()]
        assert displayed in field_values
|
|
|
|
|
|
def test_get_description_secret_fields(db):
    """Check that the API key and password values never appear in description fields."""
    connector = ClicRdv(slug='plop', apikey='secret1', username='plop', password='secret2')
    for secret in ('secret1', 'secret2'):
        field_values = [field[1] for field in connector.get_description_fields()]
        assert secret not in field_values
|
|
|
|
|
|
def test_log_cleaning(app, db, admin_user, settings):
    """Check that connector.daily() purges old ResourceLog rows per the retention period.

    The retention period is first driven by settings.LOG_RETENTION_DAYS, then by
    the value submitted through the connector's 'logging-parameters' form.
    """

    def log_count():
        return ResourceLog.objects.count()

    def seed_logs():
        # Two entries back-dated by 10 days, then one fresh entry.
        connector.logger.error('hello1')
        connector.logger.error('hello2')
        assert log_count() == 2
        ResourceLog.objects.update(timestamp=timezone.now() - datetime.timedelta(days=10))
        connector.logger.error('hello3')
        assert log_count() == 3

    def retention_input(page):
        return page.html.find('input', {'name': 'log_retention_days'})

    def submit_retention(page, days):
        page.form['log_retention_days'] = days
        page.form.submit()

    ResourceLog.objects.all().delete()
    connector = OpenGIS(slug='plop', wms_service_url='http://www.example.net')
    connector.save()
    seed_logs()

    # Global retention from settings: 11 days keeps everything, 10 days drops
    # the two back-dated entries.
    for retention_days, remaining in ((11, 3), (10, 1)):
        settings.LOG_RETENTION_DAYS = retention_days
        connector.daily()
        assert log_count() == remaining

    ResourceLog.objects.all().delete()
    seed_logs()

    # Same scenario, with the retention set through the logging parameters form.
    url = reverse(
        'logging-parameters',
        kwargs={
            'resource_type': ContentType.objects.get_for_model(connector).id,
            'resource_pk': connector.id,
        },
    )
    app = login(app)

    resp = app.get(url)
    # No retention configured yet: the input has no value attribute.
    assert not retention_input(resp).has_attr('value')
    submit_retention(resp, '11')
    connector.daily()
    assert log_count() == 3

    resp = app.get(url)
    assert int(retention_input(resp)['value']) == 11
    submit_retention(resp, '10')
    connector.daily()
    assert log_count() == 1
|
|
|
|
|
|
@pytest.fixture
def email_handler():
    """Attach an AdminEmailHandler (ERROR level, HTML included) to the root logger.

    The handler is installed for the duration of the test and detached on
    teardown, whether the test passed or failed.
    """
    import logging

    from django.utils.log import AdminEmailHandler

    root = logging.getLogger()
    handler = AdminEmailHandler(include_html=True)
    handler.setLevel(logging.ERROR)
    # Use the public logging API instead of mutating root.handlers directly:
    # removeHandler() tolerates a handler that was already detached, whereas
    # list.remove() would raise ValueError during teardown.
    root.addHandler(handler)
    try:
        yield
    finally:
        root.removeHandler(handler)
|
|
|
|
|
|
def test_trace_emails(app, settings, dummy_csv_datasource, email_handler, mailoutbox):
    """Check who receives the error email when a connector endpoint fails.

    With no trace_emails configured the mail goes to settings.ADMINS; once
    trace_emails is set on the logging parameters, the next mail goes to that
    address.
    """
    from tests.utils import generic_endpoint_url

    def call_failing_endpoint():
        endpoint_url = generic_endpoint_url(
            connector='csvdatasource', endpoint='query/dummy-query/', slug=dummy_csv_datasource.slug
        )
        app.get(endpoint_url, status=500)

    settings.ADMINS = [('admin', 'admin@example.net')]

    logging_parameters = dummy_csv_datasource.logging_parameters
    logging_parameters.save()

    assert not mailoutbox

    # Make every query execution raise so that the endpoint answers 500.
    failing_query = patch.object(
        dummy_csv_datasource.__class__, 'execute_query', side_effect=ValueError('coin'), autospec=True
    )
    with failing_query:
        call_failing_endpoint()

        assert mailoutbox[0].to == ['admin@example.net']

        # Remember where the next batch of mails starts.
        idx = len(mailoutbox)
        logging_parameters.trace_emails = 'john.doe@example.net'
        logging_parameters.save()
        call_failing_endpoint()

        assert mailoutbox[0].to == ['admin@example.net']
        assert mailoutbox[idx].to == ['john.doe@example.net']
|
|
|
|
|
|
def test_check_status_no_traceback_email(db, settings, email_handler, mailoutbox):
    """Check that a timed-out availability check sends a single 'is now down' email.

    A ResourceLog entry must also exist afterwards.
    """
    settings.ADMINS = [('admin', 'admin@example.net')]
    connector = BaseAdresse.objects.create(slug='base-adresse')

    # Every HTTP request sent through requests' adapter raises ReadTimeout.
    timed_out_send = mock.patch(
        'requests.sessions.HTTPAdapter.send', autospec=True, side_effect=ReadTimeout('timeout')
    )
    with timed_out_send:
        connector.availability()

    assert len(mailoutbox) == 1
    assert 'is now down' in mailoutbox[0].subject
    assert ResourceLog.objects.exists()
|