passerelle/tests/test_csv_datasource.py

847 lines
30 KiB
Python

# -*- coding: utf-8 -*-
#
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import time
import pytest
import mock
import six
from django.core.files import File
from django.core.urlresolvers import reverse
from django.contrib.contenttypes.models import ContentType
from django.test import Client, override_settings
from django.core.management import call_command
from django.utils.encoding import force_bytes, force_str, force_text
from django.utils.six import StringIO
from django.utils.six.moves.urllib.parse import urlencode
from passerelle.base.models import ApiUser, AccessRight
from passerelle.compat import json_loads
from passerelle.apps.csvdatasource.models import CsvDataSource, Query, TableRow
from test_manager import login, admin_user
import webtest
data = """121;69981;DELANOUE;Eliot;H
525;6;DANIEL WILLIAMS;Shanone;F
253;67742;MARTIN;Sandra;F
511;38142;MARCHETTI;Lucie;F
235;22;MARTIN;Sandra;F
620;52156;ARNAUD;Mathis;H
902;36;BRIGAND;Coline;F
2179;48548;THEBAULT;Salima;F
3420;46;WILSON-LUZAYADIO;Anaëlle;F
1421;55486;WONE;Fadouma;F
2841;51;FIDJI;Zakia;F
2431;59;BELICARD;Sacha;H
4273;60;GOUBERT;Adrien;H
4049;64;MOVSESSIAN;Dimitri;H
4605;67;ABDOU BACAR;Kyle;H
4135;22231;SAVERIAS;Marius;H
4809;75;COROLLER;Maelys;F
5427;117;KANTE;Aliou;H
116642;118;ZAHMOUM;Yaniss;H
216352;38;Dupont;Benoît;H
"""
data_bom = force_str(force_text(data, 'utf-8').encode('utf-8-sig'))
pytestmark = pytest.mark.django_db
TEST_BASE_DIR = os.path.join(os.path.dirname(__file__), 'data', 'csvdatasource')
def get_file_content(filename):
return open(os.path.join(TEST_BASE_DIR, filename), 'rb')
@pytest.fixture
def setup():
def maker(columns_keynames='fam,id,lname,fname,sex', filename='data.csv',
sheet_name='Feuille2', data='', skip_header=False):
api = ApiUser.objects.create(
username='all',
keytype='',
key='')
csv = CsvDataSource.objects.create(
csv_file=File(data, filename),
sheet_name=sheet_name,
columns_keynames=columns_keynames,
slug='test',
title='a title',
description='a description',
skip_header=skip_header)
assert TableRow.objects.filter(resource=csv).count() == len(csv.get_rows())
obj_type = ContentType.objects.get_for_model(csv)
AccessRight.objects.create(
codename='can_access',
apiuser=api,
resource_type=obj_type,
resource_pk=csv.pk)
url = reverse('generic-endpoint', kwargs={
'connector': 'csvdatasource',
'slug': csv.slug,
'endpoint': 'data',
})
return csv, url
return maker
def parse_response(response):
return json_loads(response.content)['data']
@pytest.fixture
def client():
return Client()
@pytest.fixture(params=['data.csv', 'data.ods', 'data.xls', 'data.xlsx'])
def filetype(request):
return request.param
def test_default_column_keynames(setup, filetype):
csvdata = CsvDataSource.objects.create(csv_file=File(get_file_content(filetype), filetype),
sheet_name='Feuille2',
slug='test2',
title='a title',
description='a description')
assert len(csvdata.columns_keynames.split(',')) == 2
assert 'id' in csvdata.columns_keynames
assert 'text' in csvdata.columns_keynames
def test_sheet_name_error(setup, app, filetype, admin_user):
csvdata, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
app = login(app)
resp = app.get('/manage/csvdatasource/test/edit')
edit_form = resp.forms[0]
edit_form['sheet_name'] = ''
resp = edit_form.submit()
if filetype != 'data.csv':
assert resp.status_code == 200
assert 'You must specify a sheet name' in resp.text
else:
assert resp.status_code == 302
def test_unfiltered_data(client, setup, filetype):
csvdata, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
resp = client.get(url)
result = parse_response(resp)
for item in result:
assert 'field' in item
assert 'another_field' in item
def test_empty_file(client, setup):
csvdata, url = setup(
'field,,another_field,',
filename='data-empty.ods',
data=get_file_content('data-empty.ods'))
resp = client.get(url)
result = parse_response(resp)
assert len(result) == 0
def test_view_manage_page(setup, app, filetype, admin_user):
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
app = login(app)
app.get(csvdata.get_absolute_url())
def test_good_filter_data(client, setup, filetype):
filter_criteria = 'Zakia'
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
filters = {'text': filter_criteria}
resp = client.get(url, filters)
result = parse_response(resp)
assert len(result)
for item in result:
assert 'id' in item
assert 'text' in item
assert filter_criteria in item['text']
def test_bad_filter_data(client, setup, filetype):
filter_criteria = 'bad'
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
filters = {'text': filter_criteria}
resp = client.get(url, filters)
result = parse_response(resp)
assert len(result) == 0
def test_useless_filter_data(client, setup, filetype):
csvdata, url = setup('id,,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
filters = {'text': 'Ali'}
resp = client.get(url, filters)
result = parse_response(resp)
assert len(result) == 20
def test_columns_keynames_with_spaces(client, setup, filetype):
csvdata, url = setup('id , , nom,text , ', filename=filetype, data=get_file_content(filetype))
filters = {'text': 'Yaniss'}
resp = client.get(url, filters)
result = parse_response(resp)
assert len(result) == 1
def test_skipped_header_data(client, setup, filetype):
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype), skip_header=True)
filters = {'q': 'Eliot'}
resp = client.get(url, filters)
result = parse_response(resp)
assert len(result) == 0
def test_data(client, setup, filetype):
csvdata, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
filters = {'text': 'Sacha'}
resp = client.get(url, filters)
result = parse_response(resp)
assert result[0] == {'id': '59', 'text': 'Sacha',
'fam': '2431', 'sexe': 'H'}
def test_unicode_filter_data(client, setup, filetype):
filter_criteria = u'Benoît'
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
filters = {'text': filter_criteria}
resp = client.get(url, filters)
result = parse_response(resp)
assert len(result)
for item in result:
assert 'id' in item
assert 'text' in item
assert filter_criteria in item['text']
def test_unicode_case_insensitive_filter_data(client, setup, filetype):
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
filter_criteria = u'anaëlle'
filters = {'text': filter_criteria, 'case-insensitive': ''}
resp = client.get(url, filters)
result = parse_response(resp)
assert len(result)
for item in result:
assert 'id' in item
assert 'text' in item
assert filter_criteria.lower() in item['text'].lower()
def test_data_bom(client, setup):
csvdata, url = setup('fam,id,, text,sexe ', data=StringIO(data_bom))
filters = {'text': 'Eliot'}
resp = client.get(url, filters)
result = parse_response(resp)
assert result[0] == {'id': '69981', 'text': 'Eliot', 'fam': '121', 'sexe': 'H'}
def test_multi_filter(client, setup, filetype):
csvdata, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
filters = {'sexe': 'F'}
resp = client.get(url, filters)
result = parse_response(resp)
assert result[0] == {'id': '6', 'text': 'Shanone',
'fam': '525', 'sexe': 'F'}
assert len(result) == 10
def test_query(client, setup, filetype):
csvdata, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
filters = {'q': 'liot'}
resp = client.get(url, filters)
result = parse_response(resp)
assert result[0]['text'] == 'Eliot'
assert len(result) == 1
def test_query_insensitive_and_unicode(client, setup, filetype):
csvdata, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
filters = {'q': 'elIo', 'case-insensitive': ''}
resp = client.get(url, filters)
result = parse_response(resp)
assert result[0]['text'] == 'Eliot'
assert len(result) == 1
filters['q'] = 'élIo'
resp = client.get(url, filters)
result = parse_response(resp)
assert result[0]['text'] == 'Eliot'
assert len(result) == 1
def test_query_insensitive_and_filter(client, setup, filetype):
csvdata, url = setup('fam,id,,text,sexe', filename=filetype, data=get_file_content(filetype))
filters = {'q': 'elIo', 'sexe': 'H', 'case-insensitive': ''}
resp = client.get(url, filters)
result = parse_response(resp)
assert result[0]['text'] == 'Eliot'
assert len(result) == 1
def test_dialect(client, setup):
csvdata, url = setup(data=StringIO(data))
expected = {
'lineterminator': '\r\n',
'skipinitialspace': False,
'quoting': 0,
'delimiter': ';',
'quotechar': '"',
'doublequote': False
}
assert expected == csvdata.dialect_options
filters = {'id': '22', 'fname': 'Sandra'}
resp = client.get(url, filters)
result = parse_response(resp)
assert len(result) == 1
assert result[0]['id'] == '22'
assert result[0]['lname'] == 'MARTIN'
def test_on_the_fly_dialect_detection(client, setup):
# fake a connector that was not initialized during .save(), because it's
# been migrated and we didn't do dialect detection at save() time.
csvdata, url = setup(data=StringIO(data))
CsvDataSource.objects.all().update(_dialect_options=None)
resp = client.get(url)
result = json_loads(resp.content)
assert result['err'] == 0
assert len(result['data']) == 20
def test_missing_columns(client, setup):
csvdata, url = setup(data=StringIO(data + 'A;B;C\n'))
resp = client.get(url)
result = json_loads(resp.content)
assert result['err'] == 0
assert len(result['data']) == 21
assert result['data'][-1] == {'lname': 'C', 'sex': None, 'id': 'B', 'fname': None, 'fam': 'A'}
def test_unknown_sheet_name(client, setup):
csvdata, url = setup('field,,another_field,', filename='data2.ods', data=get_file_content('data2.ods'))
csvdata.sheet_name = 'unknown'
csvdata.save()
resp = client.get(url)
result = json_loads(resp.content)
assert len(result['data']) == 20
def test_cache_new_shorter_file(client, setup):
csvdata, url = setup(data=StringIO(data + 'A;B;C\n'))
resp = client.get(url)
result = json_loads(resp.content)
assert result['err'] == 0
assert len(result['data']) == 21
csvdata.csv_file = File(StringIO(data), 'data.csv')
csvdata.save()
resp = client.get(url)
result = json_loads(resp.content)
assert len(result['data']) == 20
def test_query_array(app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('generic-endpoint', kwargs={
'connector': 'csvdatasource',
'slug': csvdata.slug,
'endpoint': 'query/query-1_/',
})
query = Query(slug='query-1_', resource=csvdata, structure='array')
query.projections = '\n'.join(['id:int(id)', 'prenom:prenom'])
query.save()
response = app.get(url)
assert response.json['err'] == 0
assert len(response.json['data'])
for row in response.json['data']:
assert len(row) == 2
assert isinstance(row[0], int)
assert isinstance(row[1], six.text_type)
def test_query_q_filter(app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype,
data=get_file_content(filetype))
url = reverse('generic-endpoint', kwargs={
'connector': 'csvdatasource',
'slug': csvdata.slug,
'endpoint': 'query/query-1_/',
})
query = Query(slug='query-1_', resource=csvdata, structure='array')
query.projections = '\n'.join(['id:int(id)', 'text:prenom'])
query.save()
response = app.get(url + '?q=Sandra')
assert response.json['err'] == 0
assert len(response.json['data']) == 2
response = app.get(url + '?q=sandra')
assert response.json['err'] == 0
assert len(response.json['data']) == 2
def test_query_dict(app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('generic-endpoint', kwargs={
'connector': 'csvdatasource',
'slug': csvdata.slug,
'endpoint': 'query/query-1_/',
})
query = Query(slug='query-1_', resource=csvdata, structure='dict')
query.projections = '\n'.join(['id:int(id)', 'prenom:prenom'])
query.save()
response = app.get(url)
assert response.json['err'] == 0
assert len(response.json['data'])
for row in response.json['data']:
assert len(row) == 2
assert isinstance(row['id'], int)
assert isinstance(row['prenom'], six.text_type)
def test_query_tuples(app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('generic-endpoint', kwargs={
'connector': 'csvdatasource',
'slug': csvdata.slug,
'endpoint': 'query/query-1_/',
})
query = Query(slug='query-1_', resource=csvdata, structure='tuples')
query.projections = '\n'.join(['id:int(id)', 'prenom:prenom'])
query.save()
response = app.get(url)
assert response.json['err'] == 0
assert len(response.json['data'])
for row in response.json['data']:
assert len(row) == 2
assert row[0][0] == 'id'
assert isinstance(row[0][1], int)
assert row[1][0] == 'prenom'
assert isinstance(row[1][1], six.text_type)
def test_query_onerow(app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('generic-endpoint', kwargs={
'connector': 'csvdatasource',
'slug': csvdata.slug,
'endpoint': 'query/query-1_/',
})
query = Query(slug='query-1_', resource=csvdata, structure='onerow')
query.projections = '\n'.join(['id:int(id)', 'prenom:prenom'])
query.filters = 'int(id) == 525'
query.save()
response = app.get(url)
assert response.json['err'] == 0
assert isinstance(response.json['data'], dict)
assert response.json['data']['prenom'] == 'Shanone'
def test_query_one(app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('generic-endpoint', kwargs={
'connector': 'csvdatasource',
'slug': csvdata.slug,
'endpoint': 'query/query-1_/',
})
query = Query(slug='query-1_', resource=csvdata, structure='one')
query.projections = 'wtf:prenom'
query.filters = 'int(id) == 525'
query.save()
response = app.get(url)
assert response.json['err'] == 0
assert response.json['data'] == 'Shanone'
def test_query_filter_param(app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('generic-endpoint', kwargs={
'connector': 'csvdatasource',
'slug': csvdata.slug,
'endpoint': 'query/query-1_/',
})
query = Query(slug='query-1_', resource=csvdata, structure='one')
query.projections = 'wtf:prenom'
query.filters = 'int(id) == int(query.get("foobar"))'
query.save()
response = app.get(url + '?foobar=525')
assert response.json['err'] == 0
assert response.json['data'] == 'Shanone'
def test_query_distinct(app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('generic-endpoint', kwargs={
'connector': 'csvdatasource',
'slug': csvdata.slug,
'endpoint': 'query/query-1_/',
})
query = Query(slug='query-1_', resource=csvdata, distinct='sexe')
query.save()
response = app.get(url)
assert response.json['err'] == 0
assert isinstance(response.json['data'], list)
assert len(response.json['data']) == 2
def test_query_keyed_distinct(app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('generic-endpoint', kwargs={
'connector': 'csvdatasource',
'slug': csvdata.slug,
'endpoint': 'query/query-1_/',
})
query = Query(slug='query-1_', resource=csvdata, distinct='nom', structure='keyed-distinct')
query.save()
response = app.get(url)
assert response.json['err'] == 0
assert isinstance(response.json['data'], dict)
assert response.json['data']['MARTIN']['prenom'] == 'Sandra'
def test_query_order(app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('generic-endpoint', kwargs={
'connector': 'csvdatasource',
'slug': csvdata.slug,
'endpoint': 'query/query-1_/',
})
query = Query(slug='query-1_', resource=csvdata, order='prenom.lower()')
query.save()
response = app.get(url)
assert response.json['err'] == 0
assert isinstance(response.json['data'], list)
assert response.json['data'] == sorted(response.json['data'], key=lambda row:
row['prenom'].lower())
def test_query_error(app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('generic-endpoint', kwargs={
'connector': 'csvdatasource',
'slug': csvdata.slug,
'endpoint': 'query/query-1_/',
})
response = app.get(url)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'no such query'
query = Query(slug='query-1_', resource=csvdata)
query.save()
query.projections = '1a:prenom'
query.save()
response = app.get(url)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'invalid projection name'
assert response.json['data'] == '1a'
query.projections = 'id:prenom.'
query.save()
response = app.get(url)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'invalid projection expression'
assert response.json['data']['name'] == 'id'
assert response.json['data']['expr'] == 'prenom.'
query.projections = 'id:prenom not'
query.save()
response = app.get(url)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'invalid projection expression'
assert response.json['data']['name'] == 'id'
assert response.json['data']['expr'] == 'prenom not'
query.projections = 'id:zob'
query.save()
response = app.get(url)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'invalid projection expression'
assert response.json['data']['name'] == 'id'
assert response.json['data']['expr'] == 'zob'
assert 'row' in response.json['data']
query.projections = ''
query.filters = 'prenom.'
query.save()
response = app.get(url)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'invalid filters expression'
assert response.json['data']['expr'] == 'prenom.'
query.filters = 'zob'
query.save()
response = app.get(url)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'invalid filters expression'
assert response.json['data']['error'] == "name 'zob' is not defined"
assert response.json['data']['expr'] == 'zob'
assert 'row' in response.json['data']
query.filters = ''
query.order = 'prenom.'
query.save()
response = app.get(url)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'invalid order expression'
assert response.json['data']['expr'] == 'prenom.'
query.order = 'zob'
query.save()
response = app.get(url)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'invalid order expression'
assert response.json['data']['expr'] == 'zob'
assert 'row' in response.json['data']
query.order = ''
query.distinct = 'prenom.'
query.save()
response = app.get(url)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'invalid distinct expression'
assert response.json['data']['expr'] == 'prenom.'
query.distinct = 'zob'
query.save()
response = app.get(url)
assert response.json['err'] == 1
assert response.json['err_desc'] == 'invalid distinct expression'
assert response.json['data']['expr'] == 'zob'
assert 'row' in response.json['data']
def test_edit_connector_queries(admin_user, app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('view-connector', kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug})
resp = app.get(url)
assert 'New Query' not in resp.text
new_query_url = reverse('csv-new-query', kwargs={'connector_slug': csvdata.slug})
resp = app.get(new_query_url, status=302)
assert resp.location.endswith('/login/?next=%s' % new_query_url)
app = login(app)
resp = app.get(url)
resp = resp.click('New Query')
resp.form['slug'] = 'foobar'
resp.form['label'] = 'Lucy alone'
resp.form['filters'] = 'int(id) =< 525'
resp = resp.form.submit()
assert 'Syntax error' in resp.text
resp.form['filters'] = 'int(id) == 525'
resp.form['projections'] = 'id'
resp = resp.form.submit()
assert 'Syntax error' in resp.text
resp.form['projections'] = 'id:id\nprenom:prenom'
resp = resp.form.submit().follow()
resp = resp.click('foobar', index=1) # 0th is the newly created endpoint
resp.form['filters'] = 'int(id) == 511'
resp.form['description'] = 'in the sky without diamonds'
resp = resp.form.submit().follow()
assert 'Lucy alone' in resp.text
assert 'in the sky without diamonds' in resp.text
resp = resp.click('foobar', index=0) # 0th is the newly created endpoint
assert len(resp.json['data']) == 1
assert resp.json['data'][0]['prenom'] == 'Lucie'
def test_download_file(app, setup, filetype, admin_user):
csvdata, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
assert '/login' in app.get('/manage/csvdatasource/test/download/').location
app = login(app)
resp = app.get('/manage/csvdatasource/test/download/', status=200)
if filetype == 'data.csv':
assert resp.headers['Content-Type'] == 'text/csv; charset=utf-8'
assert resp.headers['Content-Length'] == str(len(force_bytes(data)))
elif filetype == 'data.ods':
assert resp.headers['Content-Type'] == 'application/vnd.oasis.opendocument.spreadsheet'
elif filetype == 'data.xls':
assert resp.headers['Content-Type'] == 'application/vnd.ms-excel'
elif filetype == 'data.xlsx':
assert resp.headers['Content-Type'] == 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
def test_query_filter_multiline(app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype,
data=get_file_content(filetype))
url = reverse('generic-endpoint', kwargs={
'connector': 'csvdatasource',
'slug': csvdata.slug,
'endpoint': 'query/query-1_/',
})
query = Query(slug='query-1_', resource=csvdata)
query.filters = '\n'.join(['int(id) <= 525', 'int(id) >= 511'])
query.save()
response = app.get(url)
assert response.json['err'] == 0
assert len(response.json['data']) == 2
def test_query_builtin_id_filter(app, setup, filetype):
csvdata, _url = setup('id,whatever,nom,prenom,sexe', filename=filetype,
data=get_file_content(filetype))
url = reverse('generic-endpoint', kwargs={
'connector': 'csvdatasource',
'slug': csvdata.slug,
'endpoint': 'query/query-1_/',
})
query = Query(slug='query-1_', resource=csvdata)
query.save()
assert len(app.get(url).json['data']) == 20
assert len(app.get(url + '?id=121').json['data']) == 1
assert app.get(url + '?id=121').json['data'][0]['prenom'] == 'Eliot'
assert app.get(url + '?id=525').json['data'][0]['prenom'] == 'Shanone'
# check when combined with another filter
query.filters = 'sexe == "H"'
query.save()
assert len(app.get(url).json['data']) == 10
assert app.get(url + '?id=121').json['data'][0]['prenom'] == 'Eliot'
assert len(app.get(url + '?id=525').json['data']) == 0
def test_delete_connector_query(admin_user, app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('view-connector', kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug})
resp = app.get(url)
query = Query(slug='query-1_', resource=csvdata, structure='array')
query.projections = '\n'.join(['id:int(id)', 'prenom:prenom'])
query.save()
app = login(app)
resp = app.get(url)
resp = resp.click('delete query')
assert 'Are you sure' in resp.text
resp = resp.form.submit('delete')
resp = resp.follow()
assert 'No query are defined.' in resp.text
def test_csv_sniffer(admin_user, app):
app = login(app)
resp = app.get('/manage/csvdatasource/add')
form = resp.form
form.set('title', 'Title')
form.set('description', 'Description')
form.set('csv_file', webtest.Upload('test.csv', force_bytes('\n'), 'application/octet-stream'))
resp = form.submit()
assert 'Could not detect CSV dialect' in resp
def test_change_csv_command(setup):
csv, url = setup(data=StringIO(data))
call_command('change-csv', 'test', os.path.join(TEST_BASE_DIR, 'data-empty.ods'))
csv.refresh_from_db()
assert list(csv.get_rows()) == []
call_command('change-csv', 'test', os.path.join(TEST_BASE_DIR, 'data.csv'))
csv.refresh_from_db()
assert list(csv.get_rows()) != []
call_command('change-csv', 'test', os.path.join(TEST_BASE_DIR, 'data-empty.ods'))
csv.refresh_from_db()
assert list(csv.get_rows()) == []
call_command('change-csv', 'test', os.path.join(TEST_BASE_DIR, 'data.ods'), sheet_name='Feuille2')
csv.refresh_from_db()
assert list(csv.get_rows()) != []
@pytest.mark.parametrize('payload,expected', [
({}, 20),
({'limit': 10}, 10),
({'limit': 10, 'offset': 0}, 10),
({'limit': 10, 'offset': 15}, 5),
({'limit': 10, 'offset': 42}, 0),
])
def test_pagination(app, setup, filetype, payload, expected):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype,
data=get_file_content(filetype))
# data endpoint
response = app.get(url + '?' + urlencode(payload))
assert len(response.json['data']) == expected
# query endpoint
url = reverse('generic-endpoint', kwargs={
'connector': 'csvdatasource',
'slug': csvdata.slug,
'endpoint': 'query/query-1_/',
})
query = Query(slug='query-1_', resource=csvdata, structure='array')
query.projections = '\n'.join(['id:int(id)', 'text:prenom'])
query.save()
response = app.get(url + '?' + urlencode(payload))
assert len(response.json['data']) == expected
@pytest.mark.parametrize('payload,expected_error', [
({'limit': 'bla'}, 'invalid limit parameter'),
({'limit': 0}, 'invalid limit parameter'),
({'limit': -1}, 'invalid limit parameter'),
({'limit': 10, 'offset': 'bla'}, 'invalid offset parameter'),
({'limit': 10, 'offset': -1}, 'invalid offset parameter'),
])
def test_pagination_error(app, setup, filetype, payload, expected_error):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype,
data=get_file_content(filetype))
# data endpoint
response = app.get(url + '?' + urlencode(payload))
assert response.json['err'] == 1
assert response.json['err_desc'] == expected_error
# query endpoint
url = reverse('generic-endpoint', kwargs={
'connector': 'csvdatasource',
'slug': csvdata.slug,
'endpoint': 'query/query-1_/',
})
query = Query(slug='query-1_', resource=csvdata, structure='array')
query.projections = '\n'.join(['id:int(id)', 'text:prenom'])
query.save()
response = app.get(url + '?' + urlencode(payload))
assert response.json['err'] == 1
assert response.json['err_desc'] == expected_error
def test_csv_dst(app, setup, admin_user):
csvdata, url = setup(
'field,,another_field,',
filename='data-empty.ods',
data=get_file_content('data-empty.ods'))
with mock.patch('os.fstat') as mocked_fstat, override_settings(TIME_ZONE='Europe/Paris'):
class MockStatResult:
st_ctime = time.mktime((2019, 10, 27, 2, 20, 50, 0, 0, 0))
mocked_fstat.return_value = MockStatResult()
# visit endpoint
app.get(url)
# visit manager page
app = login(app)
response = app.get(csvdata.get_absolute_url())
assert 'Oct. 27, 2019, 2:20 a.m.' in response