# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import datetime
import json
import os
import time
import uuid
from io import StringIO
from posix import stat_result
from stat import ST_MTIME
from unittest import mock
from urllib.parse import urlencode

import pytest
import webtest
from django.contrib.contenttypes.models import ContentType
from django.core.files import File
from django.core.files.storage import default_storage
from django.core.management import call_command
from django.test import Client, override_settings
from django.urls import reverse
from django.utils.encoding import force_bytes, force_str
from django.utils.timezone import now

from passerelle.apps.csvdatasource.models import CsvDataSource, Query, TableRow, upload_to
from passerelle.base.models import AccessRight, ApiUser
from tests.test_manager import login

# test fixture: fam;id;lname;fname;sex records, 20 rows (10 H, 10 F)
data = """121;69981;DELANOUE;Eliot;H
525;6;DANIEL WILLIAMS;Shanone;F
253;67742;MARTIN;Sandra;F
511;38142;MARCHETTI;Lucie;F
235;22;MARTIN;Sandra;F
620;52156;ARNAUD;Mathis;H
902;36;BRIGAND;Coline;F
2179;48548;THEBAULT;Salima;F
3420;46;WILSON-LUZAYADIO;Anaëlle;F
1421;55486;WONE;Fadouma;F
2841;51;FIDJI;Zakia;F
2431;59;BELICARD;Sacha;H
4273;60;GOUBERT;Adrien;H
4049;64;MOVSESSIAN;Dimitri;H
4605;67;ABDOU BACAR;Kyle;H
4135;22231;SAVERIAS;Marius;H
4809;75;COROLLER;Maelys;F
5427;117;KANTE;Aliou;H
116642;118;ZAHMOUM;Yaniss;H
216352;38;Dupont;Benoît;H
"""

# same data, prefixed with an UTF-8 byte order mark
data_bom = force_str(force_str(data, 'utf-8').encode('utf-8-sig'))

pytestmark = pytest.mark.django_db

TEST_BASE_DIR = os.path.join(os.path.dirname(__file__), 'data', 'csvdatasource')


def get_file_content(filename):
    return open(os.path.join(TEST_BASE_DIR, filename), 'rb')


@pytest.fixture
def setup():
    def maker(
        columns_keynames='fam,id,lname,fname,sex',
        filename='data.csv',
        sheet_name='Feuille2',
        data='',
        skip_header=False,
    ):
        api = ApiUser.objects.create(username='all', keytype='', key='')
        csv = CsvDataSource.objects.create(
            csv_file=File(data, filename),
            sheet_name=sheet_name,
            columns_keynames=columns_keynames,
            slug='test',
            title='a title',
            description='a description',
            skip_header=skip_header,
        )
        assert TableRow.objects.filter(resource=csv).count() == len(csv.get_rows())
        obj_type = ContentType.objects.get_for_model(csv)
        AccessRight.objects.create(
            codename='can_access', apiuser=api, resource_type=obj_type, resource_pk=csv.pk
        )
        url = reverse(
            'generic-endpoint',
            kwargs={
                'connector': 'csvdatasource',
                'slug': csv.slug,
                'endpoint': 'data',
            },
        )
        return csv, url

    return maker


def parse_response(response):
    return json.loads(response.content)['data']


@pytest.fixture
def client():
    return Client()


@pytest.fixture(params=['data.csv', 'data.ods', 'data.xls', 'data.xlsx'])
def filetype(request):
    return request.param


@pytest.fixture
def sheet_name(filetype):
    return 'Feuille2' if filetype != 'data.csv' else ''


@pytest.fixture
def file_content(filetype):
    with get_file_content(filetype) as fd:
        yield fd.read()
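
# Typical use of the fixtures above (illustrative sketch, not a collected
# test; the expected row comes from the ``data`` blob and matches test_data
# below):
#
#     csvdata, url = setup('fam,id,,text,sexe', data=StringIO(data))
#     resp = client.get(url, {'text': 'Sacha'})
#     parse_response(resp)
#     # -> [{'fam': '2431', 'id': '59', 'text': 'Sacha', 'sexe': 'H'}]
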
def test_default_column_keynames(setup, filetype):
    csvdata = CsvDataSource.objects.create(
        csv_file=File(get_file_content(filetype), filetype),
        sheet_name='Feuille2',
        slug='test2',
        title='a title',
        description='a description',
    )
    assert len(csvdata.columns_keynames.split(',')) == 2
    assert 'id' in csvdata.columns_keynames
    assert 'text' in csvdata.columns_keynames


def test_sheet_name_error(setup, app, filetype, admin_user):
    setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
    app = login(app)
    resp = app.get('/manage/csvdatasource/test/edit')
    edit_form = resp.forms[0]
    edit_form['sheet_name'] = ''
    resp = edit_form.submit()
    if filetype != 'data.csv':
        assert resp.status_code == 200
        assert 'You must specify a sheet name' in resp.text
    else:
        assert resp.status_code == 302


def test_unfiltered_data(client, setup, filetype):
    _, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
    resp = client.get(url)
    result = parse_response(resp)
    for item in result:
        assert 'field' in item
        assert 'another_field' in item


def test_empty_file(client, setup):
    _, url = setup(
        'field,,another_field,', filename='data-empty.ods', data=get_file_content('data-empty.ods')
    )
    resp = client.get(url)
    result = parse_response(resp)
    assert len(result) == 0


def test_view_manage_page(setup, app, filetype, admin_user):
    csvdata, _ = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    app = login(app)
    app.get(csvdata.get_absolute_url())


def test_good_filter_data(client, setup, filetype):
    filter_criteria = 'Zakia'
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    filters = {'text': filter_criteria}
    resp = client.get(url, filters)
    result = parse_response(resp)
    assert len(result)
    for item in result:
        assert 'id' in item
        assert 'text' in item
        assert filter_criteria in item['text']


def test_bad_filter_data(client, setup, filetype):
    filter_criteria = 'bad'
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    filters = {'text': filter_criteria}
    resp = client.get(url, filters)
    result = parse_response(resp)
    assert len(result) == 0


def test_useless_filter_data(client, setup, filetype):
    _, url = setup('id,,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    filters = {'text': 'Ali'}
    resp = client.get(url, filters)
    result = parse_response(resp)
    assert len(result) == 20


def test_columns_keynames_with_spaces(client, setup, filetype):
    _, url = setup('id , , nom,text , ', filename=filetype, data=get_file_content(filetype))
    filters = {'text': 'Yaniss'}
    resp = client.get(url, filters)
    result = parse_response(resp)
    assert len(result) == 1


def test_skipped_header_data(client, setup, filetype):
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype), skip_header=True)
    filters = {'q': 'Eliot'}
    resp = client.get(url, filters)
    result = parse_response(resp)
    assert len(result) == 0


def test_data(client, setup, filetype):
    _, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
    filters = {'text': 'Sacha'}
    resp = client.get(url, filters)
    result = parse_response(resp)
    assert result[0] == {'id': '59', 'text': 'Sacha', 'fam': '2431', 'sexe': 'H'}


def test_unicode_filter_data(client, setup, filetype):
    filter_criteria = 'Benoît'
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    filters = {'text': filter_criteria}
    resp = client.get(url, filters)
    result = parse_response(resp)
    assert len(result)
    for item in result:
        assert 'id' in item
        assert 'text' in item
        assert filter_criteria in item['text']


def test_unicode_case_insensitive_filter_data(client, setup, filetype):
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    filter_criteria = 'anaëlle'
    filters = {'text': filter_criteria, 'case-insensitive': ''}
    resp = client.get(url, filters)
    result = parse_response(resp)
    assert len(result)
    for item in result:
        assert 'id' in item
        assert 'text' in item
        assert filter_criteria.lower() in item['text'].lower()


def test_data_bom(client, setup):
    _, url = setup('fam,id,, text,sexe ', data=StringIO(data_bom))
    filters = {'text': 'Eliot'}
    resp = client.get(url, filters)
    result = parse_response(resp)
    assert result[0] == {'id': '69981', 'text': 'Eliot', 'fam': '121', 'sexe': 'H'}


def test_multi_filter(client, setup, filetype):
    _, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
    filters = {'sexe': 'F'}
    resp = client.get(url, filters)
    result = parse_response(resp)
    assert result[0] == {'id': '6', 'text': 'Shanone', 'fam': '525', 'sexe': 'F'}
    assert len(result) == 10


def test_query(client, setup, filetype):
    _, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
    filters = {'q': 'liot'}
    resp = client.get(url, filters)
    result = parse_response(resp)
    assert result[0]['text'] == 'Eliot'
    assert len(result) == 1


def test_query_insensitive_and_unicode(client, setup, filetype):
    _, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
    filters = {'q': 'elIo', 'case-insensitive': ''}
    resp = client.get(url, filters)
    result = parse_response(resp)
    assert result[0]['text'] == 'Eliot'
    assert len(result) == 1
    filters['q'] = 'élIo'
    resp = client.get(url, filters)
    result = parse_response(resp)
    assert result[0]['text'] == 'Eliot'
    assert len(result) == 1


def test_query_insensitive_and_filter(client, setup, filetype):
    _, url = setup('fam,id,,text,sexe', filename=filetype, data=get_file_content(filetype))
    filters = {'q': 'elIo', 'sexe': 'H', 'case-insensitive': ''}
    resp = client.get(url, filters)
    result = parse_response(resp)
    assert result[0]['text'] == 'Eliot'
    assert len(result) == 1


def test_dialect(client, setup):
    csvdata, url = setup(data=StringIO(data))
    expected = {
        'lineterminator': '\r\n',
        'skipinitialspace': False,
        'quoting': 0,
        'delimiter': ';',
        'quotechar': '"',
        'doublequote': False,
    }
    assert expected == csvdata.dialect_options
    filters = {'id': '22', 'fname': 'Sandra'}
    resp = client.get(url, filters)
    result = parse_response(resp)
    assert len(result) == 1
    assert result[0]['id'] == '22'
    assert result[0]['lname'] == 'MARTIN'
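
# The /data endpoint takes three kinds of query parameters, all exercised by
# the tests above: a column keyname (exact match on that column), ``q``
# (substring match, e.g. 'liot' matching 'Eliot') and the ``case-insensitive``
# flag that relaxes both. This summary is derived from the assertions in this
# file, not from separate connector documentation.
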
def test_on_the_fly_dialect_detection(client, setup):
    # fake a connector that was not initialized during .save(), because it's
    # been migrated and we didn't do dialect detection at save() time.
    _, url = setup(data=StringIO(data))
    CsvDataSource.objects.all().update(_dialect_options=None)
    resp = client.get(url)
    result = json.loads(resp.content)
    assert result['err'] == 0
    assert len(result['data']) == 20


def test_missing_columns(client, setup):
    _, url = setup(data=StringIO(data + 'A;B;C\n'))
    resp = client.get(url)
    result = json.loads(resp.content)
    assert result['err'] == 0
    assert len(result['data']) == 21
    assert result['data'][-1] == {'lname': 'C', 'sex': '', 'id': 'B', 'fname': '', 'fam': 'A'}


def test_unknown_sheet_name(client, setup):
    csvdata, url = setup('field,,another_field,', filename='data2.ods', data=get_file_content('data2.ods'))
    csvdata.sheet_name = 'unknown'
    csvdata.save()
    resp = client.get(url)
    result = json.loads(resp.content)
    assert len(result['data']) == 20


def test_cache_new_shorter_file(client, setup):
    csvdata, url = setup(data=StringIO(data + 'A;B;C\n'))
    resp = client.get(url)
    result = json.loads(resp.content)
    assert result['err'] == 0
    assert len(result['data']) == 21
    csvdata.csv_file = File(StringIO(data), 'data.csv')
    csvdata.save()
    resp = client.get(url)
    result = json.loads(resp.content)
    assert len(result['data']) == 20


def test_query_array(app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='array')
    query.projections = '\n'.join(['id:int(id)', 'prenom:prenom'])
    query.save()
    response = app.get(url)
    assert response.json['err'] == 0
    assert len(response.json['data'])
    for row in response.json['data']:
        assert len(row) == 2
        assert isinstance(row[0], int)
        assert isinstance(row[1], str)
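
# Projections, as used in test_query_array above and the query tests below,
# are one "name:expression" line each; the expression is evaluated with the
# row's columns in scope, so 'id:int(id)' yields an integer column. A rough
# pure-Python sketch of that contract (assumption: the connector's actual
# evaluator adds sandboxing and error handling not shown here):
def _projection_sketch(row, projections):
    projected = {}
    for line in projections.splitlines():
        name, expr = line.split(':', 1)
        projected[name] = eval(expr, {}, dict(row))  # illustration only
    return projected
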
def test_query_q_filter(app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='array')
    query.projections = '\n'.join(['id:int(id)', 'text:prenom'])
    query.save()
    response = app.get(url + '?q=Sandra')
    assert response.json['err'] == 0
    assert len(response.json['data']) == 2
    response = app.get(url + '?q=sandra')
    assert response.json['err'] == 0
    assert len(response.json['data']) == 2
    response = app.get(url + '?' + urlencode({'q': 'Benoît'}))
    assert response.json['err'] == 0
    assert len(response.json['data']) == 1
    response = app.get(url + '?q=benoit')
    assert response.json['err'] == 0
    assert len(response.json['data']) == 1


def test_query_dict(app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='dict')
    query.projections = '\n'.join(['id:int(id)', 'prenom:prenom'])
    query.save()
    response = app.get(url)
    assert response.json['err'] == 0
    assert len(response.json['data'])
    for row in response.json['data']:
        assert len(row) == 2
        assert isinstance(row['id'], int)
        assert isinstance(row['prenom'], str)


def test_query_tuples(app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='tuples')
    query.projections = '\n'.join(['id:int(id)', 'prenom:prenom'])
    query.save()
    response = app.get(url)
    assert response.json['err'] == 0
    assert len(response.json['data'])
    for row in response.json['data']:
        assert len(row) == 2
        assert row[0][0] == 'id'
        assert isinstance(row[0][1], int)
        assert row[1][0] == 'prenom'
        assert isinstance(row[1][1], str)


def test_query_onerow(app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='onerow')
    query.projections = '\n'.join(['id:int(id)', 'prenom:prenom'])
    query.filters = 'int(id) == 525'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 0
    assert isinstance(response.json['data'], dict)
    assert response.json['data']['prenom'] == 'Shanone'


def test_query_one(app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='one')
    query.projections = 'wtf:prenom'
    query.filters = 'int(id) == 525'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 0
    assert response.json['data'] == 'Shanone'


def test_query_filter_param(app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='one')
    query.projections = 'wtf:prenom'
    query.filters = 'int(id) == int(query.get("foobar"))'
    query.save()
    response = app.get(url + '?foobar=525')
    assert response.json['err'] == 0
    assert response.json['data'] == 'Shanone'
    query.filters = 'int(id) == int(query.get("query"))'
    query.save()
    response = app.get(url + '?query=525')
    assert response.json['err'] == 0
    assert response.json['data'] == 'Shanone'


def test_query_distinct(app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, distinct='sexe')
    query.save()
    response = app.get(url)
    assert response.json['err'] == 0
    assert isinstance(response.json['data'], list)
    assert len(response.json['data']) == 2


def test_query_keyed_distinct(app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, distinct='nom', structure='keyed-distinct')
    query.save()
    response = app.get(url)
    assert response.json['err'] == 0
    assert isinstance(response.json['data'], dict)
    assert response.json['data']['MARTIN']['prenom'] == 'Sandra'


def test_query_order(app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, order='prenom.lower()')
    query.save()
    response = app.get(url)
    assert response.json['err'] == 0
    assert isinstance(response.json['data'], list)
    assert response.json['data'] == sorted(response.json['data'], key=lambda row: row['prenom'].lower())


def test_query_order_missing_column(app, setup):
    csvdata, url = setup(data=StringIO(data + '42;42;STOPHERE\n'))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, order='fname')
    query.save()
    response = app.get(url)
    assert response.json['err'] == 0
    assert isinstance(response.json['data'], list)
    assert response.json['data'] == sorted(response.json['data'], key=lambda row: row['fname'])
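
# Summary of the Query.structure values exercised above (derived from the
# assertions in these tests, not from external documentation):
#   'array'          -> list of lists, in projection order
#   'dict'           -> list of dicts keyed by projection name
#   'tuples'         -> list of [name, value] pairs
#   'onerow'         -> a single dict for the first matching row
#   'one'            -> a single scalar (one projection, one row)
#   'keyed-distinct' -> dict of rows keyed by the distinct expression's value
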
def test_query_error(app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'no such query'

    query = Query(slug='query-1_', resource=csvdata)
    query.save()

    query.projections = '1a:prenom'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid projection name'
    assert response.json['data'] == '1a'

    query.projections = 'id:prenom.'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid projection expression'
    assert response.json['data']['name'] == 'id'
    assert response.json['data']['expr'] == 'prenom.'

    query.projections = 'id:prenom not'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid projection expression'
    assert response.json['data']['name'] == 'id'
    assert response.json['data']['expr'] == 'prenom not'

    query.projections = 'id:zob'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid projection expression'
    assert response.json['data']['name'] == 'id'
    assert response.json['data']['expr'] == 'zob'
    assert 'row' in response.json['data']

    query.projections = ''
    query.filters = 'prenom.'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid filters expression'
    assert response.json['data']['expr'] == 'prenom.'

    query.filters = 'zob'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid filters expression'
    assert response.json['data']['error'] == "name 'zob' is not defined"
    assert response.json['data']['expr'] == 'zob'
    assert 'row' in response.json['data']

    query.filters = ''
    query.order = 'prenom.'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid order expression'
    assert response.json['data']['expr'] == 'prenom.'

    query.order = 'zob'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid order expression'
    assert response.json['data']['expr'] == 'zob'
    assert 'row' in response.json['data']

    query.order = ''
    query.distinct = 'prenom.'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid distinct expression'
    assert response.json['data']['expr'] == 'prenom.'
    query.distinct = 'zob'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid distinct expression'
    assert response.json['data']['expr'] == 'zob'
    assert 'row' in response.json['data']


def test_edit_connector_queries(admin_user, app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse('view-connector', kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug})
    resp = app.get(url)
    assert 'New Query' not in resp.text
    new_query_url = reverse('csv-new-query', kwargs={'connector_slug': csvdata.slug})
    resp = app.get(new_query_url, status=302)
    assert resp.location.endswith('/login/?next=%s' % new_query_url)
    app = login(app)
    resp = app.get(url)
    resp = resp.click('New Query')
    resp.form['slug'] = 'foobar'
    resp.form['label'] = 'Lucy alone'
    resp.form['filters'] = 'int(id) =< 525'
    resp = resp.form.submit()
    assert 'Syntax error' in resp.text
    resp.form['filters'] = 'int(id) == 525'
    resp.form['projections'] = 'id'
    resp = resp.form.submit()
    assert 'Syntax error' in resp.text
    resp.form['projections'] = 'id:id\nprenom:prenom'
    resp = resp.form.submit().follow()
    resp = resp.click('foobar', index=1)  # 0th is the newly created endpoint
    resp.form['filters'] = 'int(id) == 511'
    resp.form['description'] = 'in the sky without diamonds'
    resp = resp.form.submit().follow()
    assert 'Lucy alone' in resp.text
    assert 'in the sky without diamonds' in resp.text
    resp = resp.click('foobar', index=0)  # 0th is the newly created endpoint
    assert len(resp.json['data']) == 1
    assert resp.json['data'][0]['prenom'] == 'Lucie'


def test_download_file(app, setup, filetype, admin_user):
    setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
    assert '/login' in app.get('/manage/csvdatasource/test/download/').location
    app = login(app)
    resp = app.get('/manage/csvdatasource/test/download/', status=200)
    if filetype == 'data.csv':
        assert resp.headers['Content-Type'] == 'text/csv; charset=utf-8'
        assert resp.headers['Content-Length'] == str(len(force_bytes(data)))
    elif filetype == 'data.ods':
        assert resp.headers['Content-Type'] == 'application/vnd.oasis.opendocument.spreadsheet'
    elif filetype == 'data.xls':
        assert resp.headers['Content-Type'] == 'application/vnd.ms-excel'
    elif filetype == 'data.xlsx':
        assert (
            resp.headers['Content-Type']
            == 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
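
# Query.filters holds one boolean expression per line and a row must satisfy
# every line, as test_query_filter_multiline below checks (the two bounds on
# int(id) leave exactly two rows). A minimal sketch of that AND semantics,
# under the same assumptions as _projection_sketch above:
def _filters_sketch(row, filters):
    return all(
        eval(line, {}, dict(row))  # illustration only
        for line in filters.splitlines()
        if line.strip()
    )
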
def test_query_filter_multiline(app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata)
    query.filters = '\n'.join(['int(id) <= 525', 'int(id) >= 511'])
    query.save()
    response = app.get(url)
    assert response.json['err'] == 0
    assert len(response.json['data']) == 2


def test_query_builtin_id_filter(app, setup, filetype):
    csvdata, _url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata)
    query.save()
    assert len(app.get(url).json['data']) == 20
    assert len(app.get(url + '?id=121').json['data']) == 1
    assert app.get(url + '?id=121').json['data'][0]['prenom'] == 'Eliot'
    assert app.get(url + '?id=525').json['data'][0]['prenom'] == 'Shanone'
    # check when combined with another filter
    query.filters = 'sexe == "H"'
    query.save()
    assert len(app.get(url).json['data']) == 10
    assert app.get(url + '?id=121').json['data'][0]['prenom'] == 'Eliot'
    assert len(app.get(url + '?id=525').json['data']) == 0


def test_delete_connector_query(admin_user, app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse('view-connector', kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug})
    resp = app.get(url)
    query = Query(slug='query-1_', resource=csvdata, structure='array')
    query.projections = '\n'.join(['id:int(id)', 'prenom:prenom'])
    query.save()
    app = login(app)
    resp = app.get(url)
    resp = resp.click('delete query')
    assert 'Are you sure' in resp.text
    resp = resp.form.submit('delete')
    resp = resp.follow()
    assert 'No query are defined.' in resp.text


def test_csv_sniffer(admin_user, app):
    app = login(app)
    resp = app.get('/manage/csvdatasource/add')
    form = resp.form
    form.set('title', 'Title')
    form.set('description', 'Description')
    form.set('csv_file', webtest.Upload('test.csv', force_bytes('\n'), 'application/octet-stream'))
    resp = form.submit()
    assert 'Could not detect CSV dialect' in resp


def test_csv_validation(admin_user, app):
    app = login(app)
    resp = app.get('/manage/csvdatasource/add')
    form = resp.form
    form.set('title', 'Title')
    form.set('description', 'Description')
    form.set('csv_file', webtest.Upload('test.csv', b'a,b,c\n1,2\0,3\n4,5,6', 'application/octet-stream'))
    resp = form.submit()
    assert 'Invalid CSV file: line contains NUL' in resp


def test_change_csv_command(setup):
    csv, _ = setup(data=StringIO(data))
    call_command('change-csv', 'test', os.path.join(TEST_BASE_DIR, 'data-empty.ods'))
    csv.refresh_from_db()
    assert list(csv.get_rows()) == []
    call_command('change-csv', 'test', os.path.join(TEST_BASE_DIR, 'data.csv'))
    csv.refresh_from_db()
    assert list(csv.get_rows()) != []
    call_command('change-csv', 'test', os.path.join(TEST_BASE_DIR, 'data-empty.ods'))
    csv.refresh_from_db()
    assert list(csv.get_rows()) == []
    call_command('change-csv', 'test', os.path.join(TEST_BASE_DIR, 'data.ods'), sheet_name='Feuille2')
    csv.refresh_from_db()
    assert list(csv.get_rows()) != []
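
# test_change_csv_command above drives the change-csv management command
# through call_command; a shell equivalent would look something like this
# (hypothetical invocation, the --sheet-name flag is inferred from the
# sheet_name keyword argument):
#
#     python manage.py change-csv test tests/data/csvdatasource/data.ods --sheet-name Feuille2
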
def test_update(admin_user, app, setup):
    csv, url = setup(data=StringIO(data), filename='api-uploaded-file.csv')
    upload_dir = default_storage.path(upload_to(csv, ''))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csv.slug,
            'endpoint': 'update',
        },
    )
    assert CsvDataSource.objects.get().get_rows()[0]['fam'] == '121'
    assert [files for root, dirs, files in os.walk(upload_dir)][0] == ['api-uploaded-file.csv']

    # curl -H "Content-Type: text/csv" -T data.csv
    headers = {'CONTENT-TYPE': 'text/csv'}
    body = '212' + data[3:]

    # restricted access
    resp = app.put(url, params=body, headers=headers, status=403)
    assert resp.json['err']
    assert 'PermissionDenied' in resp.json['err_class']
    assert resp.json['err_class'] == 'django.core.exceptions.PermissionDenied'

    # add can_update_file access
    api = ApiUser.objects.get()
    obj_type = ContentType.objects.get_for_model(csv)
    AccessRight.objects.create(
        codename='can_update_file', apiuser=api, resource_type=obj_type, resource_pk=csv.pk
    )
    resp = app.put(url, params=body, headers=headers)
    assert not resp.json['err']
    assert CsvDataSource.objects.get().get_rows()[0]['fam'] == '212'
    assert len([files for root, dirs, files in os.walk(upload_dir)][0]) == 2

    resp = app.put(url, params=body, headers={}, status=400)
    assert resp.json['err']
    assert "can't guess filename extension" in resp.json['err_desc']

    resp = app.put(url, params='\n', headers=headers, status=400)
    assert resp.json['err']
    assert 'Could not detect CSV dialect' in resp.json['err_desc']

    headers = {'CONTENT-TYPE': 'application/vnd.oasis.opendocument.spreadsheet'}
    resp = app.put(url, params=body, headers=headers, status=400)
    assert resp.json['err']
    assert 'Invalid CSV file: File is not a zip file' in resp.json['err_desc']

    csv.sheet_name = ''
    csv.save()
    resp = app.put(url, params=body, headers=headers, status=400)
    assert resp.json['err']
    assert 'You must specify a sheet name' in resp.json['err_desc']


@pytest.mark.parametrize(
    'payload,expected',
    [
        ({}, 20),
        ({'limit': 10}, 10),
        ({'limit': 10, 'offset': 0}, 10),
        ({'limit': 10, 'offset': 15}, 5),
        ({'limit': 10, 'offset': 42}, 0),
    ],
)
def test_pagination(app, setup, filetype, payload, expected):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    # data endpoint
    response = app.get(url + '?' + urlencode(payload))
    assert len(response.json['data']) == expected
    # query endpoint
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='array')
    query.projections = '\n'.join(['id:int(id)', 'text:prenom'])
    query.save()
    response = app.get(url + '?' + urlencode(payload))
    assert len(response.json['data']) == expected


@pytest.mark.parametrize(
    'payload,expected_error',
    [
        ({'limit': 'bla'}, 'invalid limit parameter'),
        ({'limit': 0}, 'invalid limit parameter'),
        ({'limit': -1}, 'invalid limit parameter'),
        ({'limit': 10, 'offset': 'bla'}, 'invalid offset parameter'),
        ({'limit': 10, 'offset': -1}, 'invalid offset parameter'),
    ],
)
def test_pagination_error(app, setup, filetype, payload, expected_error):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    # data endpoint
    response = app.get(url + '?' + urlencode(payload))
    assert response.json['err'] == 1
    assert response.json['err_desc'] == expected_error
    # query endpoint
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='array')
    query.projections = '\n'.join(['id:int(id)', 'text:prenom'])
    query.save()
    response = app.get(url + '?' + urlencode(payload))
    assert response.json['err'] == 1
    assert response.json['err_desc'] == expected_error


def test_csv_dst(app, setup, admin_user):
    csvdata, url = setup(
        'field,,another_field,', filename='data-empty.ods', data=get_file_content('data-empty.ods')
    )
    with mock.patch('os.fstat') as mocked_fstat, override_settings(TIME_ZONE='Europe/Paris'):

        class MockStatResult:
            st_ctime = time.mktime((2019, 10, 27, 2, 20, 50, 0, 0, 0))

        mocked_fstat.return_value = MockStatResult()
        # visit endpoint
        app.get(url)
        # visit manager page
        app = login(app)
        response = app.get(csvdata.get_absolute_url())
        assert 'Oct. 27, 2019, 2:20 a.m.' in response


def test_view_manage_create(app, admin_user, filetype, file_content, sheet_name):
    app = login(app)
    response = app.get(reverse('create-connector', kwargs={'connector': 'csvdatasource'}))
    response.form.set('title', 'test title')
    response.form.set('slug', 'test-slug')
    response.form.set('description', 'test description')
    response.form.set('csv_file', webtest.Upload(filetype, file_content, 'application/octet-stream'))
    response.form.set('columns_keynames', 'a,b,c,d,e')
    if sheet_name:
        response.form.set('sheet_name', sheet_name)
    response = response.form.submit()
    assert response.location
    response = response.follow()
    assert 'test title' in response
    assert 'test description' in response
    assert CsvDataSource.objects.count() == 1
    resource = CsvDataSource.objects.get()
    assert resource.title == 'test title'
    assert resource.slug == 'test-slug'
    assert resource.description == 'test description'
    assert resource.csv_file.read() == file_content
    assert resource.columns_keynames == 'a,b,c,d,e'
    assert resource.sheet_name == sheet_name


@pytest.mark.parametrize('remove_files', [False, True])
def test_csv_daily_clean(settings, remove_files):
    settings.CSVDATASOURCE_REMOVE_ON_CLEAN = remove_files
    csvdata = CsvDataSource.objects.create(
        csv_file=File(StringIO('a;z;e;r;t;y'), 'data.csv'),
        sheet_name='Feuille2',
        slug='test-%s' % str(uuid.uuid4()),
        title='a title',
        description='a description',
    )
    old_dir = os.path.join(settings.MEDIA_ROOT, 'csv')
    os.makedirs(old_dir)
    csvdata_dir = os.path.dirname(csvdata.csv_file.path)
    other_dir = os.path.join(settings.MEDIA_ROOT, 'foo', csvdata.slug)
    os.makedirs(other_dir)
    # create additional file in MEDIA/csv
    with open(os.path.join(old_dir, 'csv-file.csv'), 'w'):
        pass
    # create additional file in csvdata dir
    with open(os.path.join(csvdata_dir, 'csv-file.csv'), 'w'):
        pass
    # create additional file in other dir
    with open(os.path.join(other_dir, 'csv-file.csv'), 'w'):
        pass

    call_command('cron', 'daily')
    # not changed
    assert os.listdir(old_dir) == ['csv-file.csv']
    assert os.listdir(other_dir) == ['csv-file.csv']
    # too soon to be removed
    dir_list = os.listdir(csvdata_dir)
    dir_list.sort()
    assert dir_list == ['csv-file.csv', 'data.csv']

    orig_os_stat = os.stat

    def _fake_stat(arg, delta):
        faked = list(orig_os_stat(arg))
        faked[ST_MTIME] = (now() + delta).timestamp()
        return stat_result(faked)

    try:
        # 1 week ago but one minute too soon
        os.stat = lambda arg: _fake_stat(arg, datetime.timedelta(days=-7, minutes=1))
        call_command('cron', 'daily')
        # not changed
        assert os.listdir(old_dir) == ['csv-file.csv']
        assert os.listdir(other_dir) == ['csv-file.csv']
        # still too soon to be removed
        dir_list = os.listdir(csvdata_dir)
        dir_list.sort()
        assert dir_list == ['csv-file.csv', 'data.csv']

        # 1 week ago
        os.stat = lambda arg: _fake_stat(arg, datetime.timedelta(days=-7))
        call_command('cron', 'daily')
        # not changed
        assert os.listdir(old_dir) == ['csv-file.csv']
        assert os.listdir(other_dir) == ['csv-file.csv']
        # old enough now: the stray file is removed, or moved to unused-files/
        dir_list = os.listdir(csvdata_dir)
        if remove_files:
            assert dir_list == ['data.csv']
        else:
            dir_list.sort()
            assert dir_list == ['data.csv', 'unused-files']
            assert os.listdir(os.path.join(csvdata_dir, 'unused-files')) == ['csv-file.csv']

        # wrong storage directory, do nothing
        with open(os.path.join(other_dir, 'bar.csv'), 'w'):
            pass
        csvdata.csv_file.name = 'foo/%s/csv-file.csv' % csvdata.slug
        csvdata.save()
        assert set(os.listdir(other_dir)) == {'bar.csv', 'csv-file.csv'}
        call_command('cron', 'daily')
        assert set(os.listdir(other_dir)) == {'bar.csv', 'csv-file.csv'}

        # unknown file
        csvdata.csv_file.name = 'csvdatasource/%s/bar.csv' % csvdata.slug
        csvdata.save()
        call_command('cron', 'daily')
    finally:
        os.stat = orig_os_stat


def spy_decorator(method_to_decorate):
    # https://stackoverflow.com/a/41599695/6686829
    method_mock = mock.MagicMock()

    def wrapper(self, *args, **kwargs):
        method_mock(*args, **kwargs)
        return method_to_decorate(self, *args, **kwargs)

    wrapper.mock = method_mock
    return wrapper
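
# spy_decorator keeps the real method behavior while recording every call on
# ``wrapper.mock``; the tests below combine it with mock.patch.object to count
# calls and inspect keyword arguments, along these lines:
#
#     spy = spy_decorator(CsvDataSource.get_cached_rows)
#     with mock.patch.object(CsvDataSource, 'get_cached_rows', spy):
#         app.get(url)
#     assert spy.mock.call_count == 1
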
def test_query_onerow_model_filters_no_projection(app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='onerow')
    query.filters = '''id == query['id']'''
    query.save()
    get_cached_rows = spy_decorator(CsvDataSource.get_cached_rows)
    with mock.patch.object(CsvDataSource, 'get_cached_rows', get_cached_rows):
        response = app.get(url + '?id=525')
    assert get_cached_rows.mock.call_count == 1
    assert get_cached_rows.mock.call_args[1] == {'model_filters': {'data__contains': {'id': '525'}}}
    assert response.json['err'] == 0
    assert isinstance(response.json['data'], dict)
    assert response.json['data']['prenom'] == 'Shanone'


def test_query_onerow_model_filters_projection(app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='onerow')
    query.projections = '''id:prenom'''
    query.save()
    get_cached_rows = spy_decorator(CsvDataSource.get_cached_rows)
    with mock.patch.object(CsvDataSource, 'get_cached_rows', get_cached_rows):
        response = app.get(url + '?id=Shanone')
    assert get_cached_rows.mock.call_count == 1
    assert get_cached_rows.mock.call_args[1] == {'model_filters': {'data__contains': {'prenom': 'Shanone'}}}
    assert response.json['err'] == 0
    assert isinstance(response.json['data'], dict)
    assert response.json['data']['id'] == 'Shanone'


def test_query_onerow_id_no_model_filters(app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='onerow')
    query.projections = '''id:prenom.lower()'''
    query.save()
    get_cached_rows = spy_decorator(CsvDataSource.get_cached_rows)
    with mock.patch.object(CsvDataSource, 'get_cached_rows', get_cached_rows):
        response = app.get(url + '?id=shanone')
    assert get_cached_rows.mock.call_count == 1
    assert get_cached_rows.mock.call_args[1] == {'model_filters': {}}
    assert response.json['err'] == 0
    assert isinstance(response.json['data'], dict)
    assert response.json['data']['id'] == 'shanone'


def test_model_filters_empty_cache_data(app, setup, filetype):
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='onerow')
    query.filters = '''id == query['id']'''
    query.save()
    cache_data = spy_decorator(CsvDataSource.cache_data)
    get_cached_rows = spy_decorator(CsvDataSource.get_cached_rows)
    with mock.patch.object(CsvDataSource, 'cache_data', cache_data):
        with mock.patch.object(CsvDataSource, 'get_cached_rows', get_cached_rows):
            # if rows are cached, cache_data is never called
            response = app.get(url + '?id=525')
            assert response.json['data']
            assert get_cached_rows.mock.call_args[1] == {'model_filters': {'data__contains': {'id': '525'}}}
            assert cache_data.mock.call_count == 0
            response = app.get(url + '?id=99999')
            assert not response.json['data']
            assert get_cached_rows.mock.call_args[1] == {'model_filters': {'data__contains': {'id': '99999'}}}
            assert cache_data.mock.call_count == 0
            # if the row table is emptied, cache_data is called, one time
            csvdata.tablerow_set.all().delete()
            response = app.get(url + '?id=99999')
            assert not response.json['data']
            assert get_cached_rows.mock.call_args[1] == {
                'model_filters': {'data__contains': {'id': '99999'}},
                'initial': False,
            }
            assert cache_data.mock.call_count == 1
            response = app.get(url + '?id=99999')
            assert not response.json['data']
            assert get_cached_rows.mock.call_args[1] == {'model_filters': {'data__contains': {'id': '99999'}}}
            assert cache_data.mock.call_count == 1