csv_datasource: PEP8ness, code style (#30458)

Benjamin Dauvergne 2019-02-08 08:14:29 +01:00
parent 204c214026
commit 5e23162573
2 changed files with 129 additions and 53 deletions
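
The bulk of the diff is mechanical: long field and call argument lists are rewrapped one argument per line, dict literals gain PEP8 spacing after colons, and top-level definitions get the blank lines PEP8 asks for. The substantive changes are Python 3 compatibility idioms. A minimal sketch of the patterns the diff converges on (the helper below is illustrative, not taken from the codebase, and assumes six is installed):

    import re
    import six

    # raw string: \w is an invalid escape sequence in a plain
    # str literal (a DeprecationWarning since Python 3.6)
    PATTERN = re.compile(r'^(?P<query_name>[\w-]+)/$')

    def to_ascii_bytes(value):
        # Python 2's `unicode` type is gone in Python 3;
        # six.text_type names the text type on both
        if isinstance(value, six.text_type):
            return value.encode('ascii')
        return value

    pairs = [((2, 'b'), 'row2'), ((1, 'a'), 'row1')]
    # PEP 3113 removed tuple parameter unpacking, so
    # `lambda (key, row): key` becomes an explicit subscript
    pairs.sort(key=lambda x: x[0])

The tests additionally swap the removed `file()` builtin for `open()` and rewrite `not 'x' in y` as the idiomatic `'x' not in y`.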


@@ -17,9 +17,11 @@
import os
import re
import csv
import itertools
import unicodedata
from collections import OrderedDict
import six
from pyexcel_ods import get_data as get_data_ods
from pyexcel_xls import get_data as get_data_xls
@@ -29,7 +31,6 @@ from django.utils.encoding import smart_text
from django.utils.timezone import datetime, make_aware
from django.db import models, transaction
from django.core.exceptions import ValidationError
from django.shortcuts import get_object_or_404
from django.utils.translation import ugettext_lazy as _
from passerelle.base.models import BaseResource
@@ -52,6 +53,7 @@ def get_code(expr):
code_cache[expr] = compile(expr, '<inline>', 'eval')
return code_cache[expr]
def normalize(value):
return unicodedata.normalize('NFKD', value).encode('ascii', 'ignore')
@@ -61,25 +63,34 @@ class Query(models.Model):
slug = models.SlugField(_('Name (slug)'))
label = models.CharField(_('Label'), max_length=100)
description = models.TextField(_('Description'), blank=True)
filters = models.TextField(_('Filters'), blank=True,
help_text=_('List of filter clauses (Python expression)'))
order = models.TextField(_('Order'), blank=True,
help_text=_('Ordering columns'))
distinct = models.TextField(_('Distinct'), blank=True,
help_text=_('Distinct columns'))
projections = models.TextField(_('Projections'), blank=True,
help_text=_('List of projections (name:expression)'))
structure = models.CharField(_('Structure'),
max_length=20,
choices=[
('array', _('Array')),
('dict', _('Dictionary')),
('keyed-distinct', _('Keyed Dictionary')),
('tuples', _('Tuples')),
('onerow', _('Single Row')),
('one', _('Single Value'))],
default='dict',
help_text=_('Data structure used for the response'))
filters = models.TextField(
_('Filters'),
blank=True,
help_text=_('List of filter clauses (Python expression)'))
order = models.TextField(
_('Order'),
blank=True,
help_text=_('Ordering columns'))
distinct = models.TextField(
_('Distinct'),
blank=True,
help_text=_('Distinct columns'))
projections = models.TextField(
_('Projections'),
blank=True,
help_text=_('List of projections (name:expression)'))
structure = models.CharField(
_('Structure'),
max_length=20,
choices=[
('array', _('Array')),
('dict', _('Dictionary')),
('keyed-distinct', _('Keyed Dictionary')),
('tuples', _('Tuples')),
('onerow', _('Single Row')),
('one', _('Single Value'))],
default='dict',
help_text=_('Data structure used for the response'))
class Meta:
ordering = ['slug']
@@ -107,7 +118,9 @@ class Query(models.Model):
class CsvDataSource(BaseResource):
csv_file = models.FileField(_('Spreadsheet file'), upload_to='csv',
csv_file = models.FileField(
_('Spreadsheet file'),
upload_to='csv',
help_text=_('Supported file formats: csv, ods, xls, xlsx'))
columns_keynames = models.CharField(
max_length=256,
@@ -149,6 +162,7 @@ class CsvDataSource(BaseResource):
return result
def cache_data(self):
# FIXME: why are those dead variables computed?
titles = [t.strip() for t in self.columns_keynames.split(',')]
indexes = [titles.index(t) for t in titles if t]
caption = [titles[i] for i in indexes]
@@ -177,7 +191,7 @@
options = {}
for k, v in self._dialect_options.items():
if isinstance(v, unicode):
if isinstance(v, six.text_type):
v = v.encode('ascii')
options[k.encode('ascii')] = v
@@ -256,7 +270,7 @@
return [smart_text(t.strip()) for t in self.columns_keynames.split(',')]
@endpoint(perm='can_access', methods=['get'],
name='query', pattern='^(?P<query_name>[\w-]+)/$')
name='query', pattern=r'^(?P<query_name>[\w-]+)/$')
def select(self, request, query_name, **kwargs):
try:
query = Query.objects.get(resource=self.id, slug=query_name)
@@ -315,7 +329,7 @@
if order:
generator = stream_expressions(order, data, kind='order')
new_data = [(tuple(new_row), row) for new_row, row in generator]
new_data.sort(key=lambda (k, row): k)
new_data.sort(key=lambda x: x[0])
data = [row for key, row in new_data]
distinct = query.get_list('distinct')


@@ -1,10 +1,28 @@
# -*- coding: utf-8 -*-
#
# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import os
import pytest
from StringIO import StringIO
from django.contrib.auth.models import User
import six
from django.core.files import File
from django.core.urlresolvers import reverse
from django.contrib.contenttypes.models import ContentType
@@ -12,6 +30,8 @@ from django.test import Client
from django.core.management import call_command
from passerelle.base.models import ApiUser, AccessRight
from passerelle.apps.csvdatasource.models import CsvDataSource, Query, TableRow
from test_manager import login, admin_user
import webtest
@@ -40,45 +60,58 @@ data = """121;69981;DELANOUE;Eliot;H
data_bom = data.decode('utf-8').encode('utf-8-sig')
from passerelle.apps.csvdatasource.models import CsvDataSource, Query, TableRow
pytestmark = pytest.mark.django_db
TEST_BASE_DIR = os.path.join(os.path.dirname(__file__), 'data', 'csvdatasource')
def get_file_content(filename):
return file(os.path.join(TEST_BASE_DIR, filename)).read()
return open(os.path.join(TEST_BASE_DIR, filename)).read()
@pytest.fixture
def setup():
def maker(columns_keynames='fam,id,lname,fname,sex', filename='data.csv', sheet_name='Feuille2', data='', skip_header=False):
api = ApiUser.objects.create(username='all',
keytype='', key='')
csv = CsvDataSource.objects.create(csv_file=File(StringIO(data), filename), sheet_name=sheet_name,
columns_keynames=columns_keynames, slug='test', title='a title', description='a description',
skip_header=skip_header)
def maker(columns_keynames='fam,id,lname,fname,sex', filename='data.csv',
sheet_name='Feuille2', data='', skip_header=False):
api = ApiUser.objects.create(
username='all',
keytype='',
key='')
csv = CsvDataSource.objects.create(
csv_file=File(StringIO(data), filename),
sheet_name=sheet_name,
columns_keynames=columns_keynames,
slug='test',
title='a title',
description='a description',
skip_header=skip_header)
assert TableRow.objects.filter(resource=csv).count() == len(csv.get_rows())
obj_type = ContentType.objects.get_for_model(csv)
AccessRight.objects.create(codename='can_access', apiuser=api,
resource_type=obj_type, resource_pk=csv.pk)
AccessRight.objects.create(
codename='can_access',
apiuser=api,
resource_type=obj_type,
resource_pk=csv.pk)
url = reverse('csvdatasource-data', kwargs={'slug': csv.slug})
return csv, url
return maker
def parse_response(response):
return json.loads(response.content)['data']
@pytest.fixture
def client():
return Client()
@pytest.fixture(params=['data.csv', 'data.ods', 'data.xls', 'data.xlsx'])
def filetype(request):
return request.param
def test_default_column_keynames(setup, filetype):
csvdata = CsvDataSource.objects.create(csv_file=File(StringIO(get_file_content(filetype)), filetype),
sheet_name='Feuille2',
@@ -89,6 +122,7 @@ def test_default_column_keynames(setup, filetype):
assert 'id' in csvdata.columns_keynames
assert 'text' in csvdata.columns_keynames
def test_sheet_name_error(setup, app, filetype, admin_user):
csvdata, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
app = login(app)
@@ -103,6 +137,7 @@ def test_sheet_name_error(setup, app, filetype, admin_user):
else:
assert resp.status_code == 302
def test_unfiltered_data(client, setup, filetype):
csvdata, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
resp = client.get(url)
@@ -111,17 +146,22 @@ def test_unfiltered_data(client, setup, filetype):
assert 'field' in item
assert 'another_field' in item
def test_empty_file(client, setup):
csvdata, url = setup('field,,another_field,', filename='data-empty.ods',
data=get_file_content('data-empty.ods'))
csvdata, url = setup(
'field,,another_field,',
filename='data-empty.ods',
data=get_file_content('data-empty.ods'))
resp = client.get(url)
result = parse_response(resp)
assert len(result) == 0
def test_view_manage_page(setup, app, filetype, admin_user):
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
app = login(app)
resp = app.get(csvdata.get_absolute_url())
app.get(csvdata.get_absolute_url())
def test_good_filter_data(client, setup, filetype):
filter_criteria = 'Zakia'
@@ -135,6 +175,7 @@ def test_good_filter_data(client, setup, filetype):
assert 'text' in item
assert filter_criteria in item['text']
def test_bad_filter_data(client, setup, filetype):
filter_criteria = 'bad'
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
@@ -143,6 +184,7 @@ def test_bad_filter_data(client, setup, filetype):
result = parse_response(resp)
assert len(result) == 0
def test_useless_filter_data(client, setup, filetype):
csvdata, url = setup('id,,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
filters = {'text': 'Ali'}
@@ -150,6 +192,7 @@ def test_useless_filter_data(client, setup, filetype):
result = parse_response(resp)
assert len(result) == 20
def test_columns_keynames_with_spaces(client, setup, filetype):
csvdata, url = setup('id , , nom,text , ', filename=filetype, data=get_file_content(filetype))
filters = {'text': 'Yaniss'}
@@ -157,6 +200,7 @@ def test_columns_keynames_with_spaces(client, setup, filetype):
result = parse_response(resp)
assert len(result) == 1
def test_skipped_header_data(client, setup, filetype):
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype), skip_header=True)
filters = {'q': 'Eliot'}
@@ -164,6 +208,7 @@ def test_skipped_header_data(client, setup, filetype):
result = parse_response(resp)
assert len(result) == 0
def test_data(client, setup, filetype):
csvdata, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
filters = {'text': 'Sacha'}
@@ -172,6 +217,7 @@ def test_data(client, setup, filetype):
assert result[0] == {'id': '59', 'text': 'Sacha',
'fam': '2431', 'sexe': 'H'}
def test_unicode_filter_data(client, setup, filetype):
filter_criteria = u'Benoît'
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
@@ -184,10 +230,11 @@ def test_unicode_filter_data(client, setup, filetype):
assert 'text' in item
assert filter_criteria in item['text']
def test_unicode_case_insensitive_filter_data(client, setup, filetype):
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
filter_criteria = u'anaëlle'
filters = {'text':filter_criteria, 'case-insensitive':''}
filters = {'text': filter_criteria, 'case-insensitive': ''}
resp = client.get(url, filters)
result = parse_response(resp)
assert len(result)
@@ -196,33 +243,37 @@ def test_unicode_case_insensitive_filter_data(client, setup, filetype):
assert 'text' in item
assert filter_criteria.lower() in item['text'].lower()
def test_data_bom(client, setup):
csvdata, url = setup('fam,id,, text,sexe ', data=data_bom)
filters = {'text':'Eliot'}
filters = {'text': 'Eliot'}
resp = client.get(url, filters)
result = parse_response(resp)
assert result[0] == {'id': '69981', 'text': 'Eliot', 'fam': '121', 'sexe': 'H'}
def test_multi_filter(client, setup, filetype):
csvdata, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
filters = {'sexe':'F'}
filters = {'sexe': 'F'}
resp = client.get(url, filters)
result = parse_response(resp)
assert result[0] == {'id': '6', 'text': 'Shanone',
'fam': '525', 'sexe': 'F'}
assert len(result) == 10
def test_query(client, setup, filetype):
csvdata, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
filters = {'q':'liot'}
filters = {'q': 'liot'}
resp = client.get(url, filters)
result = parse_response(resp)
assert result[0]['text'] == 'Eliot'
assert len(result) == 1
def test_query_insensitive_and_unicode(client, setup, filetype):
csvdata, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
filters = {'q':'elIo', 'case-insensitive':''}
filters = {'q': 'elIo', 'case-insensitive': ''}
resp = client.get(url, filters)
result = parse_response(resp)
assert result[0]['text'] == 'Eliot'
@@ -233,9 +284,10 @@ def test_query_insensitive_and_unicode(client, setup, filetype):
assert result[0]['text'] == 'Eliot'
assert len(result) == 1
def test_query_insensitive_and_filter(client, setup, filetype):
csvdata, url = setup('fam,id,,text,sexe', filename=filetype, data=get_file_content(filetype))
filters = {'q':'elIo', 'sexe': 'H', 'case-insensitive':''}
filters = {'q': 'elIo', 'sexe': 'H', 'case-insensitive': ''}
resp = client.get(url, filters)
result = parse_response(resp)
assert result[0]['text'] == 'Eliot'
@@ -261,6 +313,7 @@ def test_dialect(client, setup):
assert result[0]['id'] == '22'
assert result[0]['lname'] == 'MARTIN'
def test_on_the_fly_dialect_detection(client, setup):
# fake a connector that was not initialized during .save(), because it's
# been migrated and we didn't do dialect detection at save() time.
@@ -271,6 +324,7 @@ def test_on_the_fly_dialect_detection(client, setup):
assert result['err'] == 0
assert len(result['data']) == 20
def test_missing_columns(client, setup):
csvdata, url = setup(data=data + 'A;B;C\n')
resp = client.get(url)
@@ -279,6 +333,7 @@ def test_missing_columns(client, setup):
assert len(result['data']) == 21
assert result['data'][-1] == {'lname': 'C', 'sex': None, 'id': 'B', 'fname': None, 'fam': 'A'}
def test_unknown_sheet_name(client, setup):
csvdata, url = setup('field,,another_field,', filename='data2.ods', data=get_file_content('data2.ods'))
csvdata.sheet_name = 'unknown'
@@ -287,6 +342,7 @@ def test_unknown_sheet_name(client, setup):
result = json.loads(resp.content)
assert len(result['data']) == 20
def test_cache_new_shorter_file(client, setup):
csvdata, url = setup(data=data + 'A;B;C\n')
resp = client.get(url)
@@ -299,6 +355,7 @@ def test_cache_new_shorter_file(client, setup):
result = json.loads(resp.content)
assert len(result['data']) == 20
def test_query_array(app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('generic-endpoint', kwargs={
@@ -315,7 +372,7 @@ def test_query_array(app, setup, filetype):
for row in response.json['data']:
assert len(row) == 2
assert isinstance(row[0], int)
assert isinstance(row[1], unicode)
assert isinstance(row[1], six.text_type)
def test_query_q_filter(app, setup, filetype):
@@ -353,7 +410,7 @@ def test_query_dict(app, setup, filetype):
for row in response.json['data']:
assert len(row) == 2
assert isinstance(row['id'], int)
assert isinstance(row['prenom'], unicode)
assert isinstance(row['prenom'], six.text_type)
def test_query_tuples(app, setup, filetype):
@@ -374,7 +431,7 @@ def test_query_tuples(app, setup, filetype):
assert row[0][0] == 'id'
assert isinstance(row[0][1], int)
assert row[1][0] == 'prenom'
assert isinstance(row[1][1], unicode)
assert isinstance(row[1][1], six.text_type)
def test_query_onerow(app, setup, filetype):
@@ -440,6 +497,7 @@ def test_query_distinct(app, setup, filetype):
assert isinstance(response.json['data'], list)
assert len(response.json['data']) == 2
def test_query_keyed_distinct(app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('generic-endpoint', kwargs={
@@ -454,6 +512,7 @@ def test_query_keyed_distinct(app, setup, filetype):
assert isinstance(response.json['data'], dict)
assert response.json['data']['MARTIN']['prenom'] == 'Sandra'
def test_query_order(app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('generic-endpoint', kwargs={
@@ -566,11 +625,12 @@ def test_query_error(app, setup, filetype):
assert response.json['data']['expr'] == 'zob'
assert 'row' in response.json['data']
def test_edit_connector_queries(admin_user, app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('view-connector', kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug})
resp = app.get(url)
assert not 'New Query' in resp.body
assert 'New Query' not in resp.body
new_query_url = reverse('csv-new-query', kwargs={'connector_slug': csvdata.slug})
resp = app.get(new_query_url, status=302)
assert resp.location.endswith('/login/?next=%s' % new_query_url)
@@ -589,16 +649,17 @@ def test_edit_connector_queries(admin_user, app, setup, filetype):
assert 'Syntax error' in resp.body
resp.form['projections'] = 'id:id\nprenom:prenom'
resp = resp.form.submit().follow()
resp = resp.click('foobar', index=1) # 0th is the newly created endpoint
resp = resp.click('foobar', index=1) # 0th is the newly created endpoint
resp.form['filters'] = 'int(id) == 511'
resp.form['description'] = 'in the sky without diamonds'
resp = resp.form.submit().follow()
assert 'Lucy alone' in resp.body
assert 'in the sky without diamonds' in resp.body
resp = resp.click('foobar', index=0) # 0th is the newly created endpoint
resp = resp.click('foobar', index=0) # 0th is the newly created endpoint
assert len(resp.json['data']) == 1
assert resp.json['data'][0]['prenom'] == 'Lucie'
def test_download_file(app, setup, filetype, admin_user):
csvdata, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
assert '/login' in app.get('/manage/csvdatasource/test/download/').location
@@ -630,6 +691,7 @@ def test_query_filter_multiline(app, setup, filetype):
assert response.json['err'] == 0
assert len(response.json['data']) == 2
def test_delete_connector_query(admin_user, app, setup, filetype):
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
url = reverse('view-connector', kwargs={'connector': 'csvdatasource', 'slug': csvdata.slug})