# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Tests for the OpenDataSoft connector.

Covers the management views (query creation / edition), the JSON
export/import round trip, and the ``search`` and ``q/<query-slug>/``
endpoints. Outgoing HTTP calls are replaced either by patching
``passerelle.utils.Request.get`` or with ``tests.utils.mock_url``.
"""

import json
from unittest import mock

import pytest
from requests.exceptions import ConnectionError

import tests.utils
from passerelle.apps.opendatasoft.models import OpenDataSoft, Query
from passerelle.utils import import_site
from tests.test_manager import login

# Every test of this module is marked with pytest's ``django_db`` marker.
pytestmark = pytest.mark.django_db

# Canned remote payload for a full-text search: three address records
# (recordid e00c…, 7caf…, 0984…) whose order is relied upon by the tests.
FAKED_CONTENT_Q_SEARCH = json.dumps(
    {
        'nhits': 76,
        'parameters': {
            'dataset': 'referentiel-adresse-test',
            'format': 'json',
            'q': "rue de l'aubepine",
            'rows': 3,
            'timezone': 'UTC',
        },
        'records': [
            {
                'datasetid': 'referentiel-adresse-test',
                'fields': {
                    'adresse_complete': "33 RUE DE L'AUBEPINE STRASBOURG",
                    'date_exprt': '2019-10-23',
                    'geo_point': [48.6060963542, 7.76978279836],
                    'nom_commun': 'Strasbourg',
                    'nom_rue': "RUE DE L'AUBEPINE",
                    'num_com': 482,
                    'numero': '33',
                    'source': 'Ville et Eurométropole de Strasbourg',
                },
                'geometry': {'coordinates': [7.76978279836, 48.6060963542], 'type': 'Point'},
                'record_timestamp': '2019-12-02T14:15:08.376000+00:00',
                'recordid': 'e00cf6161e52a4c8fe510b2b74d4952036cb3473',
            },
            {
                'datasetid': 'referentiel-adresse-test',
                'fields': {
                    'adresse_complete': "19 RUE DE L'AUBEPINE LIPSHEIM",
                    'date_exprt': '2019-10-23',
                    'geo_point': [48.4920620548, 7.66177412454],
                    'nom_commun': 'Lipsheim',
                    'nom_rue': "RUE DE L'AUBEPINE",
                    'num_com': 268,
                    'numero': '19',
                    'source': 'Ville et Eurométropole de Strasbourg',
                },
                'geometry': {'coordinates': [7.66177412454, 48.4920620548], 'type': 'Point'},
                'record_timestamp': '2019-12-02T14:15:08.376000+00:00',
                'recordid': '7cafcd5c692773e8b863587b2d38d6be82e023d8',
            },
            {
                'datasetid': 'referentiel-adresse-test',
                'fields': {
                    'adresse_complete': "29 RUE DE L'AUBEPINE STRASBOURG",
                    'date_exprt': '2019-10-23',
                    'geo_point': [48.6056497224, 7.76988497729],
                    'nom_commun': 'Strasbourg',
                    'nom_rue': "RUE DE L'AUBEPINE",
                    'num_com': 482,
                    'numero': '29',
                    'source': 'Ville et Eurométropole de Strasbourg',
                },
                'geometry': {'coordinates': [7.76988497729, 48.6056497224], 'type': 'Point'},
                'record_timestamp': '2019-12-02T14:15:08.376000+00:00',
                'recordid': '0984a5e1745701f71c91af73ce764e1f7132e0ff',
            },
        ],
    }
)

# Canned remote payload for a lookup by record id: the single "Lipsheim"
# record, identical to the second record of FAKED_CONTENT_Q_SEARCH.
FAKED_CONTENT_ID_SEARCH = json.dumps(
    {
        'nhits': 1,
        'parameters': {
            'dataset': 'referentiel-adresse-test',
            'format': 'json',
            'q': 'recordid:7cafcd5c692773e8b863587b2d38d6be82e023d8',
            'rows': 1,
            'timezone': 'UTC',
        },
        'records': [
            {
                'datasetid': 'referentiel-adresse-test',
                'fields': {
                    'adresse_complete': "19 RUE DE L'AUBEPINE LIPSHEIM",
                    'date_exprt': '2019-10-23',
                    'geo_point': [48.4920620548, 7.66177412454],
                    'nom_commun': 'Lipsheim',
                    'nom_rue': "RUE DE L'AUBEPINE",
                    'num_com': 268,
                    'numero': '19',
                    'source': 'Ville et Eurométropole de Strasbourg',
                },
                'geometry': {'coordinates': [7.66177412454, 48.4920620548], 'type': 'Point'},
                'record_timestamp': '2019-12-02T14:15:08.376000+00:00',
                'recordid': '7cafcd5c692773e8b863587b2d38d6be82e023d8',
            }
        ],
    }
)


@pytest.fixture
def connector():
    """Create the ``my_connector`` OpenDataSoft resource used by the tests.

    The created object is passed through ``tests.utils.setup_access_rights``
    and the value returned by that helper is what the fixture yields.
    """
    return tests.utils.setup_access_rights(
        OpenDataSoft.objects.create(
            slug='my_connector',
            service_url='http://www.example.net',
            api_key='my_secret',
        )
    )


@pytest.fixture
def query(connector):
    """Create the ``my_query`` Query attached to the ``connector`` fixture.

    The query carries a dataset, a text template, a filter expression, a
    sort key and a limit of 3; the ``test_query_q*`` tests check how these
    end up in the parameters sent to the remote service.
    """
    # NOTE(review): this file reached review with its line breaks collapsed.
    # test_query_q expects this expression to produce one ``refine.source``
    # value and two ``exclude.numero`` values, so the literal presumably held
    # one filter per line originally - confirm the whitespace inside the
    # triple-quoted string against the repository before merging.
    return Query.objects.create(
        resource=connector,
        name='Référenciel adresses de test',
        slug='my_query',
        description='Rechercher une adresse',
        dataset='referentiel-adresse-test',
        text_template='{{numero}} {{nom_rue}} {{nom_commun}}',
        filter_expression=''' refine.source=Ville et Eurométropole de Strasbourg exclude.numero=42 exclude.numero=43 ''',
        sort='-nom_rue',
        limit=3,
    )


def test_views(db, admin_user, app, connector):
    """Create a query through the web UI and check it is listed afterwards."""
    app = login(app)
    resp = app.get('/opendatasoft/my_connector/', status=200)
    resp = resp.click('New Query')
    resp.form['name'] = 'my query'
    resp.form['slug'] = 'my-query'
    resp.form['dataset'] = 'my-dataset'
    resp = resp.form.submit()
    resp = resp.follow()
    # the first entry of the "panel-queries" list must be the new query
    assert resp.html.find('div', {'id': 'panel-queries'}).ul.li.a.text == 'my query'


def test_export_import(query):
    """Export the connector, delete it, and check import restores it with its query."""
    assert OpenDataSoft.objects.count() == 1
    assert Query.objects.count() == 1
    serialization = {'resources': [query.resource.export_json()]}
    OpenDataSoft.objects.all().delete()
    # deleting the connector also removes its queries
    assert OpenDataSoft.objects.count() == 0
    assert Query.objects.count() == 0
    import_site(serialization)
    assert OpenDataSoft.objects.count() == 1
    assert Query.objects.count() == 1


@mock.patch('passerelle.utils.Request.get')
def test_search_empty_contents(mocked_get, app, connector):
    """An ``error`` key in a HTTP 200 remote answer is reported as ``err``/``err_desc``."""
    endpoint = tests.utils.generic_endpoint_url('opendatasoft', 'search', slug=connector.slug)
    assert endpoint == '/opendatasoft/my_connector/search'

    # error returned by the OpenDataSoft server
    json_response = json.dumps({'error': "The query is invalid : Field 00 doesn't exist"})
    mocked_get.return_value = tests.utils.FakedResponse(content=json_response, status_code=200)
    resp = app.get(endpoint, status=200)
    assert resp.json['err']
    assert resp.json['err_desc'] == "The query is invalid : Field 00 doesn't exist"


@mock.patch('passerelle.utils.Request.get')
def test_search(mocked_get, app, connector):
    """Call ``search`` without ``q``/``id`` and check forwarded parameters and results."""
    endpoint = tests.utils.generic_endpoint_url('opendatasoft', 'search', slug=connector.slug)
    assert endpoint == '/opendatasoft/my_connector/search'
    params = {
        'dataset': 'referentiel-adresse-test',
        'text_template': '{{numero}} {{nom_rue}} {{nom_commun}}',
        'sort': '-nom_rue',
        'limit': 3,
    }
    mocked_get.return_value = tests.utils.FakedResponse(content=FAKED_CONTENT_Q_SEARCH, status_code=200)
    resp = app.get(endpoint, params=params, status=200)
    # 'limit' is sent as 'rows', the connector api key is added and
    # 'text_template' is not part of the remote request
    assert mocked_get.call_args[1]['params'] == {
        'apikey': 'my_secret',
        'dataset': 'referentiel-adresse-test',
        'sort': '-nom_rue',
        'rows': '3',
    }
    assert not resp.json['err']
    assert len(resp.json['data']) == 3
    # check order is kept
    assert [x['id'] for x in resp.json['data']] == [
        'e00cf6161e52a4c8fe510b2b74d4952036cb3473',
        '7cafcd5c692773e8b863587b2d38d6be82e023d8',
        '0984a5e1745701f71c91af73ce764e1f7132e0ff',
    ]
    # check text results
    assert [x['text'] for x in resp.json['data']] == [
        "33 RUE DE L'AUBEPINE Strasbourg",
        "19 RUE DE L'AUBEPINE Lipsheim",
        "29 RUE DE L'AUBEPINE Strasbourg",
    ]
    # check additional attributes
    assert [x['numero'] for x in resp.json['data']] == ['33', '19', '29']


@mock.patch('passerelle.utils.Request.get')
def test_search_using_q(mocked_get, app, connector):
    """Call ``search`` with ``q`` and check how the text is cleaned before being sent."""
    endpoint = tests.utils.generic_endpoint_url('opendatasoft', 'search', slug=connector.slug)
    assert endpoint == '/opendatasoft/my_connector/search'
    params = {
        'dataset': 'referentiel-adresse-test',
        'text_template': '{{numero}} {{nom_rue}} {{nom_commun}}',
        'sort': '-nom_rue',
        'limit': '3',
        'q': "rue de l'aubepine",
    }
    mocked_get.return_value = tests.utils.FakedResponse(content=FAKED_CONTENT_Q_SEARCH, status_code=200)
    resp = app.get(endpoint, params=params, status=200)
    # with 'q' given, no 'sort' is sent and the query text is rewritten
    # ("rue de l'aubepine" is sent as 'rue de aubepine')
    assert mocked_get.call_args[1]['params'] == {
        'apikey': 'my_secret',
        'dataset': 'referentiel-adresse-test',
        'rows': '3',
        'q': 'rue de aubepine',
    }
    assert not resp.json['err']
    assert len(resp.json['data']) == 3
    # check order is kept
    assert [x['id'] for x in resp.json['data']] == [
        'e00cf6161e52a4c8fe510b2b74d4952036cb3473',
        '7cafcd5c692773e8b863587b2d38d6be82e023d8',
        '0984a5e1745701f71c91af73ce764e1f7132e0ff',
    ]
    # check text results
    assert [x['text'] for x in resp.json['data']] == [
        "33 RUE DE L'AUBEPINE Strasbourg",
        "19 RUE DE L'AUBEPINE Lipsheim",
        "29 RUE DE L'AUBEPINE Strasbourg",
    ]
    # check additional attributes
    assert [x['numero'] for x in resp.json['data']] == ['33', '19', '29']

    # check operators are removed
    params['q'] = 'please, do NOT send boolean operators like And, OR and nOt'
    resp = app.get(endpoint, params=params, status=200)
    assert mocked_get.call_args[1]['params']['q'] == 'please do send boolean operators like'

    # punctuation-based field operators are dropped; the 'TO' keyword is kept
    params['q'] = 'field operators are almost ignored too:, -, ==, >, <, >=, <=, [start_date TO end_date]'
    resp = app.get(endpoint, params=params, status=200)
    assert (
        mocked_get.call_args[1]['params']['q']
        == 'field operators are almost ignored too start_date TO end_date'
    )


@mock.patch('passerelle.utils.Request.get')
def test_search_using_id(mocked_get, app, connector):
    """Call ``search`` with ``id`` and check it is sent as a ``recordid:`` query."""
    endpoint = tests.utils.generic_endpoint_url('opendatasoft', 'search', slug=connector.slug)
    assert endpoint == '/opendatasoft/my_connector/search'
    params = {
        'dataset': 'referentiel-adresse-test',
        'text_template': '{{numero}} {{nom_rue}} {{nom_commun}}',
        'id': '7cafcd5c692773e8b863587b2d38d6be82e023d8',
    }
    mocked_get.return_value = tests.utils.FakedResponse(content=FAKED_CONTENT_ID_SEARCH, status_code=200)
    resp = app.get(endpoint, params=params, status=200)
    assert mocked_get.call_args[1]['params'] == {
        'apikey': 'my_secret',
        'dataset': 'referentiel-adresse-test',
        'q': 'recordid:7cafcd5c692773e8b863587b2d38d6be82e023d8',
    }
    assert len(resp.json['data']) == 1
    assert resp.json['data'][0]['text'] == "19 RUE DE L'AUBEPINE Lipsheim"


@mock.patch('passerelle.utils.Request.get')
def test_query_q(mocked_get, app, query):
    """Call the stored query endpoint and check the query settings are sent remotely."""
    endpoint = '/opendatasoft/my_connector/q/my_query/'
    params = {
        'limit': 3,
    }
    mocked_get.return_value = tests.utils.FakedResponse(content=FAKED_CONTENT_Q_SEARCH, status_code=200)
    resp = app.get(endpoint, params=params, status=200)
    # dataset, filters and sort come from the `query` fixture
    assert mocked_get.call_args[1]['params'] == {
        'apikey': 'my_secret',
        'dataset': 'referentiel-adresse-test',
        'refine.source': ['Ville et Eurométropole de Strasbourg'],
        'exclude.numero': ['42', '43'],
        'sort': '-nom_rue',
        'rows': 3,
    }
    assert not resp.json['err']
    assert len(resp.json['data']) == 3
    # check order is kept
    assert [x['id'] for x in resp.json['data']] == [
        'e00cf6161e52a4c8fe510b2b74d4952036cb3473',
        '7cafcd5c692773e8b863587b2d38d6be82e023d8',
        '0984a5e1745701f71c91af73ce764e1f7132e0ff',
    ]
    # check text results
    assert [x['text'] for x in resp.json['data']] == [
        "33 RUE DE L'AUBEPINE Strasbourg",
        "19 RUE DE L'AUBEPINE Lipsheim",
        "29 RUE DE L'AUBEPINE Strasbourg",
    ]
    # check additional attributes
    assert [x['numero'] for x in resp.json['data']] == ['33', '19', '29']


@mock.patch('passerelle.utils.Request.get')
def test_query_q_using_q(mocked_get, app, query):
    """Call the stored query endpoint with ``q``: filters are kept, ``sort`` is not sent."""
    endpoint = '/opendatasoft/my_connector/q/my_query/'
    params = {
        'q': "rue de l'aubepine",
    }
    mocked_get.return_value = tests.utils.FakedResponse(content=FAKED_CONTENT_Q_SEARCH, status_code=200)
    resp = app.get(endpoint, params=params, status=200)
    assert mocked_get.call_args[1]['params'] == {
        'apikey': 'my_secret',
        'dataset': 'referentiel-adresse-test',
        'refine.source': ['Ville et Eurométropole de Strasbourg'],
        'exclude.numero': ['42', '43'],
        'rows': 3,
        'q': 'rue de aubepine',
    }
    assert not resp.json['err']
    assert len(resp.json['data']) == 3
    # check order is kept
    assert [x['id'] for x in resp.json['data']] == [
        'e00cf6161e52a4c8fe510b2b74d4952036cb3473',
        '7cafcd5c692773e8b863587b2d38d6be82e023d8',
        '0984a5e1745701f71c91af73ce764e1f7132e0ff',
    ]
    # check text results
    assert [x['text'] for x in resp.json['data']] == [
        "33 RUE DE L'AUBEPINE Strasbourg",
        "19 RUE DE L'AUBEPINE Lipsheim",
        "29 RUE DE L'AUBEPINE Strasbourg",
    ]
    # check additional attributes
    assert [x['numero'] for x in resp.json['data']] == ['33', '19', '29']


@mock.patch('passerelle.utils.Request.get')
def test_query_q_using_id(mocked_get, app, query):
    """Call the stored query endpoint with ``id``: it is sent as a ``recordid:`` query."""
    endpoint = '/opendatasoft/my_connector/q/my_query/'
    params = {
        'id': '7cafcd5c692773e8b863587b2d38d6be82e023d8',
    }
    mocked_get.return_value = tests.utils.FakedResponse(content=FAKED_CONTENT_ID_SEARCH, status_code=200)
    resp = app.get(endpoint, params=params, status=200)
    assert mocked_get.call_args[1]['params'] == {
        'apikey': 'my_secret',
        'dataset': 'referentiel-adresse-test',
        'refine.source': ['Ville et Eurométropole de Strasbourg'],
        'exclude.numero': ['42', '43'],
        'rows': 3,
        'q': 'recordid:7cafcd5c692773e8b863587b2d38d6be82e023d8',
    }
    assert len(resp.json['data']) == 1
    assert resp.json['data'][0]['text'] == "19 RUE DE L'AUBEPINE Lipsheim"


def test_opendatasoft_query_unicity(admin_user, app, connector, query):
    """Query slug and name must be unique within a connector, not across connectors.

    A second connector owns a 'Foo Bar' / 'foo-bar' query; reusing that name
    and slug on the first connector is accepted, while reusing the slug or
    name of the first connector's own query is rejected, both on creation
    and on edition.
    """
    connector2 = OpenDataSoft.objects.create(
        slug='my_connector2',
        api_key='my_secret',
    )
    Query.objects.create(
        resource=connector2,
        name='Foo Bar',
        slug='foo-bar',
    )

    # creation form: slug already used on this connector
    app = login(app)
    resp = app.get('/manage/opendatasoft/%s/query/new/' % connector.slug)
    resp.form['slug'] = query.slug
    resp.form['name'] = 'Foo Bar'
    resp.form['dataset'] = 'my-dataset'
    resp = resp.form.submit()
    assert resp.status_code == 200
    assert Query.objects.filter(resource=connector).count() == 1
    assert 'A query with this slug already exists' in resp.text

    # creation form: name already used on this connector
    resp.form['slug'] = 'foo-bar'
    resp.form['name'] = query.name
    resp.form['dataset'] = 'my-dataset'
    resp = resp.form.submit()
    assert Query.objects.filter(resource=connector).count() == 1
    assert resp.status_code == 200
    assert 'A query with this name already exists' in resp.text

    # creation form: slug and name only used on the other connector
    resp.form['slug'] = 'foo-bar'
    resp.form['name'] = 'Foo Bar'
    resp.form['dataset'] = 'my-dataset'
    resp = resp.form.submit()
    assert resp.status_code == 302
    assert Query.objects.filter(resource=connector).count() == 2
    new_query = Query.objects.latest('pk')
    assert new_query.resource == connector

    # edition form: same checks on the query that was just created
    resp = app.get('/manage/opendatasoft/%s/query/%s/' % (connector.slug, new_query.pk))
    resp.form['slug'] = query.slug
    resp.form['name'] = 'Foo Bar'
    resp = resp.form.submit()
    assert resp.status_code == 200
    assert 'A query with this slug already exists' in resp.text

    resp.form['slug'] = 'foo-bar'
    resp.form['name'] = query.name
    resp = resp.form.submit()
    assert resp.status_code == 200
    assert 'A query with this name already exists' in resp.text

    resp.form['slug'] = 'foo-bar'
    resp.form['name'] = 'Foo Bar'
    resp = resp.form.submit()
    assert resp.status_code == 302


@mock.patch('passerelle.utils.Request.get')
def test_query_q_having_original_fields(mocked_get, app, query):
    """Remote fields named ``id``/``text`` are exposed as ``original_id``/``original_text``."""
    endpoint = '/opendatasoft/my_connector/q/my_query/'
    params = {
        'id': '7cafcd5c692773e8b863587b2d38d6be82e023d8',
    }
    # add 'id' and 'text' fields to the canned record
    content = json.loads(FAKED_CONTENT_ID_SEARCH)
    content['records'][0]['fields']['id'] = 'original id'
    content['records'][0]['fields']['text'] = 'original text'
    # the template can reference both the record id and the renamed fields
    query.text_template = '{{id}} - {{original_id}} - {{original_text}}'
    query.save()
    mocked_get.return_value = tests.utils.FakedResponse(content=json.dumps(content), status_code=200)
    resp = app.get(endpoint, params=params, status=200)
    assert resp.json['data'][0]['original_id'] == 'original id'
    assert resp.json['data'][0]['original_text'] == 'original text'
    assert (
        resp.json['data'][0]['text']
        == '7cafcd5c692773e8b863587b2d38d6be82e023d8 - original id - original text'
    )


def test_call_search_errors(app, connector):
    """Check the ``err_desc`` reported by ``search`` for several remote failures."""
    endpoint = tests.utils.generic_endpoint_url('opendatasoft', 'search', slug=connector.slug)
    assert endpoint == '/opendatasoft/my_connector/search'
    # remote URL that is mocked below
    url = connector.service_url + '/api/records/1.0/search/'

    # Connection error
    exception = ConnectionError('Remote end closed connection without response')
    with tests.utils.mock_url(url=url, exception=exception):
        resp = app.get(endpoint)
    assert resp.json['err']
    assert resp.json['err_desc'] == 'OpenDataSoft error: Remote end closed connection without response'

    # API error, provides HTTP status code
    with tests.utils.mock_url(url=url, response='{"error": "Unknown dataset: foo"}', status_code=404):
        resp = app.get(endpoint)
    assert resp.json['err']
    assert resp.json['err_desc'] == 'Unknown dataset: foo'

    # HTTP error
    with tests.utils.mock_url(url=url, response='not a json content', reason='Not Found', status_code=404):
        resp = app.get(endpoint)
    assert resp.json['err']
    assert 'OpenDataSoft error: 404 Client Error: Not Found' in resp.json['err_desc']

    # bad JSON response
    with tests.utils.mock_url(url=url, response='not a json content', status_code=200):
        resp = app.get(endpoint)
    assert resp.json['err']
    assert resp.json['err_desc'] == 'OpenDataSoft error: bad JSON response'