190 lines
6.6 KiB
Python
190 lines
6.6 KiB
Python
import datetime
|
|
from io import StringIO
|
|
from unittest import mock
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.core.files import File
|
|
from django.db import connection
|
|
from django.db.migrations.executor import MigrationExecutor
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
from requests.exceptions import ReadTimeout
|
|
|
|
from passerelle.apps.base_adresse.models import BaseAdresse
|
|
from passerelle.apps.clicrdv.models import ClicRdv
|
|
from passerelle.apps.opengis.models import OpenGIS
|
|
from passerelle.base.models import ResourceLog
|
|
from tests.test_manager import login
|
|
|
|
|
|
def test_get_description_url_fields(db):
|
|
connector = OpenGIS(slug='plop', wms_service_url='http://www.example.net')
|
|
assert 'http://www.example.net' in [x[1] for x in connector.get_description_fields()]
|
|
|
|
connector = OpenGIS(slug='plop', wms_service_url='http://username:secret@www.example.net')
|
|
assert 'http://***:***@www.example.net' in [x[1] for x in connector.get_description_fields()]
|
|
|
|
connector = OpenGIS(slug='plop', wms_service_url='http://username@example.net:secret@www.example.net')
|
|
assert 'http://***:***@www.example.net' in [x[1] for x in connector.get_description_fields()]
|
|
|
|
|
|
def test_get_description_secret_fields(db):
|
|
connector = ClicRdv(slug='plop', apikey='secret1', username='plop', password='secret2')
|
|
assert 'secret1' not in [x[1] for x in connector.get_description_fields()]
|
|
assert 'secret2' not in [x[1] for x in connector.get_description_fields()]
|
|
|
|
|
|
def test_log_cleaning(app, db, admin_user, settings):
|
|
ResourceLog.objects.all().delete()
|
|
connector = OpenGIS(slug='plop', wms_service_url='http://www.example.net')
|
|
connector.save()
|
|
connector.logger.error('hello1')
|
|
connector.logger.error('hello2')
|
|
|
|
assert ResourceLog.objects.all().count() == 2
|
|
|
|
ResourceLog.objects.update(timestamp=timezone.now() - datetime.timedelta(days=10))
|
|
connector.logger.error('hello3')
|
|
assert ResourceLog.objects.all().count() == 3
|
|
|
|
settings.LOG_RETENTION_DAYS = 11
|
|
connector.daily()
|
|
assert ResourceLog.objects.all().count() == 3
|
|
settings.LOG_RETENTION_DAYS = 10
|
|
connector.daily()
|
|
assert ResourceLog.objects.all().count() == 1
|
|
|
|
ResourceLog.objects.all().delete()
|
|
connector.logger.error('hello1')
|
|
connector.logger.error('hello2')
|
|
assert ResourceLog.objects.all().count() == 2
|
|
ResourceLog.objects.update(timestamp=timezone.now() - datetime.timedelta(days=10))
|
|
connector.logger.error('hello3')
|
|
assert ResourceLog.objects.all().count() == 3
|
|
|
|
url = reverse(
|
|
'logging-parameters',
|
|
kwargs={
|
|
'resource_type': ContentType.objects.get_for_model(connector).id,
|
|
'resource_pk': connector.id,
|
|
},
|
|
)
|
|
app = login(app)
|
|
resp = app.get(url)
|
|
assert not resp.html.find('input', {'name': 'log_retention_days'}).has_attr('value')
|
|
resp.form['log_retention_days'] = '11'
|
|
resp.form.submit()
|
|
connector.daily()
|
|
assert ResourceLog.objects.all().count() == 3
|
|
|
|
resp = app.get(url)
|
|
assert int(resp.html.find('input', {'name': 'log_retention_days'})['value']) == 11
|
|
resp.form['log_retention_days'] = '10'
|
|
resp.form.submit()
|
|
connector.daily()
|
|
assert ResourceLog.objects.all().count() == 1
|
|
|
|
|
|
@pytest.fixture
|
|
def email_handler():
|
|
import logging
|
|
|
|
from django.utils.log import AdminEmailHandler
|
|
|
|
root = logging.getLogger()
|
|
handler = AdminEmailHandler(include_html=True)
|
|
handler.level = logging.ERROR
|
|
root.handlers.append(handler)
|
|
try:
|
|
yield
|
|
finally:
|
|
root.handlers.remove(handler)
|
|
|
|
|
|
def test_trace_emails(app, settings, dummy_csv_datasource, email_handler, mailoutbox):
|
|
from tests.utils import generic_endpoint_url
|
|
|
|
settings.ADMINS = [('admin', 'admin@example.net')]
|
|
|
|
logging_parameters = dummy_csv_datasource.logging_parameters
|
|
logging_parameters.save()
|
|
|
|
assert not mailoutbox
|
|
|
|
with patch.object(
|
|
dummy_csv_datasource.__class__, 'execute_query', side_effect=ValueError('coin'), autospec=True
|
|
):
|
|
|
|
app.get(
|
|
generic_endpoint_url(
|
|
connector='csvdatasource', endpoint='query/dummy-query/', slug=dummy_csv_datasource.slug
|
|
),
|
|
status=500,
|
|
)
|
|
|
|
assert mailoutbox[0].to == ['admin@example.net']
|
|
|
|
idx = len(mailoutbox)
|
|
logging_parameters.trace_emails = 'john.doe@example.net'
|
|
logging_parameters.save()
|
|
app.get(
|
|
generic_endpoint_url(
|
|
connector='csvdatasource', endpoint='query/dummy-query/', slug=dummy_csv_datasource.slug
|
|
),
|
|
status=500,
|
|
)
|
|
|
|
assert mailoutbox[0].to == ['admin@example.net']
|
|
assert mailoutbox[idx].to == ['john.doe@example.net']
|
|
|
|
|
|
@pytest.mark.xfail
|
|
def test_jsonb_migration(transactional_db):
|
|
app = 'csvdatasource'
|
|
|
|
migrate_from = [(app, '0017_auto_20200504_1402')]
|
|
migrate_to = [(app, '0018_text_to_jsonb')]
|
|
executor = MigrationExecutor(connection)
|
|
old_apps = executor.loader.project_state(migrate_from).apps
|
|
# state of the db is not important
|
|
executor.migrate(migrate_from, fake=True)
|
|
|
|
data = {'data': {'test': 1}}
|
|
CsvDataSource = old_apps.get_model(app, 'CsvDataSource')
|
|
connector = CsvDataSource.objects.create(csv_file=File(StringIO(''), 't.csv'), _dialect_options=data)
|
|
pk = connector.pk
|
|
|
|
field = CsvDataSource._meta.get_field('_dialect_options')
|
|
with connection.cursor() as cursor:
|
|
cursor.execute(
|
|
'ALTER TABLE {table} ALTER COLUMN {col} TYPE text USING {col}::text;'.format(
|
|
table=CsvDataSource._meta.db_table, col=field.get_attname_column()[1]
|
|
)
|
|
)
|
|
connector = CsvDataSource.objects.get(pk=pk)
|
|
# db is in a broken state
|
|
assert connector._dialect_options != data
|
|
|
|
# ensure migration fixes it
|
|
executor = MigrationExecutor(connection)
|
|
executor.migrate(migrate_to)
|
|
executor.loader.build_graph()
|
|
|
|
apps = executor.loader.project_state(migrate_to).apps
|
|
CsvDataSource = apps.get_model(app, 'CsvDataSource')
|
|
connector = CsvDataSource.objects.get(pk=pk)
|
|
assert connector._dialect_options == data
|
|
|
|
|
|
def test_check_status_no_traceback_email(db, settings, email_handler, mailoutbox):
|
|
settings.ADMINS = [('admin', 'admin@example.net')]
|
|
connector = BaseAdresse.objects.create(slug='base-adresse')
|
|
|
|
with mock.patch('requests.sessions.HTTPAdapter.send', autospec=True, side_effect=ReadTimeout('timeout')):
|
|
connector.availability()
|
|
assert len(mailoutbox) == 1
|
|
assert 'is now down' in mailoutbox[0].subject
|
|
assert ResourceLog.objects.exists()
|