passerelle/tests/test_csv_datasource.py

826 lines
29 KiB
Python

# -*- coding: utf-8 -*-
#
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import os
import pytest
from StringIO import StringIO
import six
from django.core.files import File
from django.core.urlresolvers import reverse
from django.contrib.contenttypes.models import ContentType
from django.test import Client
from django.core.management import call_command
from django.utils.six.moves.urllib.parse import urlencode
from passerelle.base.models import ApiUser, AccessRight
from passerelle.apps.csvdatasource.models import CsvDataSource, Query, TableRow
from test_manager import login, admin_user
import webtest
# Reference CSV payload: 20 semicolon-separated rows without a header line.
# With the default keynames of the ``setup`` fixture the five columns are
# named fam, id, lname, fname and sex.
data = """121;69981;DELANOUE;Eliot;H
525;6;DANIEL WILLIAMS;Shanone;F
253;67742;MARTIN;Sandra;F
511;38142;MARCHETTI;Lucie;F
235;22;MARTIN;Sandra;F
620;52156;ARNAUD;Mathis;H
902;36;BRIGAND;Coline;F
2179;48548;THEBAULT;Salima;F
3420;46;WILSON-LUZAYADIO;Anaëlle;F
1421;55486;WONE;Fadouma;F
2841;51;FIDJI;Zakia;F
2431;59;BELICARD;Sacha;H
4273;60;GOUBERT;Adrien;H
4049;64;MOVSESSIAN;Dimitri;H
4605;67;ABDOU BACAR;Kyle;H
4135;22231;SAVERIAS;Marius;H
4809;75;COROLLER;Maelys;F
5427;117;KANTE;Aliou;H
116642;118;ZAHMOUM;Yaniss;H
216352;38;Dupont;Benoît;H
"""
# Same payload re-encoded with the ``utf-8-sig`` codec, i.e. prefixed with a
# UTF-8 byte order mark; used by test_data_bom.
data_bom = data.decode('utf-8').encode('utf-8-sig')
# Apply the ``django_db`` marker to every test of this module.
pytestmark = pytest.mark.django_db
# Directory (next to this module) holding the spreadsheet files read by
# ``get_file_content``.
TEST_BASE_DIR = os.path.join(os.path.dirname(__file__), 'data', 'csvdatasource')
def get_file_content(filename):
    """Return the raw content of a fixture file stored under TEST_BASE_DIR.

    The file is opened in binary mode because most fixtures (.ods, .xls,
    .xlsx) are binary spreadsheets, and the handle is closed deterministically
    instead of being left to the garbage collector.
    """
    path = os.path.join(TEST_BASE_DIR, filename)
    with open(path, 'rb') as fd:
        return fd.read()
@pytest.fixture
def setup():
    """Provide a factory creating a ready-to-query ``CsvDataSource``.

    The returned callable creates an ``ApiUser`` named ``all`` (empty key type
    and key), a connector with slug ``test`` built from the given content, and
    a ``can_access`` right binding the two.  It returns the connector together
    with the URL of its ``data`` endpoint.
    """
    def maker(columns_keynames='fam,id,lname,fname,sex', filename='data.csv',
              sheet_name='Feuille2', data='', skip_header=False):
        everybody = ApiUser.objects.create(username='all', keytype='', key='')
        uploaded = File(StringIO(data), filename)
        source = CsvDataSource.objects.create(
            csv_file=uploaded,
            sheet_name=sheet_name,
            columns_keynames=columns_keynames,
            slug='test',
            title='a title',
            description='a description',
            skip_header=skip_header,
        )
        # one TableRow must exist per row exposed by the connector
        stored_rows = TableRow.objects.filter(resource=source).count()
        assert stored_rows == len(source.get_rows())
        AccessRight.objects.create(
            codename='can_access',
            apiuser=everybody,
            resource_type=ContentType.objects.get_for_model(source),
            resource_pk=source.pk,
        )
        endpoint_kwargs = {
            'connector': 'csvdatasource',
            'slug': source.slug,
            'endpoint': 'data',
        }
        return source, reverse('generic-endpoint', kwargs=endpoint_kwargs)

    return maker
def parse_response(response):
    """Decode the JSON body of *response* and return its ``data`` member."""
    payload = json.loads(response.content)
    return payload['data']
@pytest.fixture
def client():
    """Return a new Django test ``Client`` instance."""
    django_client = Client()
    return django_client
@pytest.fixture(params=['data.csv', 'data.ods', 'data.xls', 'data.xlsx'])
def filetype(request):
    """Parametrized fixture returning, in turn, each fixture file name."""
    current_file = request.param
    return current_file
def test_default_column_keynames(setup, filetype):
    """Without explicit keynames the connector gets two of them: id and text."""
    uploaded = File(StringIO(get_file_content(filetype)), filetype)
    source = CsvDataSource.objects.create(
        csv_file=uploaded,
        sheet_name='Feuille2',
        slug='test2',
        title='a title',
        description='a description',
    )
    assert len(source.columns_keynames.split(',')) == 2
    for expected in ('id', 'text'):
        assert expected in source.columns_keynames
def test_sheet_name_error(setup, app, filetype, admin_user):
    """Submitting an empty sheet name is rejected, except for the CSV file."""
    setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
    app = login(app)
    page = app.get('/manage/csvdatasource/test/edit')
    form = page.forms[0]
    form['sheet_name'] = ''
    page = form.submit()
    if filetype == 'data.csv':
        # the CSV case answers with a redirect
        assert page.status_code == 302
        return
    # for the other file types the form is displayed again with an error
    assert page.status_code == 200
    assert 'You must specify a sheet name' in page.body
def test_unfiltered_data(client, setup, filetype):
    """Every returned row exposes the two named columns."""
    _, url = setup('field,,another_field,', filename=filetype,
                   data=get_file_content(filetype))
    rows = parse_response(client.get(url))
    for row in rows:
        assert 'field' in row
        assert 'another_field' in row
def test_empty_file(client, setup):
    """The ``data-empty.ods`` fixture produces an empty result list."""
    content = get_file_content('data-empty.ods')
    _, url = setup('field,,another_field,', filename='data-empty.ods', data=content)
    rows = parse_response(client.get(url))
    assert len(rows) == 0
def test_view_manage_page(setup, app, filetype, admin_user):
    """The connector page can be fetched once logged in."""
    source, _ = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    app = login(app)
    page_url = source.get_absolute_url()
    app.get(page_url)
def test_good_filter_data(client, setup, filetype):
    """Filtering on an existing ``text`` value returns only matching rows."""
    needle = 'Zakia'
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'text': needle}))
    assert len(rows) > 0
    for row in rows:
        assert 'id' in row
        assert 'text' in row
        assert needle in row['text']
def test_bad_filter_data(client, setup, filetype):
    """Filtering on a ``text`` value absent from the data returns nothing."""
    needle = 'bad'
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'text': needle}))
    assert len(rows) == 0
def test_useless_filter_data(client, setup, filetype):
    """A filter on a key that is not a column keyname leaves all 20 rows."""
    _, url = setup('id,,nom,prenom,sexe', filename=filetype,
                   data=get_file_content(filetype))
    params = {'text': 'Ali'}
    rows = parse_response(client.get(url, params))
    assert len(rows) == 20
def test_columns_keynames_with_spaces(client, setup, filetype):
    """Keynames padded with spaces are still usable as filter names."""
    _, url = setup('id , , nom,text , ', filename=filetype,
                   data=get_file_content(filetype))
    params = {'text': 'Yaniss'}
    rows = parse_response(client.get(url, params))
    assert len(rows) == 1
def test_skipped_header_data(client, setup, filetype):
    """With ``skip_header`` set, searching for ``Eliot`` yields no row."""
    _, url = setup(',id,,text,', filename=filetype,
                   data=get_file_content(filetype), skip_header=True)
    params = {'q': 'Eliot'}
    rows = parse_response(client.get(url, params))
    assert len(rows) == 0
def test_data(client, setup, filetype):
    """A filtered row only carries the named columns, as text values."""
    _, url = setup('fam,id,, text,sexe ', filename=filetype,
                   data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'text': 'Sacha'}))
    expected_row = {'id': '59', 'text': 'Sacha', 'fam': '2431', 'sexe': 'H'}
    assert rows[0] == expected_row
def test_unicode_filter_data(client, setup, filetype):
    """Filtering works with a non-ASCII unicode criterion."""
    needle = u'Benoît'
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'text': needle}))
    assert len(rows) > 0
    for row in rows:
        assert 'id' in row
        assert 'text' in row
        assert needle in row['text']
def test_unicode_case_insensitive_filter_data(client, setup, filetype):
    """``case-insensitive`` lets a lowercase unicode criterion match."""
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    needle = u'anaëlle'
    params = {'text': needle, 'case-insensitive': ''}
    rows = parse_response(client.get(url, params))
    assert len(rows) > 0
    for row in rows:
        assert 'id' in row
        assert 'text' in row
        assert needle.lower() in row['text'].lower()
def test_data_bom(client, setup):
    """Content starting with a byte order mark is read like the plain one."""
    _, url = setup('fam,id,, text,sexe ', data=data_bom)
    rows = parse_response(client.get(url, {'text': 'Eliot'}))
    expected_row = {'id': '69981', 'text': 'Eliot', 'fam': '121', 'sexe': 'H'}
    assert rows[0] == expected_row
def test_multi_filter(client, setup, filetype):
    """Filtering on ``sexe`` returns the ten ``F`` rows, Shanone first."""
    _, url = setup('fam,id,, text,sexe ', filename=filetype,
                   data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'sexe': 'F'}))
    expected_first = {'id': '6', 'text': 'Shanone', 'fam': '525', 'sexe': 'F'}
    assert rows[0] == expected_first
    assert len(rows) == 10
def test_query(client, setup, filetype):
    """The ``q`` parameter matches a substring of the ``text`` column."""
    _, url = setup('fam,id,, text,sexe ', filename=filetype,
                   data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'q': 'liot'}))
    assert rows[0]['text'] == 'Eliot'
    assert len(rows) == 1
def test_query_insensitive_and_unicode(client, setup, filetype):
    """Case-insensitive ``q`` matches regardless of case and of the accent."""
    _, url = setup('fam,id,, text,sexe ', filename=filetype,
                   data=get_file_content(filetype))
    params = {'q': 'elIo', 'case-insensitive': ''}
    # first without accent, then with an accented first letter
    for needle in ('elIo', 'élIo'):
        params['q'] = needle
        rows = parse_response(client.get(url, params))
        assert rows[0]['text'] == 'Eliot'
        assert len(rows) == 1
def test_query_insensitive_and_filter(client, setup, filetype):
    """``q``, a column filter and ``case-insensitive`` can be combined."""
    _, url = setup('fam,id,,text,sexe', filename=filetype,
                   data=get_file_content(filetype))
    params = {'q': 'elIo', 'sexe': 'H', 'case-insensitive': ''}
    rows = parse_response(client.get(url, params))
    assert rows[0]['text'] == 'Eliot'
    assert len(rows) == 1
def test_dialect(client, setup):
    """Check the dialect options stored for ``data`` and a two-key filter."""
    source, url = setup(data=data)
    expected_dialect = {
        'lineterminator': '\r\n',
        'skipinitialspace': False,
        'quoting': 0,
        'delimiter': ';',
        'quotechar': '"',
        'doublequote': False,
    }
    assert expected_dialect == source.dialect_options
    rows = parse_response(client.get(url, {'id': '22', 'fname': 'Sandra'}))
    assert len(rows) == 1
    only_row = rows[0]
    assert only_row['id'] == '22'
    assert only_row['lname'] == 'MARTIN'
def test_on_the_fly_dialect_detection(client, setup):
    """Rows are still served when no dialect options are stored."""
    # Simulate a connector whose dialect was never detected at save() time
    # (e.g. one that was migrated) by blanking the stored options.
    _, url = setup(data=data)
    CsvDataSource.objects.all().update(_dialect_options=None)
    payload = json.loads(client.get(url).content)
    assert payload['err'] == 0
    assert len(payload['data']) == 20
def test_missing_columns(client, setup):
    """A row shorter than the keynames gets ``None`` for the missing columns."""
    _, url = setup(data=data + 'A;B;C\n')
    payload = json.loads(client.get(url).content)
    assert payload['err'] == 0
    rows = payload['data']
    assert len(rows) == 21
    expected_last = {'lname': 'C', 'sex': None, 'id': 'B', 'fname': None, 'fam': 'A'}
    assert rows[-1] == expected_last
def test_unknown_sheet_name(client, setup):
    """Saving an unknown sheet name still leaves 20 rows served."""
    source, url = setup('field,,another_field,', filename='data2.ods',
                        data=get_file_content('data2.ods'))
    source.sheet_name = 'unknown'
    source.save()
    payload = json.loads(client.get(url).content)
    assert len(payload['data']) == 20
def test_cache_new_shorter_file(client, setup):
    """Replacing the file by a shorter one drops the extra row."""
    source, url = setup(data=data + 'A;B;C\n')
    before = json.loads(client.get(url).content)
    assert before['err'] == 0
    assert len(before['data']) == 21
    # swap in the 20-row content and save again
    source.csv_file = File(StringIO(data), 'data.csv')
    source.save()
    after = json.loads(client.get(url).content)
    assert len(after['data']) == 20
def test_query_array(app, setup, filetype):
    """``array`` structure: each row is a two-item sequence (int, text)."""
    source, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                      data=get_file_content(filetype))
    endpoint_kwargs = {
        'connector': 'csvdatasource',
        'slug': source.slug,
        'endpoint': 'query/query-1_/',
    }
    url = reverse('generic-endpoint', kwargs=endpoint_kwargs)
    query = Query(slug='query-1_', resource=source, structure='array')
    query.projections = '\n'.join(('id:int(id)', 'prenom:prenom'))
    query.save()
    payload = app.get(url).json
    assert payload['err'] == 0
    assert len(payload['data']) > 0
    for row in payload['data']:
        assert len(row) == 2
        assert isinstance(row[0], int)
        assert isinstance(row[1], six.text_type)
def test_query_q_filter(app, setup, filetype):
    """``q`` on a query endpoint finds both Sandra rows, whatever the case."""
    source, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                      data=get_file_content(filetype))
    endpoint_kwargs = {
        'connector': 'csvdatasource',
        'slug': source.slug,
        'endpoint': 'query/query-1_/',
    }
    url = reverse('generic-endpoint', kwargs=endpoint_kwargs)
    query = Query(slug='query-1_', resource=source, structure='array')
    query.projections = '\n'.join(('id:int(id)', 'text:prenom'))
    query.save()
    for query_string in ('?q=Sandra', '?q=sandra'):
        payload = app.get(url + query_string).json
        assert payload['err'] == 0
        assert len(payload['data']) == 2
def test_query_dict(app, setup, filetype):
    """``dict`` structure: each row maps projection names to their values."""
    source, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                      data=get_file_content(filetype))
    endpoint_kwargs = {
        'connector': 'csvdatasource',
        'slug': source.slug,
        'endpoint': 'query/query-1_/',
    }
    url = reverse('generic-endpoint', kwargs=endpoint_kwargs)
    query = Query(slug='query-1_', resource=source, structure='dict')
    query.projections = '\n'.join(('id:int(id)', 'prenom:prenom'))
    query.save()
    payload = app.get(url).json
    assert payload['err'] == 0
    assert len(payload['data']) > 0
    for row in payload['data']:
        assert len(row) == 2
        assert isinstance(row['id'], int)
        assert isinstance(row['prenom'], six.text_type)
def test_query_tuples(app, setup, filetype):
    """``tuples`` structure: each row is a list of (name, value) pairs."""
    source, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                      data=get_file_content(filetype))
    endpoint_kwargs = {
        'connector': 'csvdatasource',
        'slug': source.slug,
        'endpoint': 'query/query-1_/',
    }
    url = reverse('generic-endpoint', kwargs=endpoint_kwargs)
    query = Query(slug='query-1_', resource=source, structure='tuples')
    query.projections = '\n'.join(('id:int(id)', 'prenom:prenom'))
    query.save()
    payload = app.get(url).json
    assert payload['err'] == 0
    assert len(payload['data']) > 0
    for row in payload['data']:
        assert len(row) == 2
        id_pair = row[0]
        assert id_pair[0] == 'id'
        assert isinstance(id_pair[1], int)
        name_pair = row[1]
        assert name_pair[0] == 'prenom'
        assert isinstance(name_pair[1], six.text_type)
def test_query_onerow(app, setup, filetype):
    """``onerow`` structure: ``data`` is the single matching row as a dict."""
    source, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                      data=get_file_content(filetype))
    endpoint_kwargs = {
        'connector': 'csvdatasource',
        'slug': source.slug,
        'endpoint': 'query/query-1_/',
    }
    url = reverse('generic-endpoint', kwargs=endpoint_kwargs)
    query = Query(slug='query-1_', resource=source, structure='onerow')
    query.projections = '\n'.join(('id:int(id)', 'prenom:prenom'))
    query.filters = 'int(id) == 525'
    query.save()
    payload = app.get(url).json
    assert payload['err'] == 0
    assert isinstance(payload['data'], dict)
    assert payload['data']['prenom'] == 'Shanone'
def test_query_one(app, setup, filetype):
    """``one`` structure: ``data`` is the bare projected value."""
    source, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                      data=get_file_content(filetype))
    endpoint_kwargs = {
        'connector': 'csvdatasource',
        'slug': source.slug,
        'endpoint': 'query/query-1_/',
    }
    url = reverse('generic-endpoint', kwargs=endpoint_kwargs)
    query = Query(slug='query-1_', resource=source, structure='one')
    query.projections = 'wtf:prenom'
    query.filters = 'int(id) == 525'
    query.save()
    payload = app.get(url).json
    assert payload['err'] == 0
    assert payload['data'] == 'Shanone'
def test_query_filter_param(app, setup, filetype):
    """A filter expression can read request parameters through ``query``."""
    source, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                      data=get_file_content(filetype))
    endpoint_kwargs = {
        'connector': 'csvdatasource',
        'slug': source.slug,
        'endpoint': 'query/query-1_/',
    }
    url = reverse('generic-endpoint', kwargs=endpoint_kwargs)
    query = Query(slug='query-1_', resource=source, structure='one')
    query.projections = 'wtf:prenom'
    query.filters = 'int(id) == int(query.get("foobar"))'
    query.save()
    payload = app.get(url + '?foobar=525').json
    assert payload['err'] == 0
    assert payload['data'] == 'Shanone'
def test_query_distinct(app, setup, filetype):
    """``distinct='sexe'`` reduces the result to a two-item list."""
    source, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                      data=get_file_content(filetype))
    endpoint_kwargs = {
        'connector': 'csvdatasource',
        'slug': source.slug,
        'endpoint': 'query/query-1_/',
    }
    url = reverse('generic-endpoint', kwargs=endpoint_kwargs)
    query = Query(slug='query-1_', resource=source, distinct='sexe')
    query.save()
    payload = app.get(url).json
    assert payload['err'] == 0
    assert isinstance(payload['data'], list)
    assert len(payload['data']) == 2
def test_query_keyed_distinct(app, setup, filetype):
    """``keyed-distinct`` returns a dict of rows keyed by the distinct value."""
    source, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                      data=get_file_content(filetype))
    endpoint_kwargs = {
        'connector': 'csvdatasource',
        'slug': source.slug,
        'endpoint': 'query/query-1_/',
    }
    url = reverse('generic-endpoint', kwargs=endpoint_kwargs)
    query = Query(slug='query-1_', resource=source, distinct='nom',
                  structure='keyed-distinct')
    query.save()
    payload = app.get(url).json
    assert payload['err'] == 0
    assert isinstance(payload['data'], dict)
    assert payload['data']['MARTIN']['prenom'] == 'Sandra'
def test_query_order(app, setup, filetype):
    """Rows come back sorted by the ``order`` expression."""
    source, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                      data=get_file_content(filetype))
    endpoint_kwargs = {
        'connector': 'csvdatasource',
        'slug': source.slug,
        'endpoint': 'query/query-1_/',
    }
    url = reverse('generic-endpoint', kwargs=endpoint_kwargs)
    query = Query(slug='query-1_', resource=source, order='prenom.lower()')
    query.save()
    payload = app.get(url).json
    assert payload['err'] == 0
    rows = payload['data']
    assert isinstance(rows, list)
    expected_order = sorted(rows, key=lambda row: row['prenom'].lower())
    assert rows == expected_order
def test_query_error(app, setup, filetype):
    """Walk through the error responses of a query endpoint.

    Covers, in order: a missing query, then invalid projections, filters,
    order and distinct expressions.  Each field is reset to an empty string
    before the next one is exercised.
    """
    source, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                      data=get_file_content(filetype))
    endpoint_kwargs = {
        'connector': 'csvdatasource',
        'slug': source.slug,
        'endpoint': 'query/query-1_/',
    }
    url = reverse('generic-endpoint', kwargs=endpoint_kwargs)

    def failing_call():
        # Hit the endpoint, check it reports an error, return the JSON body.
        payload = app.get(url).json
        assert payload['err'] == 1
        return payload

    # no query exists yet
    payload = failing_call()
    assert payload['err_desc'] == 'no such query'

    query = Query(slug='query-1_', resource=source)
    query.save()

    # --- projections ---
    query.projections = '1a:prenom'
    query.save()
    payload = failing_call()
    assert payload['err_desc'] == 'invalid projection name'
    assert payload['data'] == '1a'

    query.projections = 'id:prenom.'
    query.save()
    payload = failing_call()
    assert payload['err_desc'] == 'invalid projection expression'
    assert payload['data']['name'] == 'id'
    assert payload['data']['expr'] == 'prenom.'

    query.projections = 'id:prenom not'
    query.save()
    payload = failing_call()
    assert payload['err_desc'] == 'invalid projection expression'
    assert payload['data']['name'] == 'id'
    assert payload['data']['expr'] == 'prenom not'

    query.projections = 'id:zob'
    query.save()
    payload = failing_call()
    assert payload['err_desc'] == 'invalid projection expression'
    assert payload['data']['name'] == 'id'
    assert payload['data']['expr'] == 'zob'
    assert 'row' in payload['data']

    # --- filters ---
    query.projections = ''
    query.filters = 'prenom.'
    query.save()
    payload = failing_call()
    assert payload['err_desc'] == 'invalid filters expression'
    assert payload['data']['expr'] == 'prenom.'

    query.filters = 'zob'
    query.save()
    payload = failing_call()
    assert payload['err_desc'] == 'invalid filters expression'
    assert payload['data']['error'] == "name 'zob' is not defined"
    assert payload['data']['expr'] == 'zob'
    assert 'row' in payload['data']

    # --- order ---
    query.filters = ''
    query.order = 'prenom.'
    query.save()
    payload = failing_call()
    assert payload['err_desc'] == 'invalid order expression'
    assert payload['data']['expr'] == 'prenom.'

    query.order = 'zob'
    query.save()
    payload = failing_call()
    assert payload['err_desc'] == 'invalid order expression'
    assert payload['data']['expr'] == 'zob'
    assert 'row' in payload['data']

    # --- distinct ---
    query.order = ''
    query.distinct = 'prenom.'
    query.save()
    payload = failing_call()
    assert payload['err_desc'] == 'invalid distinct expression'
    assert payload['data']['expr'] == 'prenom.'

    query.distinct = 'zob'
    query.save()
    payload = failing_call()
    assert payload['err_desc'] == 'invalid distinct expression'
    assert payload['data']['expr'] == 'zob'
    assert 'row' in payload['data']
def test_edit_connector_queries(admin_user, app, setup, filetype):
    """Create then edit a query through the management pages."""
    source, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                      data=get_file_content(filetype))
    connector_url = reverse(
        'view-connector',
        kwargs={'connector': 'csvdatasource', 'slug': source.slug})

    # anonymous visitors get no creation link and are sent to the login page
    page = app.get(connector_url)
    assert 'New Query' not in page.body
    new_query_url = reverse('csv-new-query', kwargs={'connector_slug': source.slug})
    page = app.get(new_query_url, status=302)
    assert page.location.endswith('/login/?next=%s' % new_query_url)

    app = login(app)
    page = app.get(connector_url)
    page = page.click('New Query')

    # an invalid filter expression is reported
    page.form['slug'] = 'foobar'
    page.form['label'] = 'Lucy alone'
    page.form['filters'] = 'int(id) =< 525'
    page = page.form.submit()
    assert 'Syntax error' in page.body

    # so is a projection without the name:expression form
    page.form['filters'] = 'int(id) == 525'
    page.form['projections'] = 'id'
    page = page.form.submit()
    assert 'Syntax error' in page.body

    page.form['projections'] = 'id:id\nprenom:prenom'
    page = page.form.submit().follow()

    # link 0 is the newly created endpoint, hence index 1 here
    page = page.click('foobar', index=1)
    page.form['filters'] = 'int(id) == 511'
    page.form['description'] = 'in the sky without diamonds'
    page = page.form.submit().follow()
    assert 'Lucy alone' in page.body
    assert 'in the sky without diamonds' in page.body

    # link 0 is the newly created endpoint: call it
    page = page.click('foobar', index=0)
    rows = page.json['data']
    assert len(rows) == 1
    assert rows[0]['prenom'] == 'Lucie'
def test_download_file(app, setup, filetype, admin_user):
    """Downloading the connector file requires login and sets proper headers.

    The expected ``Content-Length`` is computed from the content that was
    actually uploaded rather than from the unrelated module-level ``data``
    constant, so the check stays valid if the ``data.csv`` fixture changes.
    """
    content = get_file_content(filetype)
    setup('field,,another_field,', filename=filetype, data=content)
    download_url = '/manage/csvdatasource/test/download/'

    # anonymous access is redirected to the login page
    assert '/login' in app.get(download_url).location

    app = login(app)
    resp = app.get(download_url, status=200)

    expected_content_types = {
        'data.csv': 'text/csv; charset=utf-8',
        'data.ods': 'application/vnd.oasis.opendocument.spreadsheet',
        'data.xls': 'application/vnd.ms-excel',
        'data.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    }
    assert resp.headers['Content-Type'] == expected_content_types[filetype]
    if filetype == 'data.csv':
        # the length is only checked for the CSV file, as before
        assert resp.headers['Content-Length'] == str(len(content))
def test_query_filter_multiline(app, setup, filetype):
    """A two-line filter (one expression per line) keeps two rows."""
    source, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                      data=get_file_content(filetype))
    endpoint_kwargs = {
        'connector': 'csvdatasource',
        'slug': source.slug,
        'endpoint': 'query/query-1_/',
    }
    url = reverse('generic-endpoint', kwargs=endpoint_kwargs)
    query = Query(slug='query-1_', resource=source)
    query.filters = '\n'.join(('int(id) <= 525', 'int(id) >= 511'))
    query.save()
    payload = app.get(url).json
    assert payload['err'] == 0
    assert len(payload['data']) == 2
def test_query_builtin_id_filter(app, setup, filetype):
    """The ``id`` request parameter filters rows, alone or with a query filter."""
    source, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                      data=get_file_content(filetype))
    endpoint_kwargs = {
        'connector': 'csvdatasource',
        'slug': source.slug,
        'endpoint': 'query/query-1_/',
    }
    url = reverse('generic-endpoint', kwargs=endpoint_kwargs)
    query = Query(slug='query-1_', resource=source)
    query.save()

    def rows_for(suffix=''):
        # Fetch the endpoint with an optional query string, return its rows.
        return app.get(url + suffix).json['data']

    assert len(rows_for()) == 20
    assert len(rows_for('?id=121')) == 1
    assert rows_for('?id=121')[0]['prenom'] == 'Eliot'
    assert rows_for('?id=525')[0]['prenom'] == 'Shanone'

    # check when combined with another filter
    query.filters = 'sexe == "H"'
    query.save()
    assert len(rows_for()) == 10
    assert rows_for('?id=121')[0]['prenom'] == 'Eliot'
    assert len(rows_for('?id=525')) == 0
def test_delete_connector_query(admin_user, app, setup, filetype):
    """A query can be deleted from the connector page after confirmation."""
    source, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                      data=get_file_content(filetype))
    connector_url = reverse(
        'view-connector',
        kwargs={'connector': 'csvdatasource', 'slug': source.slug})
    page = app.get(connector_url)
    query = Query(slug='query-1_', resource=source, structure='array')
    query.projections = '\n'.join(('id:int(id)', 'prenom:prenom'))
    query.save()
    app = login(app)
    page = app.get(connector_url)
    confirmation = page.click('delete query')
    assert 'Are you sure' in confirmation.body
    page = confirmation.form.submit('delete').follow()
    assert 'No query are defined.' in page.body
def test_csv_sniffer(admin_user, app):
    """Uploading a file holding a single newline reports a dialect error."""
    app = login(app)
    creation_form = app.get('/manage/csvdatasource/add').form
    creation_form.set('title', 'Title')
    creation_form.set('description', 'Description')
    upload = webtest.Upload('test.csv', '\n', 'application/octet-stream')
    creation_form.set('csv_file', upload)
    page = creation_form.submit()
    assert 'Could not detect CSV dialect' in page
def test_change_csv_command(setup):
    """The ``change-csv`` command replaces the file of connector ``test``."""
    source, _ = setup(data=data)

    def rows_after_change(name, **options):
        # Run the command with a fixture file, reload the connector, list rows.
        call_command('change-csv', 'test', os.path.join(TEST_BASE_DIR, name), **options)
        source.refresh_from_db()
        return list(source.get_rows())

    assert rows_after_change('data-empty.ods') == []
    assert rows_after_change('data.csv') != []
    assert rows_after_change('data-empty.ods') == []
    assert rows_after_change('data.ods', sheet_name='Feuille2') != []
@pytest.mark.parametrize('payload,expected', [
    ({}, 20),
    ({'limit': 10}, 10),
    ({'limit': 10, 'offset': 0}, 10),
    ({'limit': 10, 'offset': 15}, 5),
    ({'limit': 10, 'offset': 42}, 0),
])
def test_pagination(app, setup, filetype, payload, expected):
    """``limit``/``offset`` slice both the data and the query endpoints."""
    source, data_url = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                             data=get_file_content(filetype))

    # data endpoint
    data_response = app.get(data_url + '?' + urlencode(payload))
    assert len(data_response.json['data']) == expected

    # query endpoint
    endpoint_kwargs = {
        'connector': 'csvdatasource',
        'slug': source.slug,
        'endpoint': 'query/query-1_/',
    }
    query_url = reverse('generic-endpoint', kwargs=endpoint_kwargs)
    query = Query(slug='query-1_', resource=source, structure='array')
    query.projections = '\n'.join(('id:int(id)', 'text:prenom'))
    query.save()
    query_response = app.get(query_url + '?' + urlencode(payload))
    assert len(query_response.json['data']) == expected
@pytest.mark.parametrize('payload,expected_error', [
    ({'limit': 'bla'}, 'invalid limit parameter'),
    ({'limit': 0}, 'invalid limit parameter'),
    ({'limit': -1}, 'invalid limit parameter'),
    ({'limit': 10, 'offset': 'bla'}, 'invalid offset parameter'),
    ({'limit': 10, 'offset': -1}, 'invalid offset parameter'),
])
def test_pagination_error(app, setup, filetype, payload, expected_error):
    """Invalid ``limit``/``offset`` values are reported by both endpoints."""
    source, data_url = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                             data=get_file_content(filetype))

    # data endpoint
    data_payload = app.get(data_url + '?' + urlencode(payload)).json
    assert data_payload['err'] == 1
    assert data_payload['err_desc'] == expected_error

    # query endpoint
    endpoint_kwargs = {
        'connector': 'csvdatasource',
        'slug': source.slug,
        'endpoint': 'query/query-1_/',
    }
    query_url = reverse('generic-endpoint', kwargs=endpoint_kwargs)
    query = Query(slug='query-1_', resource=source, structure='array')
    query.projections = '\n'.join(('id:int(id)', 'text:prenom'))
    query.save()
    query_payload = app.get(query_url + '?' + urlencode(payload)).json
    assert query_payload['err'] == 1
    assert query_payload['err_desc'] == expected_error