# passerelle/tests/test_csv_datasource.py

#
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import json
import os
import time
import uuid
from io import StringIO
from posix import stat_result
from stat import ST_MTIME
from unittest import mock
from urllib.parse import urlencode
import pytest
import webtest
from django.contrib.contenttypes.models import ContentType
from django.core.files import File
from django.core.files.storage import default_storage
from django.core.management import call_command
from django.test import Client, override_settings
from django.urls import reverse
from django.utils.encoding import force_bytes, force_str
from django.utils.timezone import now
from passerelle.apps.csvdatasource.models import CsvDataSource, Query, TableRow, upload_to
from passerelle.base.models import AccessRight, ApiUser
from tests.test_manager import login
# Semicolon-separated fixture dataset, 20 rows: fam;id;lname;fname;sex
data = """121;69981;DELANOUE;Eliot;H
525;6;DANIEL WILLIAMS;Shanone;F
253;67742;MARTIN;Sandra;F
511;38142;MARCHETTI;Lucie;F
235;22;MARTIN;Sandra;F
620;52156;ARNAUD;Mathis;H
902;36;BRIGAND;Coline;F
2179;48548;THEBAULT;Salima;F
3420;46;WILSON-LUZAYADIO;Anaëlle;F
1421;55486;WONE;Fadouma;F
2841;51;FIDJI;Zakia;F
2431;59;BELICARD;Sacha;H
4273;60;GOUBERT;Adrien;H
4049;64;MOVSESSIAN;Dimitri;H
4605;67;ABDOU BACAR;Kyle;H
4135;22231;SAVERIAS;Marius;H
4809;75;COROLLER;Maelys;F
5427;117;KANTE;Aliou;H
116642;118;ZAHMOUM;Yaniss;H
216352;38;Dupont;Benoît;H
"""
# Same dataset with a leading UTF-8 BOM (U+FEFF), to exercise BOM handling.
# (data is already a str, so the former inner force_str(data, 'utf-8') was a
# no-op and has been dropped.)
data_bom = force_str(data.encode('utf-8-sig'))
pytestmark = pytest.mark.django_db
TEST_BASE_DIR = os.path.join(os.path.dirname(__file__), 'data', 'csvdatasource')
def get_file_content(filename):
    """Open a fixture file from the csvdatasource test-data directory in binary mode."""
    path = os.path.join(TEST_BASE_DIR, filename)
    return open(path, 'rb')
@pytest.fixture
def setup():
    """Factory fixture: create a CsvDataSource with open API access and
    return (resource, data-endpoint URL)."""
    def maker(
        columns_keynames='fam,id,lname,fname,sex',
        filename='data.csv',
        sheet_name='Feuille2',
        data='',
        skip_header=False,
    ):
        # API user with an empty key; granted can_access on the resource below
        api = ApiUser.objects.create(username='all', keytype='', key='')
        csv = CsvDataSource.objects.create(
            csv_file=File(data, filename),
            sheet_name=sheet_name,
            columns_keynames=columns_keynames,
            slug='test',
            title='a title',
            description='a description',
            skip_header=skip_header,
        )
        # saving must have cached one TableRow per source row
        assert TableRow.objects.filter(resource=csv).count() == len(csv.get_rows())
        obj_type = ContentType.objects.get_for_model(csv)
        AccessRight.objects.create(
            codename='can_access', apiuser=api, resource_type=obj_type, resource_pk=csv.pk
        )
        url = reverse(
            'generic-endpoint',
            kwargs={
                'connector': 'csvdatasource',
                'slug': csv.slug,
                'endpoint': 'data',
            },
        )
        return csv, url
    return maker
def parse_response(response):
    """Decode a JSON endpoint response and return its 'data' payload."""
    payload = json.loads(response.content)
    return payload['data']
@pytest.fixture
def client():
    """Plain Django test client (no authentication)."""
    return Client()
@pytest.fixture(params=['data.csv', 'data.ods', 'data.xls', 'data.xlsx'])
def filetype(request):
    """Parametrized fixture: run each test against all supported file formats."""
    return request.param
@pytest.fixture
def sheet_name(filetype):
    """Sheet name for the current file type: spreadsheets use one, CSV does not."""
    if filetype == 'data.csv':
        return ''
    return 'Feuille2'
@pytest.fixture
def file_content(filetype):
    """Raw bytes of the current fixture file."""
    fd = get_file_content(filetype)
    try:
        yield fd.read()
    finally:
        fd.close()
def test_default_column_keynames(setup, filetype):
    """Without explicit keynames the connector defaults to two columns: id, text."""
    csvdata = CsvDataSource.objects.create(
        csv_file=File(get_file_content(filetype), filetype),
        sheet_name='Feuille2',
        slug='test2',
        title='a title',
        description='a description',
    )
    keynames = csvdata.columns_keynames.split(',')
    assert len(keynames) == 2
    assert 'id' in csvdata.columns_keynames
    assert 'text' in csvdata.columns_keynames
def test_sheet_name_error(setup, app, filetype, admin_user):
    """Spreadsheet files must keep a sheet name; plain CSV may drop it."""
    setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
    app = login(app)
    resp = app.get('/manage/csvdatasource/test/edit')
    edit_form = resp.forms[0]
    edit_form['sheet_name'] = ''
    resp = edit_form.submit()
    if filetype == 'data.csv':
        # no sheet concept in CSV: the edit succeeds and redirects
        assert resp.status_code == 302
    else:
        assert resp.status_code == 200
        assert 'You must specify a sheet name' in resp.text
def test_unfiltered_data(client, setup, filetype):
    """Every row of an unfiltered response exposes both mapped columns."""
    _, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url))
    for row in rows:
        assert 'field' in row
        assert 'another_field' in row
def test_empty_file(client, setup):
    """An empty spreadsheet yields an empty data list."""
    _, url = setup(
        'field,,another_field,', filename='data-empty.ods', data=get_file_content('data-empty.ods')
    )
    rows = parse_response(client.get(url))
    assert rows == []
def test_view_manage_page(setup, app, filetype, admin_user):
    """The connector management page renders for a logged-in admin."""
    csvdata, _ = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    app = login(app)
    app.get(csvdata.get_absolute_url())
def test_good_filter_data(client, setup, filetype):
    """Filtering on a mapped column keeps only the matching rows."""
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    wanted = 'Zakia'
    rows = parse_response(client.get(url, {'text': wanted}))
    assert rows
    for row in rows:
        assert 'id' in row
        assert 'text' in row
        assert wanted in row['text']
def test_bad_filter_data(client, setup, filetype):
    """A filter value absent from the file matches no row."""
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'text': 'bad'}))
    assert rows == []
def test_useless_filter_data(client, setup, filetype):
    """A filter on an unmapped column name is ignored: all 20 rows come back."""
    _, url = setup('id,,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'text': 'Ali'}))
    assert len(rows) == 20
def test_columns_keynames_with_spaces(client, setup, filetype):
    """Keynames surrounded by whitespace are stripped before use."""
    _, url = setup('id , , nom,text , ', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'text': 'Yaniss'}))
    assert len(rows) == 1
def test_skipped_header_data(client, setup, filetype):
    """With skip_header=True the first data row (Eliot) is dropped."""
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype), skip_header=True)
    rows = parse_response(client.get(url, {'q': 'Eliot'}))
    assert rows == []
def test_data(client, setup, filetype):
    """A column filter returns the complete mapped record."""
    _, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'text': 'Sacha'}))
    assert rows[0] == {'id': '59', 'text': 'Sacha', 'fam': '2431', 'sexe': 'H'}
def test_unicode_filter_data(client, setup, filetype):
    """Filtering works with non-ASCII values."""
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    wanted = 'Benoît'
    rows = parse_response(client.get(url, {'text': wanted}))
    assert rows
    for row in rows:
        assert 'id' in row
        assert 'text' in row
        assert wanted in row['text']
def test_unicode_case_insensitive_filter_data(client, setup, filetype):
    """The case-insensitive flag matches regardless of letter case."""
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    wanted = 'anaëlle'
    rows = parse_response(client.get(url, {'text': wanted, 'case-insensitive': ''}))
    assert rows
    for row in rows:
        assert 'id' in row
        assert 'text' in row
        assert wanted.lower() in row['text'].lower()
def test_data_bom(client, setup):
    """A UTF-8 BOM at the start of the file does not corrupt the first column."""
    _, url = setup('fam,id,, text,sexe ', data=StringIO(data_bom))
    rows = parse_response(client.get(url, {'text': 'Eliot'}))
    assert rows[0] == {'id': '69981', 'text': 'Eliot', 'fam': '121', 'sexe': 'H'}
def test_multi_filter(client, setup, filetype):
    """Filtering on the sex column keeps the ten matching rows."""
    _, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'sexe': 'F'}))
    assert len(rows) == 10
    assert rows[0] == {'id': '6', 'text': 'Shanone', 'fam': '525', 'sexe': 'F'}
def test_query(client, setup, filetype):
    """The free-text 'q' parameter matches substrings."""
    _, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'q': 'liot'}))
    assert len(rows) == 1
    assert rows[0]['text'] == 'Eliot'
def test_query_insensitive_and_unicode(client, setup, filetype):
    """Case-insensitive 'q' search also folds accents."""
    _, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
    for needle in ('elIo', 'élIo'):
        rows = parse_response(client.get(url, {'q': needle, 'case-insensitive': ''}))
        assert len(rows) == 1
        assert rows[0]['text'] == 'Eliot'
def test_query_insensitive_and_filter(client, setup, filetype):
    """'q' search combines with a plain column filter."""
    _, url = setup('fam,id,,text,sexe', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'q': 'elIo', 'sexe': 'H', 'case-insensitive': ''}))
    assert len(rows) == 1
    assert rows[0]['text'] == 'Eliot'
def test_dialect(client, setup):
    """CSV dialect sniffing detects the ';' delimiter, and filters still work."""
    csvdata, url = setup(data=StringIO(data))
    assert csvdata.dialect_options == {
        'lineterminator': '\r\n',
        'skipinitialspace': False,
        'quoting': 0,
        'delimiter': ';',
        'quotechar': '"',
        'doublequote': False,
    }
    rows = parse_response(client.get(url, {'id': '22', 'fname': 'Sandra'}))
    assert len(rows) == 1
    assert rows[0]['id'] == '22'
    assert rows[0]['lname'] == 'MARTIN'
def test_on_the_fly_dialect_detection(client, setup):
    """A connector stored without dialect options gets them re-sniffed at request time."""
    # fake a connector that was not initialized during .save(), because it's
    # been migrated and we didn't do dialect detection at save() time.
    _, url = setup(data=StringIO(data))
    CsvDataSource.objects.all().update(_dialect_options=None)
    payload = json.loads(client.get(url).content)
    assert payload['err'] == 0
    assert len(payload['data']) == 20
def test_missing_columns(client, setup):
    """A short row is padded with empty strings for the missing columns."""
    _, url = setup(data=StringIO(data + 'A;B;C\n'))
    payload = json.loads(client.get(url).content)
    assert payload['err'] == 0
    assert len(payload['data']) == 21
    assert payload['data'][-1] == {'lname': 'C', 'sex': '', 'id': 'B', 'fname': '', 'fam': 'A'}
def test_unknown_sheet_name(client, setup):
    """An unknown sheet name still yields the data rows instead of failing."""
    csvdata, url = setup('field,,another_field,', filename='data2.ods', data=get_file_content('data2.ods'))
    csvdata.sheet_name = 'unknown'
    csvdata.save()
    payload = json.loads(client.get(url).content)
    assert len(payload['data']) == 20
def test_cache_new_shorter_file(client, setup):
    """Replacing the file with a shorter one refreshes the cached rows."""
    csvdata, url = setup(data=StringIO(data + 'A;B;C\n'))
    payload = json.loads(client.get(url).content)
    assert payload['err'] == 0
    assert len(payload['data']) == 21
    csvdata.csv_file = File(StringIO(data), 'data.csv')
    csvdata.save()
    payload = json.loads(client.get(url).content)
    assert len(payload['data']) == 20
def test_query_array(app, setup, filetype):
    """structure='array' yields one [id, prenom] list per row with cast types."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    endpoint = reverse(
        'generic-endpoint',
        kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug, 'endpoint': 'query/query-1_/'},
    )
    query = Query(slug='query-1_', resource=csvdata, structure='array')
    query.projections = 'id:int(id)\nprenom:prenom'
    query.save()
    payload = app.get(endpoint).json
    assert payload['err'] == 0
    assert payload['data']
    for entry in payload['data']:
        assert len(entry) == 2
        assert isinstance(entry[0], int)
        assert isinstance(entry[1], str)
def test_query_q_filter(app, setup, filetype):
    """The 'q' parameter on a query endpoint searches case- and accent-insensitively."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    endpoint = reverse(
        'generic-endpoint',
        kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug, 'endpoint': 'query/query-1_/'},
    )
    query = Query(slug='query-1_', resource=csvdata, structure='array')
    query.projections = 'id:int(id)\ntext:prenom'
    query.save()
    for needle, count in (('Sandra', 2), ('sandra', 2), ('Benoît', 1), ('benoit', 1)):
        response = app.get(endpoint + '?' + urlencode({'q': needle}))
        assert response.json['err'] == 0
        assert len(response.json['data']) == count
def test_query_dict(app, setup, filetype):
    """structure='dict' yields one mapping per row with cast values."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    endpoint = reverse(
        'generic-endpoint',
        kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug, 'endpoint': 'query/query-1_/'},
    )
    query = Query(slug='query-1_', resource=csvdata, structure='dict')
    query.projections = 'id:int(id)\nprenom:prenom'
    query.save()
    payload = app.get(endpoint).json
    assert payload['err'] == 0
    assert payload['data']
    for record in payload['data']:
        assert len(record) == 2
        assert isinstance(record['id'], int)
        assert isinstance(record['prenom'], str)
def test_query_tuples(app, setup, filetype):
    """structure='tuples' yields (name, value) pairs preserving projection order."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    endpoint = reverse(
        'generic-endpoint',
        kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug, 'endpoint': 'query/query-1_/'},
    )
    query = Query(slug='query-1_', resource=csvdata, structure='tuples')
    query.projections = 'id:int(id)\nprenom:prenom'
    query.save()
    payload = app.get(endpoint).json
    assert payload['err'] == 0
    assert payload['data']
    for pairs in payload['data']:
        assert len(pairs) == 2
        first, second = pairs
        assert first[0] == 'id'
        assert isinstance(first[1], int)
        assert second[0] == 'prenom'
        assert isinstance(second[1], str)
def test_query_onerow(app, setup, filetype):
    """structure='onerow' returns a single dict for the matching row."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    endpoint = reverse(
        'generic-endpoint',
        kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug, 'endpoint': 'query/query-1_/'},
    )
    query = Query(slug='query-1_', resource=csvdata, structure='onerow')
    query.projections = 'id:int(id)\nprenom:prenom'
    query.filters = 'int(id) == 525'
    query.save()
    payload = app.get(endpoint).json
    assert payload['err'] == 0
    assert isinstance(payload['data'], dict)
    assert payload['data']['prenom'] == 'Shanone'
def test_query_one(app, setup, filetype):
    """structure='one' returns the single projected value itself."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    endpoint = reverse(
        'generic-endpoint',
        kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug, 'endpoint': 'query/query-1_/'},
    )
    query = Query(slug='query-1_', resource=csvdata, structure='one')
    query.projections = 'wtf:prenom'
    query.filters = 'int(id) == 525'
    query.save()
    payload = app.get(endpoint).json
    assert payload['err'] == 0
    assert payload['data'] == 'Shanone'
def test_query_filter_param(app, setup, filetype):
    """Filters read GET parameters through the 'query' mapping — even one named 'query'."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    endpoint = reverse(
        'generic-endpoint',
        kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug, 'endpoint': 'query/query-1_/'},
    )
    query = Query(slug='query-1_', resource=csvdata, structure='one')
    query.projections = 'wtf:prenom'
    for filters, qs in (
        ('int(id) == int(query.get("foobar"))', '?foobar=525'),
        ('int(id) == int(query.get("query"))', '?query=525'),
    ):
        query.filters = filters
        query.save()
        payload = app.get(endpoint + qs).json
        assert payload['err'] == 0
        assert payload['data'] == 'Shanone'
def test_query_distinct(app, setup, filetype):
    """distinct='sexe' collapses the rows to the two distinct values."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    endpoint = reverse(
        'generic-endpoint',
        kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug, 'endpoint': 'query/query-1_/'},
    )
    Query(slug='query-1_', resource=csvdata, distinct='sexe').save()
    payload = app.get(endpoint).json
    assert payload['err'] == 0
    assert isinstance(payload['data'], list)
    assert len(payload['data']) == 2
def test_query_keyed_distinct(app, setup, filetype):
    """structure='keyed-distinct' maps each distinct key to a full row."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    endpoint = reverse(
        'generic-endpoint',
        kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug, 'endpoint': 'query/query-1_/'},
    )
    Query(slug='query-1_', resource=csvdata, distinct='nom', structure='keyed-distinct').save()
    payload = app.get(endpoint).json
    assert payload['err'] == 0
    assert isinstance(payload['data'], dict)
    assert payload['data']['MARTIN']['prenom'] == 'Sandra'
def test_query_order(app, setup, filetype):
    """order='prenom.lower()' sorts rows case-insensitively by first name."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    endpoint = reverse(
        'generic-endpoint',
        kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug, 'endpoint': 'query/query-1_/'},
    )
    Query(slug='query-1_', resource=csvdata, order='prenom.lower()').save()
    payload = app.get(endpoint).json
    assert payload['err'] == 0
    rows = payload['data']
    assert isinstance(rows, list)
    assert rows == sorted(rows, key=lambda row: row['prenom'].lower())
def test_query_order_missing_column(app, setup):
    """Ordering still works when some rows miss the ordering column."""
    csvdata, _ = setup(data=StringIO(data + '42;42;STOPHERE\n'))
    endpoint = reverse(
        'generic-endpoint',
        kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug, 'endpoint': 'query/query-1_/'},
    )
    Query(slug='query-1_', resource=csvdata, order='fname').save()
    payload = app.get(endpoint).json
    assert payload['err'] == 0
    rows = payload['data']
    assert isinstance(rows, list)
    assert rows == sorted(rows, key=lambda row: row['fname'])
def test_query_error(app, setup, filetype):
    """Invalid query definitions produce err=1 with a specific err_desc and payload."""
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    # no query defined yet
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'no such query'
    query = Query(slug='query-1_', resource=csvdata)
    query.save()
    # projection name must be a valid identifier
    query.projections = '1a:prenom'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid projection name'
    assert response.json['data'] == '1a'
    # projection expression with a syntax error
    query.projections = 'id:prenom.'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid projection expression'
    assert response.json['data']['name'] == 'id'
    assert response.json['data']['expr'] == 'prenom.'
    query.projections = 'id:prenom not'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid projection expression'
    assert response.json['data']['name'] == 'id'
    assert response.json['data']['expr'] == 'prenom not'
    # projection referencing an unknown name fails at evaluation time,
    # with the offending row included in the payload
    query.projections = 'id:zob'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid projection expression'
    assert response.json['data']['name'] == 'id'
    assert response.json['data']['expr'] == 'zob'
    assert 'row' in response.json['data']
    # same two failure modes for the filters expression
    query.projections = ''
    query.filters = 'prenom.'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid filters expression'
    assert response.json['data']['expr'] == 'prenom.'
    query.filters = 'zob'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid filters expression'
    assert response.json['data']['error'] == "name 'zob' is not defined"
    assert response.json['data']['expr'] == 'zob'
    assert 'row' in response.json['data']
    # same for the order expression
    query.filters = ''
    query.order = 'prenom.'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid order expression'
    assert response.json['data']['expr'] == 'prenom.'
    query.order = 'zob'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid order expression'
    assert response.json['data']['expr'] == 'zob'
    assert 'row' in response.json['data']
    # and for the distinct expression
    query.order = ''
    query.distinct = 'prenom.'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid distinct expression'
    assert response.json['data']['expr'] == 'prenom.'
    query.distinct = 'zob'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid distinct expression'
    assert response.json['data']['expr'] == 'zob'
    assert 'row' in response.json['data']
def test_edit_connector_queries(admin_user, app, setup, filetype):
    """Queries can be created and edited through the manager, with validation errors shown."""
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse('view-connector', kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug})
    resp = app.get(url)
    assert 'New Query' not in resp.text
    # anonymous users are redirected to the login page
    new_query_url = reverse('csv-new-query', kwargs={'connector_slug': csvdata.slug})
    resp = app.get(new_query_url, status=302)
    assert resp.location.endswith('/login/?next=%s' % new_query_url)
    app = login(app)
    resp = app.get(url)
    resp = resp.click('New Query')
    resp.form['slug'] = 'foobar'
    resp.form['label'] = 'Lucy alone'
    # an invalid filters expression is rejected with a form error
    resp.form['filters'] = 'int(id) =< 525'
    resp = resp.form.submit()
    assert 'Syntax error' in resp.text
    resp.form['filters'] = 'int(id) == 525'
    # invalid projections syntax (missing name:expr) is rejected too
    resp.form['projections'] = 'id'
    resp = resp.form.submit()
    assert 'Syntax error' in resp.text
    resp.form['projections'] = 'id:id\nprenom:prenom'
    resp = resp.form.submit().follow()
    resp = resp.click('foobar', index=1)  # 0th is the newly created endpoint
    resp.form['filters'] = 'int(id) == 511'
    resp.form['description'] = 'in the sky without diamonds'
    resp = resp.form.submit().follow()
    assert 'Lucy alone' in resp.text
    assert 'in the sky without diamonds' in resp.text
    resp = resp.click('foobar', index=0)  # 0th is the newly created endpoint
    assert len(resp.json['data']) == 1
    assert resp.json['data'][0]['prenom'] == 'Lucie'
def test_download_file(app, setup, filetype, admin_user):
    """Downloading the stored file requires login and uses the right content type."""
    setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
    # anonymous access redirects to the login page
    assert '/login' in app.get('/manage/csvdatasource/test/download/').location
    app = login(app)
    resp = app.get('/manage/csvdatasource/test/download/', status=200)
    content_types = {
        'data.csv': 'text/csv; charset=utf-8',
        'data.ods': 'application/vnd.oasis.opendocument.spreadsheet',
        'data.xls': 'application/vnd.ms-excel',
        'data.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    }
    assert resp.headers['Content-Type'] == content_types[filetype]
    if filetype == 'data.csv':
        assert resp.headers['Content-Length'] == str(len(force_bytes(data)))
def test_query_filter_multiline(app, setup, filetype):
    """Every line of a multi-line filters field must match (logical AND)."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    endpoint = reverse(
        'generic-endpoint',
        kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug, 'endpoint': 'query/query-1_/'},
    )
    query = Query(slug='query-1_', resource=csvdata)
    query.filters = 'int(id) <= 525\nint(id) >= 511'
    query.save()
    payload = app.get(endpoint).json
    assert payload['err'] == 0
    assert len(payload['data']) == 2
def test_query_builtin_id_filter(app, setup, filetype):
    """The implicit ?id= filter works alone and on top of configured filters."""
    csvdata, _url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    endpoint = reverse(
        'generic-endpoint',
        kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug, 'endpoint': 'query/query-1_/'},
    )
    query = Query(slug='query-1_', resource=csvdata)
    query.save()
    assert len(app.get(endpoint).json['data']) == 20
    assert len(app.get(endpoint + '?id=121').json['data']) == 1
    assert app.get(endpoint + '?id=121').json['data'][0]['prenom'] == 'Eliot'
    assert app.get(endpoint + '?id=525').json['data'][0]['prenom'] == 'Shanone'
    # combined with a configured filter: the id filter applies on top of it
    query.filters = 'sexe == "H"'
    query.save()
    assert len(app.get(endpoint).json['data']) == 10
    assert app.get(endpoint + '?id=121').json['data'][0]['prenom'] == 'Eliot'
    # id 525 is female, excluded by the configured filter
    assert len(app.get(endpoint + '?id=525').json['data']) == 0
def test_delete_connector_query(admin_user, app, setup, filetype):
    """Deleting a query from the manage page asks for confirmation, then removes it."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    manage_url = reverse('view-connector', kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug})
    resp = app.get(manage_url)
    query = Query(slug='query-1_', resource=csvdata, structure='array')
    query.projections = 'id:int(id)\nprenom:prenom'
    query.save()
    app = login(app)
    resp = app.get(manage_url)
    resp = resp.click('delete query')
    assert 'Are you sure' in resp.text
    resp = resp.form.submit('delete').follow()
    assert 'No query are defined.' in resp.text
def test_csv_sniffer(admin_user, app):
    """Uploading content with no sniffable dialect is rejected with an error."""
    app = login(app)
    form = app.get('/manage/csvdatasource/add').form
    form.set('title', 'Title')
    form.set('description', 'Description')
    form.set('csv_file', webtest.Upload('test.csv', force_bytes('\n'), 'application/octet-stream'))
    resp = form.submit()
    assert 'Could not detect CSV dialect' in resp
def test_csv_validation(admin_user, app):
    """A NUL byte inside the CSV content is reported as an invalid file."""
    app = login(app)
    form = app.get('/manage/csvdatasource/add').form
    form.set('title', 'Title')
    form.set('description', 'Description')
    form.set('csv_file', webtest.Upload('test.csv', b'a,b,c\n1,2\0,3\n4,5,6', 'application/octet-stream'))
    resp = form.submit()
    assert 'Invalid CSV file: line contains NUL' in resp
def test_change_csv_command(setup):
    """The change-csv management command swaps the source file in place."""
    csv, _ = setup(data=StringIO(data))

    def change_to(filename, **kwargs):
        # run the command, then reload and return the resulting rows
        call_command('change-csv', 'test', os.path.join(TEST_BASE_DIR, filename), **kwargs)
        csv.refresh_from_db()
        return list(csv.get_rows())

    assert change_to('data-empty.ods') == []
    assert change_to('data.csv') != []
    assert change_to('data-empty.ods') == []
    assert change_to('data.ods', sheet_name='Feuille2') != []
def test_update(admin_user, app, setup):
    """The update endpoint replaces the source file over HTTP PUT, with access control."""
    csv, url = setup(data=StringIO(data), filename='api-uploaded-file.csv')
    upload_dir = default_storage.path(upload_to(csv, ''))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csv.slug,
            'endpoint': 'update',
        },
    )
    # initial state: original rows and a single stored file
    assert CsvDataSource.objects.get().get_rows()[0]['fam'] == '121'
    assert [files for root, dirs, files in os.walk(upload_dir)][0] == ['api-uploaded-file.csv']
    # curl -H "Content-Type: text/csv" -T data.csv
    headers = {'CONTENT-TYPE': 'text/csv'}
    # new content differs only in the first row's first field ('121' -> '212')
    body = '212' + data[3:]
    # restricted access
    resp = app.put(url, params=body, headers=headers, status=403)
    assert resp.json['err']
    # NOTE(review): the substring check below is redundant with the exact
    # equality check that follows
    assert 'PermissionDenied' in resp.json['err_class']
    assert resp.json['err_class'] == 'django.core.exceptions.PermissionDenied'
    # add can_update_file access
    api = ApiUser.objects.get()
    obj_type = ContentType.objects.get_for_model(csv)
    AccessRight.objects.create(
        codename='can_update_file', apiuser=api, resource_type=obj_type, resource_pk=csv.pk
    )
    # upload succeeds: rows reflect the new content, a second file is stored
    resp = app.put(url, params=body, headers=headers)
    assert not resp.json['err']
    assert CsvDataSource.objects.get().get_rows()[0]['fam'] == '212'
    assert len([files for root, dirs, files in os.walk(upload_dir)][0]) == 2
    # missing content type: the filename extension cannot be guessed
    resp = app.put(url, params=body, headers={}, status=400)
    assert resp.json['err']
    assert "can't guess filename extension" in resp.json['err_desc']
    # unsniffable CSV content
    resp = app.put(url, params='\n', headers=headers, status=400)
    assert resp.json['err']
    assert 'Could not detect CSV dialect' in resp.json['err_desc']
    # CSV body sent with a spreadsheet content type
    headers = {'CONTENT-TYPE': 'application/vnd.oasis.opendocument.spreadsheet'}
    resp = app.put(url, params=body, headers=headers, status=400)
    assert resp.json['err']
    assert 'Invalid CSV file: File is not a zip file' in resp.json['err_desc']
    # spreadsheet upload without a sheet name configured
    csv.sheet_name = ''
    csv.save()
    resp = app.put(url, params=body, headers=headers, status=400)
    assert resp.json['err']
    assert 'You must specify a sheet name' in resp.json['err_desc']
@pytest.mark.parametrize(
    'payload,expected',
    [
        ({}, 20),
        ({'limit': 10}, 10),
        ({'limit': 10, 'offset': 0}, 10),
        ({'limit': 10, 'offset': 15}, 5),
        ({'limit': 10, 'offset': 42}, 0),
    ],
)
def test_pagination(app, setup, filetype, payload, expected):
    """limit/offset pagination behaves identically on the data and query endpoints."""
    csvdata, data_url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    qs = '?' + urlencode(payload)
    # data endpoint
    assert len(app.get(data_url + qs).json['data']) == expected
    # query endpoint
    query = Query(slug='query-1_', resource=csvdata, structure='array')
    query.projections = 'id:int(id)\ntext:prenom'
    query.save()
    query_url = reverse(
        'generic-endpoint',
        kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug, 'endpoint': 'query/query-1_/'},
    )
    assert len(app.get(query_url + qs).json['data']) == expected
@pytest.mark.parametrize(
    'payload,expected_error',
    [
        ({'limit': 'bla'}, 'invalid limit parameter'),
        ({'limit': 0}, 'invalid limit parameter'),
        ({'limit': -1}, 'invalid limit parameter'),
        ({'limit': 10, 'offset': 'bla'}, 'invalid offset parameter'),
        ({'limit': 10, 'offset': -1}, 'invalid offset parameter'),
    ],
)
def test_pagination_error(app, setup, filetype, payload, expected_error):
    """Bad limit/offset values yield the same error on data and query endpoints."""
    csvdata, data_url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    qs = '?' + urlencode(payload)
    # data endpoint
    response = app.get(data_url + qs)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == expected_error
    # query endpoint
    query = Query(slug='query-1_', resource=csvdata, structure='array')
    query.projections = 'id:int(id)\ntext:prenom'
    query.save()
    query_url = reverse(
        'generic-endpoint',
        kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug, 'endpoint': 'query/query-1_/'},
    )
    response = app.get(query_url + qs)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == expected_error
def test_csv_dst(app, setup, admin_user):
    """File mtime display works across a DST boundary (Europe/Paris, 2019-10-27)."""
    csvdata, url = setup(
        'field,,another_field,', filename='data-empty.ods', data=get_file_content('data-empty.ods')
    )
    with mock.patch('os.fstat') as mocked_fstat, override_settings(TIME_ZONE='Europe/Paris'):
        class MockStatResult:
            # timestamp inside the ambiguous hour of the autumn DST switch
            st_ctime = time.mktime((2019, 10, 27, 2, 20, 50, 0, 0, 0))
        mocked_fstat.return_value = MockStatResult()
        # visit endpoint
        app.get(url)
        # visit manager page
        app = login(app)
        response = app.get(csvdata.get_absolute_url())
        assert 'Oct. 27, 2019, 2:20 a.m.' in response
def test_view_manage_create(app, admin_user, filetype, file_content, sheet_name):
    """The create-connector form builds a working CsvDataSource from an upload."""
    app = login(app)
    response = app.get(reverse('create-connector', kwargs={'connector': 'csvdatasource'}))
    response.form.set('title', 'test title')
    response.form.set('slug', 'test-slug')
    response.form.set('description', 'test description')
    response.form.set('csv_file', webtest.Upload(filetype, file_content, 'application/octet-stream'))
    response.form.set('columns_keynames', 'a,b,c,d,e')
    if sheet_name:
        response.form.set('sheet_name', sheet_name)
    response = response.form.submit()
    assert response.location
    response = response.follow()
    assert 'test title' in response
    assert 'test description' in response
    # the stored resource reflects every submitted field
    assert CsvDataSource.objects.count() == 1
    resource = CsvDataSource.objects.get()
    assert resource.title == 'test title'
    assert resource.slug == 'test-slug'
    assert resource.description == 'test description'
    assert resource.csv_file.read() == file_content
    assert resource.columns_keynames == 'a,b,c,d,e'
    assert resource.sheet_name == sheet_name
@pytest.mark.parametrize('remove_files', [False, True])
def test_csv_daily_clean(settings, remove_files):
    """Daily cron cleans stray files sitting next to connector CSV files.

    Files unrelated to any connector that are older than one week are either
    deleted (CSVDATASOURCE_REMOVE_ON_CLEAN=True) or moved into an
    'unused-files' subdirectory; other media directories are left alone.
    """
    settings.CSVDATASOURCE_REMOVE_ON_CLEAN = remove_files
    csvdata = CsvDataSource.objects.create(
        csv_file=File(StringIO('a;z;e;r;t;y'), 'data.csv'),
        sheet_name='Feuille2',
        slug='test-%s' % str(uuid.uuid4()),
        title='a title',
        description='a description',
    )
    old_dir = os.path.join(settings.MEDIA_ROOT, 'csv')
    os.makedirs(old_dir)
    csvdata_dir = os.path.dirname(csvdata.csv_file.path)
    other_dir = os.path.join(settings.MEDIA_ROOT, 'foo', csvdata.slug)
    os.makedirs(other_dir)
    # create additional file in MEDIA/csv
    with open(os.path.join(old_dir, 'csv-file.csv'), 'w'):
        pass
    # create additional file in csvdata dir
    with open(os.path.join(csvdata_dir, 'csv-file.csv'), 'w'):
        pass
    # create additional file in other dir
    with open(os.path.join(other_dir, 'csv-file.csv'), 'w'):
        pass

    call_command('cron', 'daily')
    # not changed
    assert os.listdir(old_dir) == ['csv-file.csv']
    assert os.listdir(other_dir) == ['csv-file.csv']
    # too soon to be removed
    dir_list = os.listdir(csvdata_dir)
    dir_list.sort()
    assert dir_list == ['csv-file.csv', 'data.csv']

    orig_os_stat = os.stat

    def _fake_stat(arg, delta):
        # real stat result, but with the mtime shifted by delta from now
        faked = list(orig_os_stat(arg))
        faked[ST_MTIME] = (now() + delta).timestamp()
        return stat_result(faked)

    try:
        # 1 week ago but one minute too soon
        os.stat = lambda arg: _fake_stat(arg, datetime.timedelta(days=-7, minutes=1))
        call_command('cron', 'daily')
        # not changed
        assert os.listdir(old_dir) == ['csv-file.csv']
        assert os.listdir(other_dir) == ['csv-file.csv']
        # still too soon to be removed
        dir_list = os.listdir(csvdata_dir)
        dir_list.sort()
        assert dir_list == ['csv-file.csv', 'data.csv']
        # 1 week ago
        os.stat = lambda arg: _fake_stat(arg, datetime.timedelta(days=-7))
        call_command('cron', 'daily')
        # not changed
        assert os.listdir(old_dir) == ['csv-file.csv']
        assert os.listdir(other_dir) == ['csv-file.csv']
        # old enough now: the unused file is removed, or moved to unused-files
        dir_list = os.listdir(csvdata_dir)
        if remove_files:
            assert dir_list == ['data.csv']
        else:
            dir_list.sort()
            assert dir_list == ['data.csv', 'unused-files']
            assert os.listdir(os.path.join(csvdata_dir, 'unused-files')) == ['csv-file.csv']
        # wrong storage directory, do nothing
        with open(os.path.join(other_dir, 'bar.csv'), 'w'):
            pass
        csvdata.csv_file.name = 'foo/%s/csv-file.csv' % csvdata.slug
        csvdata.save()
        assert set(os.listdir(other_dir)) == {'bar.csv', 'csv-file.csv'}
        call_command('cron', 'daily')
        assert set(os.listdir(other_dir)) == {'bar.csv', 'csv-file.csv'}
        # unknown file: the clean command must not crash
        csvdata.csv_file.name = 'csvdatasource/%s/bar.csv' % csvdata.slug
        csvdata.save()
        call_command('cron', 'daily')
    finally:
        # always restore the real os.stat
        os.stat = orig_os_stat
def spy_decorator(method_to_decorate):
    """Wrap a method so that every call is recorded on a MagicMock.

    The recording mock is exposed as the ``mock`` attribute of the returned
    wrapper; ``self`` is not recorded. See https://stackoverflow.com/a/41599695/6686829
    """
    recorder = mock.MagicMock()

    def spied(self, *args, **kwargs):
        recorder(*args, **kwargs)
        return method_to_decorate(self, *args, **kwargs)

    spied.mock = recorder
    return spied
def test_query_onerow_model_filters_no_projection(app, setup, filetype):
    """A filter on a plain column is pushed down as an ORM-level filter."""
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='onerow')
    query.filters = '''id == query['id']'''
    query.save()

    spy = spy_decorator(CsvDataSource.get_cached_rows)
    with mock.patch.object(CsvDataSource, 'get_cached_rows', spy):
        response = app.get(url + '?id=525')
    # a single row lookup, filtered at the model level
    assert spy.mock.call_count == 1
    assert spy.mock.call_args[1] == {'model_filters': {'data__contains': {'id': '525'}}}
    result = response.json
    assert result['err'] == 0
    assert isinstance(result['data'], dict)
    assert result['data']['prenom'] == 'Shanone'
def test_query_onerow_model_filters_projection(app, setup, filetype):
    """A projection onto a plain column still allows an ORM-level filter."""
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='onerow')
    query.projections = '''id:prenom'''
    query.save()

    spy = spy_decorator(CsvDataSource.get_cached_rows)
    with mock.patch.object(CsvDataSource, 'get_cached_rows', spy):
        response = app.get(url + '?id=Shanone')
    # the id filter is translated to a filter on the projected column
    assert spy.mock.call_count == 1
    assert spy.mock.call_args[1] == {'model_filters': {'data__contains': {'prenom': 'Shanone'}}}
    result = response.json
    assert result['err'] == 0
    assert isinstance(result['data'], dict)
    assert result['data']['id'] == 'Shanone'
def test_query_onerow_id_no_model_filters(app, setup, filetype):
    """A projection using an expression cannot be pushed down to the ORM."""
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='onerow')
    query.projections = '''id:prenom.lower()'''
    query.save()

    spy = spy_decorator(CsvDataSource.get_cached_rows)
    with mock.patch.object(CsvDataSource, 'get_cached_rows', spy):
        response = app.get(url + '?id=shanone')
    # computed projection: no model-level filter is possible
    assert spy.mock.call_count == 1
    assert spy.mock.call_args[1] == {'model_filters': {}}
    result = response.json
    assert result['err'] == 0
    assert isinstance(result['data'], dict)
    assert result['data']['id'] == 'shanone'
def test_model_filters_empty_cache_data(app, setup, filetype):
    """cache_data is only re-triggered when the row cache is empty, and only once."""
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'csvdatasource',
            'slug': csvdata.slug,
            'endpoint': 'query/query-1_/',
        },
    )
    query = Query(slug='query-1_', resource=csvdata, structure='onerow')
    query.filters = '''id == query['id']'''
    query.save()

    cache_data = spy_decorator(CsvDataSource.cache_data)
    get_cached_rows = spy_decorator(CsvDataSource.get_cached_rows)
    patch_cache = mock.patch.object(CsvDataSource, 'cache_data', cache_data)
    patch_rows = mock.patch.object(CsvDataSource, 'get_cached_rows', get_cached_rows)
    with patch_cache, patch_rows:
        # if rows are cached, cache_data is never called
        response = app.get(url + '?id=525')
        assert response.json['data']
        assert get_cached_rows.mock.call_args[1] == {'model_filters': {'data__contains': {'id': '525'}}}
        assert cache_data.mock.call_count == 0
        response = app.get(url + '?id=99999')
        assert not response.json['data']
        assert get_cached_rows.mock.call_args[1] == {'model_filters': {'data__contains': {'id': '99999'}}}
        assert cache_data.mock.call_count == 0
        # if the row table is emptied, cache_data is called, one time
        csvdata.tablerow_set.all().delete()
        response = app.get(url + '?id=99999')
        assert not response.json['data']
        assert get_cached_rows.mock.call_args[1] == {
            'model_filters': {'data__contains': {'id': '99999'}},
            'initial': False,
        }
        assert cache_data.mock.call_count == 1
        response = app.get(url + '?id=99999')
        assert not response.json['data']
        assert get_cached_rows.mock.call_args[1] == {'model_filters': {'data__contains': {'id': '99999'}}}
        assert cache_data.mock.call_count == 1