# -*- coding: utf-8 -*-
|
|
#
|
|
# passerelle - uniform access to multiple data sources and services
|
|
# Copyright (C) 2016 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import os
|
|
import time
|
|
import pytest
|
|
import mock
|
|
|
|
import six
|
|
|
|
from django.core.files import File
|
|
from django.core.urlresolvers import reverse
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.test import Client, override_settings
|
|
from django.core.management import call_command
|
|
from django.utils.encoding import force_bytes, force_str, force_text
|
|
from django.utils.six import StringIO
|
|
from django.utils.six.moves.urllib.parse import urlencode
|
|
|
|
from passerelle.base.models import ApiUser, AccessRight
|
|
from passerelle.compat import json_loads
|
|
from passerelle.apps.csvdatasource.models import CsvDataSource, Query, TableRow
|
|
|
|
from test_manager import login, admin_user
|
|
|
|
import webtest
|
|
|
|
# Sample dataset used throughout these tests: semicolon-separated rows of
# (fam, id, lname, fname, sex). It mirrors the spreadsheet fixtures under
# data/csvdatasource/ (data.csv/ods/xls/xlsx).
data = """121;69981;DELANOUE;Eliot;H
525;6;DANIEL WILLIAMS;Shanone;F
253;67742;MARTIN;Sandra;F
511;38142;MARCHETTI;Lucie;F
235;22;MARTIN;Sandra;F
620;52156;ARNAUD;Mathis;H
902;36;BRIGAND;Coline;F
2179;48548;THEBAULT;Salima;F
3420;46;WILSON-LUZAYADIO;Anaëlle;F
1421;55486;WONE;Fadouma;F
2841;51;FIDJI;Zakia;F
2431;59;BELICARD;Sacha;H
4273;60;GOUBERT;Adrien;H
4049;64;MOVSESSIAN;Dimitri;H
4605;67;ABDOU BACAR;Kyle;H
4135;22231;SAVERIAS;Marius;H
4809;75;COROLLER;Maelys;F
5427;117;KANTE;Aliou;H
116642;118;ZAHMOUM;Yaniss;H
216352;38;Dupont;Benoît;H
"""

# Same content re-encoded with a leading UTF-8 BOM, to exercise BOM handling.
data_bom = force_str(force_text(data, 'utf-8').encode('utf-8-sig'))

# Every test here creates CsvDataSource/TableRow rows, so enable DB access.
pytestmark = pytest.mark.django_db

# Directory holding the binary spreadsheet fixtures used by get_file_content().
TEST_BASE_DIR = os.path.join(os.path.dirname(__file__), 'data', 'csvdatasource')
|
|
|
|
|
|
def get_file_content(filename):
    """Return a binary file object for one of the fixtures in TEST_BASE_DIR.

    The handle is passed straight to django File() wrappers, which take
    ownership of it for the duration of the test.
    """
    path = os.path.join(TEST_BASE_DIR, filename)
    return open(path, 'rb')
|
|
|
|
|
|
@pytest.fixture
def setup():
    """Factory fixture: build a CsvDataSource plus an ApiUser allowed to
    access it, and return the pair (resource, data-endpoint URL)."""
    def maker(columns_keynames='fam,id,lname,fname,sex', filename='data.csv',
              sheet_name='Feuille2', data='', skip_header=False):
        # open API user granted access to the resource below
        api = ApiUser.objects.create(
            username='all',
            keytype='',
            key='')
        csv = CsvDataSource.objects.create(
            csv_file=File(data, filename),
            sheet_name=sheet_name,
            columns_keynames=columns_keynames,
            slug='test',
            title='a title',
            description='a description',
            skip_header=skip_header)
        # saving the resource must have cached every row in TableRow
        assert TableRow.objects.filter(resource=csv).count() == len(csv.get_rows())
        obj_type = ContentType.objects.get_for_model(csv)
        AccessRight.objects.create(
            codename='can_access',
            apiuser=api,
            resource_type=obj_type,
            resource_pk=csv.pk)
        # URL of the generic /data endpoint for this connector instance
        url = reverse('generic-endpoint', kwargs={
            'connector': 'csvdatasource',
            'slug': csv.slug,
            'endpoint': 'data',
        })
        return csv, url

    return maker
|
|
|
|
|
|
def parse_response(response):
    """Decode a JSON endpoint response and return its 'data' payload."""
    payload = json_loads(response.content)
    return payload['data']
|
|
|
|
|
|
@pytest.fixture
def client():
    # Plain Django test client used for anonymous API calls.
    return Client()
|
|
|
|
|
|
@pytest.fixture(params=['data.csv', 'data.ods', 'data.xls', 'data.xlsx'])
def filetype(request):
    # Parametrized fixture: run the test once per supported file format.
    return request.param
|
|
|
|
def test_default_column_keynames(setup, filetype):
    """Without explicit keynames the connector falls back to 'id' + 'text'."""
    csvdata = CsvDataSource.objects.create(
        csv_file=File(get_file_content(filetype), filetype),
        sheet_name='Feuille2',
        slug='test2',
        title='a title',
        description='a description')
    keynames = csvdata.columns_keynames
    assert len(keynames.split(',')) == 2
    assert 'id' in keynames
    assert 'text' in keynames
|
|
|
|
|
|
def test_sheet_name_error(setup, app, filetype, admin_user):
    """Spreadsheet formats require a sheet name; plain CSV does not."""
    csvdata, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
    app = login(app)
    resp = app.get('/manage/csvdatasource/test/edit')
    edit_form = resp.forms[0]
    edit_form['sheet_name'] = ''
    resp = edit_form.submit()

    if filetype == 'data.csv':
        # CSV needs no sheet name: the edit is accepted and redirects
        assert resp.status_code == 302
    else:
        # ods/xls/xlsx: the form is re-rendered with a validation error
        assert resp.status_code == 200
        assert 'You must specify a sheet name' in resp.text
|
|
|
|
|
|
def test_unfiltered_data(client, setup, filetype):
    """Every row served by the bare data endpoint carries both keynames."""
    _, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url))
    for row in rows:
        assert 'field' in row
        assert 'another_field' in row
|
|
|
|
|
|
def test_empty_file(client, setup):
    """An empty spreadsheet yields an empty data set, not an error."""
    _, url = setup(
        'field,,another_field,',
        filename='data-empty.ods',
        data=get_file_content('data-empty.ods'))
    rows = parse_response(client.get(url))
    assert rows == []
|
|
|
|
|
|
def test_view_manage_page(setup, app, filetype, admin_user):
    """The connector's manager page renders without raising."""
    csvdata, _ = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    login(app).get(csvdata.get_absolute_url())
|
|
|
|
|
|
def test_good_filter_data(client, setup, filetype):
    """Filtering on a mapped column returns only the matching rows."""
    needle = 'Zakia'
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'text': needle}))
    assert rows
    for row in rows:
        assert 'id' in row
        assert 'text' in row
        assert needle in row['text']
|
|
|
|
|
|
def test_bad_filter_data(client, setup, filetype):
    """A filter value present in no row yields an empty result."""
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'text': 'bad'}))
    assert rows == []
|
|
|
|
|
|
def test_useless_filter_data(client, setup, filetype):
    """Filtering on a keyname that is not mapped is ignored: all rows return."""
    _, url = setup('id,,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'text': 'Ali'}))
    assert len(rows) == 20
|
|
|
|
|
|
def test_columns_keynames_with_spaces(client, setup, filetype):
    """Whitespace around column keynames is stripped when mapping columns."""
    _, url = setup('id , , nom,text , ', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'text': 'Yaniss'}))
    assert len(rows) == 1
|
|
|
|
|
|
def test_skipped_header_data(client, setup, filetype):
    """With skip_header the first row ('Eliot') is treated as a header."""
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype), skip_header=True)
    rows = parse_response(client.get(url, {'q': 'Eliot'}))
    assert rows == []
|
|
|
|
|
|
def test_data(client, setup, filetype):
    """A filtered row exposes exactly the mapped keynames and values."""
    _, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'text': 'Sacha'}))
    expected = {'id': '59', 'text': 'Sacha', 'fam': '2431', 'sexe': 'H'}
    assert rows[0] == expected
|
|
|
|
|
|
def test_unicode_filter_data(client, setup, filetype):
    """Non-ASCII filter values match rows correctly."""
    needle = u'Benoît'
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'text': needle}))
    assert rows
    for row in rows:
        assert 'id' in row
        assert 'text' in row
        assert needle in row['text']
|
|
|
|
|
|
def test_unicode_case_insensitive_filter_data(client, setup, filetype):
    """The case-insensitive flag makes non-ASCII filters match any casing."""
    _, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
    needle = u'anaëlle'
    rows = parse_response(client.get(url, {'text': needle, 'case-insensitive': ''}))
    assert rows
    for row in rows:
        assert 'id' in row
        assert 'text' in row
        assert needle.lower() in row['text'].lower()
|
|
|
|
|
|
def test_data_bom(client, setup):
    """A CSV starting with a UTF-8 BOM is parsed transparently."""
    _, url = setup('fam,id,, text,sexe ', data=StringIO(data_bom))
    rows = parse_response(client.get(url, {'text': 'Eliot'}))
    assert rows[0] == {'id': '69981', 'text': 'Eliot', 'fam': '121', 'sexe': 'H'}
|
|
|
|
|
|
def test_multi_filter(client, setup, filetype):
    """Filtering on sexe=F returns the ten matching rows, in file order."""
    _, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'sexe': 'F'}))
    assert len(rows) == 10
    assert rows[0] == {'id': '6', 'text': 'Shanone', 'fam': '525', 'sexe': 'F'}
|
|
|
|
|
|
def test_query(client, setup, filetype):
    """The free-text 'q' parameter substring-matches the rows."""
    _, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'q': 'liot'}))
    assert len(rows) == 1
    assert rows[0]['text'] == 'Eliot'
|
|
|
|
|
|
def test_query_insensitive_and_unicode(client, setup, filetype):
    """'q' with case-insensitive matches regardless of case and accents."""
    _, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
    for needle in ('elIo', 'élIo'):
        rows = parse_response(client.get(url, {'q': needle, 'case-insensitive': ''}))
        assert len(rows) == 1
        assert rows[0]['text'] == 'Eliot'
|
|
|
|
|
|
def test_query_insensitive_and_filter(client, setup, filetype):
    """The 'q' parameter composes with an ordinary column filter."""
    _, url = setup('fam,id,,text,sexe', filename=filetype, data=get_file_content(filetype))
    rows = parse_response(client.get(url, {'q': 'elIo', 'sexe': 'H', 'case-insensitive': ''}))
    assert len(rows) == 1
    assert rows[0]['text'] == 'Eliot'
|
|
|
|
|
|
def test_dialect(client, setup):
    # dialect sniffing on the sample data must detect the ';' delimiter
    csvdata, url = setup(data=StringIO(data))
    expected = {
        'lineterminator': '\r\n',
        'skipinitialspace': False,
        'quoting': 0,
        'delimiter': ';',
        'quotechar': '"',
        'doublequote': False
    }

    assert expected == csvdata.dialect_options
    # two MARTIN;Sandra rows exist; the id filter narrows it to one
    filters = {'id': '22', 'fname': 'Sandra'}
    resp = client.get(url, filters)
    result = parse_response(resp)
    assert len(result) == 1
    assert result[0]['id'] == '22'
    assert result[0]['lname'] == 'MARTIN'
|
|
|
|
|
|
def test_on_the_fly_dialect_detection(client, setup):
    """A resource without cached dialect options still serves its data."""
    # fake a connector that was not initialized during .save(), because it's
    # been migrated and we didn't do dialect detection at save() time.
    _, url = setup(data=StringIO(data))
    CsvDataSource.objects.all().update(_dialect_options=None)
    payload = json_loads(client.get(url).content)
    assert payload['err'] == 0
    assert len(payload['data']) == 20
|
|
|
|
|
|
def test_missing_columns(client, setup):
    """A row shorter than the keyname list is padded with None values."""
    _, url = setup(data=StringIO(data + 'A;B;C\n'))
    payload = json_loads(client.get(url).content)
    assert payload['err'] == 0
    assert len(payload['data']) == 21
    last_row = payload['data'][-1]
    assert last_row == {'lname': 'C', 'sex': None, 'id': 'B', 'fname': None, 'fam': 'A'}
|
|
|
|
|
|
def test_unknown_sheet_name(client, setup):
    """An unknown sheet name does not break the endpoint: rows are served."""
    csvdata, url = setup('field,,another_field,', filename='data2.ods', data=get_file_content('data2.ods'))
    csvdata.sheet_name = 'unknown'
    csvdata.save()
    payload = json_loads(client.get(url).content)
    assert len(payload['data']) == 20
|
|
|
|
|
|
def test_cache_new_shorter_file(client, setup):
    """Uploading a shorter file replaces the cached rows, never appends."""
    csvdata, url = setup(data=StringIO(data + 'A;B;C\n'))
    payload = json_loads(client.get(url).content)
    assert payload['err'] == 0
    assert len(payload['data']) == 21
    # replace the file with one that has one row less
    csvdata.csv_file = File(StringIO(data), 'data.csv')
    csvdata.save()
    payload = json_loads(client.get(url).content)
    assert len(payload['data']) == 20
|
|
|
|
|
|
def test_query_array(app, setup, filetype):
    """structure='array': each result row is an [int, text] pair."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse('generic-endpoint', kwargs={
        'connector': 'csvdatasource',
        'slug': csvdata.slug,
        'endpoint': 'query/query-1_/',
    })
    Query.objects.create(
        slug='query-1_', resource=csvdata, structure='array',
        projections='id:int(id)\nprenom:prenom')
    payload = app.get(url).json
    assert payload['err'] == 0
    assert payload['data']
    for row in payload['data']:
        assert len(row) == 2
        assert isinstance(row[0], int)
        assert isinstance(row[1], six.text_type)
|
|
|
|
|
|
def test_query_q_filter(app, setup, filetype):
    """The 'q' parameter on a query endpoint matches case-insensitively."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                       data=get_file_content(filetype))
    url = reverse('generic-endpoint', kwargs={
        'connector': 'csvdatasource',
        'slug': csvdata.slug,
        'endpoint': 'query/query-1_/',
    })
    Query.objects.create(
        slug='query-1_', resource=csvdata, structure='array',
        projections='id:int(id)\ntext:prenom')
    # both casings must find the two 'Sandra' rows
    for needle in ('Sandra', 'sandra'):
        payload = app.get(url + '?q=' + needle).json
        assert payload['err'] == 0
        assert len(payload['data']) == 2
|
|
|
|
|
|
def test_query_dict(app, setup, filetype):
    """structure='dict': each row maps projection names to values."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse('generic-endpoint', kwargs={
        'connector': 'csvdatasource',
        'slug': csvdata.slug,
        'endpoint': 'query/query-1_/',
    })
    Query.objects.create(
        slug='query-1_', resource=csvdata, structure='dict',
        projections='id:int(id)\nprenom:prenom')
    payload = app.get(url).json
    assert payload['err'] == 0
    assert payload['data']
    for row in payload['data']:
        assert len(row) == 2
        assert isinstance(row['id'], int)
        assert isinstance(row['prenom'], six.text_type)
|
|
|
|
|
|
def test_query_tuples(app, setup, filetype):
    """structure='tuples': rows are [name, value] pairs in projection order."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse('generic-endpoint', kwargs={
        'connector': 'csvdatasource',
        'slug': csvdata.slug,
        'endpoint': 'query/query-1_/',
    })
    Query.objects.create(
        slug='query-1_', resource=csvdata, structure='tuples',
        projections='id:int(id)\nprenom:prenom')
    payload = app.get(url).json
    assert payload['err'] == 0
    assert payload['data']
    for row in payload['data']:
        assert len(row) == 2
        first, second = row
        assert first[0] == 'id'
        assert isinstance(first[1], int)
        assert second[0] == 'prenom'
        assert isinstance(second[1], six.text_type)
|
|
|
|
|
|
def test_query_onerow(app, setup, filetype):
    """structure='onerow': the single matching row comes back as a dict."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse('generic-endpoint', kwargs={
        'connector': 'csvdatasource',
        'slug': csvdata.slug,
        'endpoint': 'query/query-1_/',
    })
    Query.objects.create(
        slug='query-1_', resource=csvdata, structure='onerow',
        projections='id:int(id)\nprenom:prenom',
        filters='int(id) == 525')
    payload = app.get(url).json
    assert payload['err'] == 0
    assert isinstance(payload['data'], dict)
    assert payload['data']['prenom'] == 'Shanone'
|
|
|
|
|
|
def test_query_one(app, setup, filetype):
    """structure='one': the single projected value is returned bare."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse('generic-endpoint', kwargs={
        'connector': 'csvdatasource',
        'slug': csvdata.slug,
        'endpoint': 'query/query-1_/',
    })
    Query.objects.create(
        slug='query-1_', resource=csvdata, structure='one',
        projections='wtf:prenom',
        filters='int(id) == 525')
    payload = app.get(url).json
    assert payload['err'] == 0
    assert payload['data'] == 'Shanone'
|
|
|
|
|
|
def test_query_filter_param(app, setup, filetype):
    """Filter expressions may read request parameters via query.get()."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse('generic-endpoint', kwargs={
        'connector': 'csvdatasource',
        'slug': csvdata.slug,
        'endpoint': 'query/query-1_/',
    })
    Query.objects.create(
        slug='query-1_', resource=csvdata, structure='one',
        projections='wtf:prenom',
        filters='int(id) == int(query.get("foobar"))')
    payload = app.get(url + '?foobar=525').json
    assert payload['err'] == 0
    assert payload['data'] == 'Shanone'
|
|
|
|
|
|
def test_query_distinct(app, setup, filetype):
    """distinct='sexe' collapses the 20 rows to the two distinct values."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse('generic-endpoint', kwargs={
        'connector': 'csvdatasource',
        'slug': csvdata.slug,
        'endpoint': 'query/query-1_/',
    })
    Query.objects.create(slug='query-1_', resource=csvdata, distinct='sexe')
    payload = app.get(url).json
    assert payload['err'] == 0
    assert isinstance(payload['data'], list)
    assert len(payload['data']) == 2
|
|
|
|
|
|
def test_query_keyed_distinct(app, setup, filetype):
    """structure='keyed-distinct' maps each distinct key to a full row."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse('generic-endpoint', kwargs={
        'connector': 'csvdatasource',
        'slug': csvdata.slug,
        'endpoint': 'query/query-1_/',
    })
    Query.objects.create(slug='query-1_', resource=csvdata, distinct='nom',
                         structure='keyed-distinct')
    payload = app.get(url).json
    assert payload['err'] == 0
    assert isinstance(payload['data'], dict)
    assert payload['data']['MARTIN']['prenom'] == 'Sandra'
|
|
|
|
|
|
def test_query_order(app, setup, filetype):
    """The order expression sorts the result rows."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse('generic-endpoint', kwargs={
        'connector': 'csvdatasource',
        'slug': csvdata.slug,
        'endpoint': 'query/query-1_/',
    })
    Query.objects.create(slug='query-1_', resource=csvdata, order='prenom.lower()')
    payload = app.get(url).json
    assert payload['err'] == 0
    rows = payload['data']
    assert isinstance(rows, list)
    assert rows == sorted(rows, key=lambda row: row['prenom'].lower())
|
|
|
|
|
|
def test_query_error(app, setup, filetype):
    """Exercise every error path of the query endpoint: missing query, then
    invalid projections, filters, order and distinct expressions."""
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse('generic-endpoint', kwargs={
        'connector': 'csvdatasource',
        'slug': csvdata.slug,
        'endpoint': 'query/query-1_/',
    })

    # no Query object exists yet for this slug
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'no such query'

    query = Query(slug='query-1_', resource=csvdata)
    query.save()

    # projection name must be a valid identifier ('1a' is not)
    query.projections = '1a:prenom'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid projection name'
    assert response.json['data'] == '1a'

    # syntactically broken projection expression
    query.projections = 'id:prenom.'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid projection expression'
    assert response.json['data']['name'] == 'id'
    assert response.json['data']['expr'] == 'prenom.'

    query.projections = 'id:prenom not'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid projection expression'
    assert response.json['data']['name'] == 'id'
    assert response.json['data']['expr'] == 'prenom not'

    # expression referencing an unknown name: error reported with the row
    query.projections = 'id:zob'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid projection expression'
    assert response.json['data']['name'] == 'id'
    assert response.json['data']['expr'] == 'zob'
    assert 'row' in response.json['data']

    # same two failure modes for the filters expression
    query.projections = ''
    query.filters = 'prenom.'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid filters expression'
    assert response.json['data']['expr'] == 'prenom.'

    query.filters = 'zob'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid filters expression'
    assert response.json['data']['error'] == "name 'zob' is not defined"
    assert response.json['data']['expr'] == 'zob'
    assert 'row' in response.json['data']

    # same for the order expression
    query.filters = ''
    query.order = 'prenom.'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid order expression'
    assert response.json['data']['expr'] == 'prenom.'

    query.order = 'zob'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid order expression'
    assert response.json['data']['expr'] == 'zob'
    assert 'row' in response.json['data']

    # and for the distinct expression
    query.order = ''
    query.distinct = 'prenom.'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid distinct expression'
    assert response.json['data']['expr'] == 'prenom.'

    query.distinct = 'zob'
    query.save()
    response = app.get(url)
    assert response.json['err'] == 1
    assert response.json['err_desc'] == 'invalid distinct expression'
    assert response.json['data']['expr'] == 'zob'
    assert 'row' in response.json['data']
|
|
|
|
|
|
def test_edit_connector_queries(admin_user, app, setup, filetype):
    """Create and edit a query through the manager UI, then run it."""
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    url = reverse('view-connector', kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug})
    resp = app.get(url)
    # anonymous visitors get no 'New Query' link and are redirected to login
    assert 'New Query' not in resp.text
    new_query_url = reverse('csv-new-query', kwargs={'connector_slug': csvdata.slug})
    resp = app.get(new_query_url, status=302)
    assert resp.location.endswith('/login/?next=%s' % new_query_url)

    app = login(app)
    resp = app.get(url)
    resp = resp.click('New Query')
    resp.form['slug'] = 'foobar'
    resp.form['label'] = 'Lucy alone'
    # '=<' is not a valid operator: the form must reject it
    resp.form['filters'] = 'int(id) =< 525'
    resp = resp.form.submit()
    assert 'Syntax error' in resp.text
    resp.form['filters'] = 'int(id) == 525'
    # a projection line without a 'name:' prefix is rejected too
    resp.form['projections'] = 'id'
    resp = resp.form.submit()
    assert 'Syntax error' in resp.text
    resp.form['projections'] = 'id:id\nprenom:prenom'
    resp = resp.form.submit().follow()
    resp = resp.click('foobar', index=1)  # 0th is the newly created endpoint
    resp.form['filters'] = 'int(id) == 511'
    resp.form['description'] = 'in the sky without diamonds'
    resp = resp.form.submit().follow()
    assert 'Lucy alone' in resp.text
    assert 'in the sky without diamonds' in resp.text
    # run the edited query: id 511 is Lucie MARCHETTI
    resp = resp.click('foobar', index=0)  # 0th is the newly created endpoint
    assert len(resp.json['data']) == 1
    assert resp.json['data'][0]['prenom'] == 'Lucie'
|
|
|
|
|
|
def test_download_file(app, setup, filetype, admin_user):
    """Download requires login and serves the correct content type per format."""
    setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
    download_url = '/manage/csvdatasource/test/download/'
    # anonymous users are redirected to the login page
    assert '/login' in app.get(download_url).location
    app = login(app)
    resp = app.get(download_url, status=200)
    content_types = {
        'data.csv': 'text/csv; charset=utf-8',
        'data.ods': 'application/vnd.oasis.opendocument.spreadsheet',
        'data.xls': 'application/vnd.ms-excel',
        'data.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    }
    assert resp.headers['Content-Type'] == content_types[filetype]
    if filetype == 'data.csv':
        # the CSV is served verbatim, so its length matches the fixture data
        assert resp.headers['Content-Length'] == str(len(force_bytes(data)))
|
|
|
|
|
|
def test_query_filter_multiline(app, setup, filetype):
    """Multiple filter lines are combined (both must hold for a row)."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                       data=get_file_content(filetype))
    url = reverse('generic-endpoint', kwargs={
        'connector': 'csvdatasource',
        'slug': csvdata.slug,
        'endpoint': 'query/query-1_/',
    })
    Query.objects.create(slug='query-1_', resource=csvdata,
                         filters='int(id) <= 525\nint(id) >= 511')
    payload = app.get(url).json
    assert payload['err'] == 0
    # only ids 511 and 525 fall in the range
    assert len(payload['data']) == 2
|
|
|
|
def test_query_builtin_id_filter(app, setup, filetype):
    """'id' is a built-in request filter available on every query endpoint."""
    csvdata, _url = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                          data=get_file_content(filetype))
    url = reverse('generic-endpoint', kwargs={
        'connector': 'csvdatasource',
        'slug': csvdata.slug,
        'endpoint': 'query/query-1_/',
    })

    query = Query(slug='query-1_', resource=csvdata)
    query.save()

    assert len(app.get(url).json['data']) == 20
    assert len(app.get(url + '?id=121').json['data']) == 1
    assert app.get(url + '?id=121').json['data'][0]['prenom'] == 'Eliot'
    assert app.get(url + '?id=525').json['data'][0]['prenom'] == 'Shanone'

    # check when combined with another filter
    query.filters = 'sexe == "H"'
    query.save()
    assert len(app.get(url).json['data']) == 10
    assert app.get(url + '?id=121').json['data'][0]['prenom'] == 'Eliot'
    # id 525 has sexe == 'F', so the combined filters match nothing
    assert len(app.get(url + '?id=525').json['data']) == 0
|
|
|
|
def test_delete_connector_query(admin_user, app, setup, filetype):
    """Deleting a query via the manager removes it from the connector page."""
    csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
    view_url = reverse('view-connector', kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug})
    app.get(view_url)

    Query.objects.create(slug='query-1_', resource=csvdata, structure='array',
                         projections='id:int(id)\nprenom:prenom')

    app = login(app)
    resp = app.get(view_url)
    resp = resp.click('delete query')
    # a confirmation page is shown before the actual deletion
    assert 'Are you sure' in resp.text
    resp = resp.form.submit('delete').follow()
    assert 'No query are defined.' in resp.text
|
|
|
|
|
|
def test_csv_sniffer(admin_user, app):
    """Uploading a file with no sniffable CSV dialect is rejected."""
    app = login(app)
    form = app.get('/manage/csvdatasource/add').form
    form.set('title', 'Title')
    form.set('description', 'Description')
    # a lone newline gives the sniffer nothing to work with
    form.set('csv_file', webtest.Upload('test.csv', force_bytes('\n'), 'application/octet-stream'))
    resp = form.submit()
    assert 'Could not detect CSV dialect' in resp
|
|
|
|
|
|
def test_csv_validation(admin_user, app):
    """A CSV containing a NUL byte is rejected at upload time."""
    app = login(app)
    form = app.get('/manage/csvdatasource/add').form
    form.set('title', 'Title')
    form.set('description', 'Description')
    form.set('csv_file', webtest.Upload('test.csv', b'a,b,c\n1,2\0,3\n4,5,6', 'application/octet-stream'))
    resp = form.submit()
    assert 'Invalid CSV file: line contains NUL' in resp
|
|
|
|
|
|
def test_change_csv_command(setup):
    """The change-csv management command swaps the resource file in place."""
    csv, url = setup(data=StringIO(data))
    # switch to an empty spreadsheet: no rows left
    call_command('change-csv', 'test', os.path.join(TEST_BASE_DIR, 'data-empty.ods'))
    csv.refresh_from_db()
    assert list(csv.get_rows()) == []
    # switch back to a populated CSV: rows come back
    call_command('change-csv', 'test', os.path.join(TEST_BASE_DIR, 'data.csv'))
    csv.refresh_from_db()
    assert list(csv.get_rows()) != []

    call_command('change-csv', 'test', os.path.join(TEST_BASE_DIR, 'data-empty.ods'))
    csv.refresh_from_db()
    assert list(csv.get_rows()) == []

    # the sheet_name option is honoured for spreadsheet files
    call_command('change-csv', 'test', os.path.join(TEST_BASE_DIR, 'data.ods'), sheet_name='Feuille2')
    csv.refresh_from_db()
    assert list(csv.get_rows()) != []
|
|
|
|
|
|
@pytest.mark.parametrize('payload,expected', [
    ({}, 20),
    ({'limit': 10}, 10),
    ({'limit': 10, 'offset': 0}, 10),
    ({'limit': 10, 'offset': 15}, 5),
    ({'limit': 10, 'offset': 42}, 0),
])
def test_pagination(app, setup, filetype, payload, expected):
    """limit/offset must behave the same on the data and query endpoints."""
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                         data=get_file_content(filetype))

    # data endpoint
    response = app.get(url + '?' + urlencode(payload))
    assert len(response.json['data']) == expected

    # query endpoint
    url = reverse('generic-endpoint', kwargs={
        'connector': 'csvdatasource',
        'slug': csvdata.slug,
        'endpoint': 'query/query-1_/',
    })
    query = Query(slug='query-1_', resource=csvdata, structure='array')
    query.projections = '\n'.join(['id:int(id)', 'text:prenom'])
    query.save()
    response = app.get(url + '?' + urlencode(payload))
    assert len(response.json['data']) == expected
|
|
|
|
|
|
@pytest.mark.parametrize('payload,expected_error', [
    ({'limit': 'bla'}, 'invalid limit parameter'),
    ({'limit': 0}, 'invalid limit parameter'),
    ({'limit': -1}, 'invalid limit parameter'),
    ({'limit': 10, 'offset': 'bla'}, 'invalid offset parameter'),
    ({'limit': 10, 'offset': -1}, 'invalid offset parameter'),
])
def test_pagination_error(app, setup, filetype, payload, expected_error):
    """Bad limit/offset values are rejected identically on both endpoints."""
    csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype,
                         data=get_file_content(filetype))

    # data endpoint
    response = app.get(url + '?' + urlencode(payload))
    assert response.json['err'] == 1
    assert response.json['err_desc'] == expected_error

    # query endpoint
    url = reverse('generic-endpoint', kwargs={
        'connector': 'csvdatasource',
        'slug': csvdata.slug,
        'endpoint': 'query/query-1_/',
    })
    query = Query(slug='query-1_', resource=csvdata, structure='array')
    query.projections = '\n'.join(['id:int(id)', 'text:prenom'])
    query.save()
    response = app.get(url + '?' + urlencode(payload))
    assert response.json['err'] == 1
    assert response.json['err_desc'] == expected_error
|
|
|
|
def test_csv_dst(app, setup, admin_user):
    """Rendering a file timestamp that falls in the DST switch must not crash."""
    csvdata, url = setup(
        'field,,another_field,',
        filename='data-empty.ods',
        data=get_file_content('data-empty.ods'))

    with mock.patch('os.fstat') as mocked_fstat, override_settings(TIME_ZONE='Europe/Paris'):
        class MockStatResult:
            # 2019-10-27 02:20:50 is inside the ambiguous (repeated) hour of
            # the Europe/Paris autumn DST change
            st_ctime = time.mktime((2019, 10, 27, 2, 20, 50, 0, 0, 0))
        mocked_fstat.return_value = MockStatResult()

        # visit endpoint
        app.get(url)

        # visit manager page
        app = login(app)
        response = app.get(csvdata.get_absolute_url())
        assert 'Oct. 27, 2019, 2:20 a.m.' in response
|