import datetime
from io import StringIO
from unittest import mock
from unittest.mock import patch

import pytest
from django.contrib.contenttypes.models import ContentType
from django.core.files import File
from django.db import connection
from django.db.migrations.executor import MigrationExecutor
from django.urls import reverse
from django.utils import timezone
from requests.exceptions import ReadTimeout

from passerelle.apps.base_adresse.models import BaseAdresse
from passerelle.apps.clicrdv.models import ClicRdv
from passerelle.apps.opengis.models import OpenGIS
from passerelle.base.models import ResourceLog
from tests.test_manager import login


def test_get_description_url_fields(db):
    """Credentials embedded in a connector's service URL are masked in its
    description fields."""
    # URL without credentials is shown verbatim.
    connector = OpenGIS(slug='plop', wms_service_url='http://www.example.net')
    assert 'http://www.example.net' in [x[1] for x in connector.get_description_fields()]
    # user:password pair is replaced by ***:***.
    connector = OpenGIS(slug='plop', wms_service_url='http://username:secret@www.example.net')
    assert 'http://***:***@www.example.net' in [x[1] for x in connector.get_description_fields()]
    # masking still works when the username itself contains an '@'.
    connector = OpenGIS(slug='plop', wms_service_url='http://username@example.net:secret@www.example.net')
    assert 'http://***:***@www.example.net' in [x[1] for x in connector.get_description_fields()]


def test_get_description_secret_fields(db):
    """Secret field values (API key, password) never leak into the
    description fields."""
    connector = ClicRdv(slug='plop', apikey='secret1', username='plop', password='secret2')
    assert 'secret1' not in [x[1] for x in connector.get_description_fields()]
    assert 'secret2' not in [x[1] for x in connector.get_description_fields()]


def test_log_cleaning(app, db, admin_user, settings):
    """daily() purges ResourceLog entries older than the retention period,
    configured either globally (settings.LOG_RETENTION_DAYS) or per connector
    through the logging-parameters management form."""
    # Part 1: retention driven by the global setting.
    ResourceLog.objects.all().delete()
    connector = OpenGIS(slug='plop', wms_service_url='http://www.example.net')
    connector.save()
    connector.logger.error('hello1')
    connector.logger.error('hello2')
    assert ResourceLog.objects.all().count() == 2
    # Backdate the two existing entries to 10 days ago, then add a fresh one.
    ResourceLog.objects.update(timestamp=timezone.now() - datetime.timedelta(days=10))
    connector.logger.error('hello3')
    assert ResourceLog.objects.all().count() == 3
    # 11-day retention: nothing is old enough to be removed.
    settings.LOG_RETENTION_DAYS = 11
    connector.daily()
    assert ResourceLog.objects.all().count() == 3
    # 10-day retention: the two backdated entries are purged.
    settings.LOG_RETENTION_DAYS = 10
    connector.daily()
    assert ResourceLog.objects.all().count() == 1

    # Part 2: same scenario, but retention set via the management form.
    ResourceLog.objects.all().delete()
    connector.logger.error('hello1')
    connector.logger.error('hello2')
    assert ResourceLog.objects.all().count() == 2
    ResourceLog.objects.update(timestamp=timezone.now() - datetime.timedelta(days=10))
    connector.logger.error('hello3')
    assert ResourceLog.objects.all().count() == 3
    url = reverse(
        'logging-parameters',
        kwargs={
            'resource_type': ContentType.objects.get_for_model(connector).id,
            'resource_pk': connector.id,
        },
    )
    app = login(app)
    resp = app.get(url)
    # No per-connector value is set initially, so the input has no value attr.
    assert not resp.html.find('input', {'name': 'log_retention_days'}).has_attr('value')
    resp.form['log_retention_days'] = '11'
    resp.form.submit()
    connector.daily()
    assert ResourceLog.objects.all().count() == 3
    # Submitted value is persisted and redisplayed in the form.
    resp = app.get(url)
    assert int(resp.html.find('input', {'name': 'log_retention_days'})['value']) == 11
    resp.form['log_retention_days'] = '10'
    resp.form.submit()
    connector.daily()
    assert ResourceLog.objects.all().count() == 1


@pytest.fixture
def email_handler():
    """Install Django's AdminEmailHandler on the root logger for the duration
    of a test, so errors logged anywhere are mailed to settings.ADMINS."""
    import logging

    from django.utils.log import AdminEmailHandler

    root = logging.getLogger()
    handler = AdminEmailHandler(include_html=True)
    handler.level = logging.ERROR
    root.handlers.append(handler)
    try:
        yield
    finally:
        # Always detach the handler so it does not leak into other tests.
        root.handlers.remove(handler)


def test_trace_emails(app, settings, dummy_csv_datasource, email_handler, mailoutbox):
    """An endpoint error mails ADMINS, and additionally the addresses listed
    in the connector's trace_emails logging parameter once set."""
    from tests.utils import generic_endpoint_url

    settings.ADMINS = [('admin', 'admin@example.net')]
    logging_parameters = dummy_csv_datasource.logging_parameters
    logging_parameters.save()
    assert not mailoutbox
    # Make the endpoint raise so the request ends in a 500 and an error is
    # logged (which the email_handler fixture turns into mail).
    with patch.object(
        dummy_csv_datasource.__class__, 'execute_query', side_effect=ValueError('coin'), autospec=True
    ):
        app.get(
            generic_endpoint_url(
                connector='csvdatasource', endpoint='query/dummy-query/', slug=dummy_csv_datasource.slug
            ),
            status=500,
        )
        assert mailoutbox[0].to == ['admin@example.net']
        # Remember where new messages will start after the second request.
        idx = len(mailoutbox)
        logging_parameters.trace_emails = 'john.doe@example.net'
        logging_parameters.save()
        # NOTE(review): this second request must still run with execute_query
        # patched, otherwise the query would succeed instead of returning 500.
        app.get(
            generic_endpoint_url(
                connector='csvdatasource', endpoint='query/dummy-query/', slug=dummy_csv_datasource.slug
            ),
            status=500,
        )
        assert mailoutbox[0].to == ['admin@example.net']
        assert mailoutbox[idx].to == ['john.doe@example.net']


@pytest.mark.xfail
def test_jsonb_migration(transactional_db):
    """Migration 0018 restores _dialect_options to jsonb after the column has
    been (deliberately) broken back to text with raw SQL."""
    app = 'csvdatasource'
    migrate_from = [(app, '0017_auto_20200504_1402')]
    migrate_to = [(app, '0018_text_to_jsonb')]
    executor = MigrationExecutor(connection)
    old_apps = executor.loader.project_state(migrate_from).apps
    # state of the db is not important
    executor.migrate(migrate_from, fake=True)
    data = {'data': {'test': 1}}
    CsvDataSource = old_apps.get_model(app, 'CsvDataSource')
    connector = CsvDataSource.objects.create(csv_file=File(StringIO(''), 't.csv'), _dialect_options=data)
    pk = connector.pk
    # Manually downgrade the column type to text to simulate the pre-0018 state.
    field = CsvDataSource._meta.get_field('_dialect_options')
    with connection.cursor() as cursor:
        cursor.execute(
            'ALTER TABLE {table} ALTER COLUMN {col} TYPE text USING {col}::text;'.format(
                table=CsvDataSource._meta.db_table, col=field.get_attname_column()[1]
            )
        )
    connector = CsvDataSource.objects.get(pk=pk)
    # db is in a broken state
    assert connector._dialect_options != data
    # ensure migration fixes it
    executor = MigrationExecutor(connection)
    executor.migrate(migrate_to)
    executor.loader.build_graph()
    apps = executor.loader.project_state(migrate_to).apps
    CsvDataSource = apps.get_model(app, 'CsvDataSource')
    connector = CsvDataSource.objects.get(pk=pk)
    assert connector._dialect_options == data


def test_check_status_no_traceback_email(db, settings, email_handler, mailoutbox):
    """An availability-check failure sends a single 'is now down' notification
    (not a traceback e-mail) and records a ResourceLog entry."""
    settings.ADMINS = [('admin', 'admin@example.net')]
    connector = BaseAdresse.objects.create(slug='base-adresse')
    # Make every outgoing HTTP request time out so the check fails.
    with mock.patch('requests.sessions.HTTPAdapter.send', autospec=True, side_effect=ReadTimeout('timeout')):
        connector.availability()
    assert len(mailoutbox) == 1
    assert 'is now down' in mailoutbox[0].subject
    assert ResourceLog.objects.exists()