440 lines
17 KiB
Python
440 lines
17 KiB
Python
# passerelle - uniform access to multiple data sources and services
|
|
# Copyright (C) 2021 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
import os
|
|
from io import StringIO
|
|
from posix import stat_result
|
|
from stat import ST_MTIME
|
|
|
|
import pytest
|
|
import webtest
|
|
from django.core.exceptions import ValidationError
|
|
from django.core.files import File
|
|
from django.core.management import call_command
|
|
from django.urls import reverse
|
|
from django.utils.encoding import force_str
|
|
from django.utils.timezone import now
|
|
|
|
import tests.utils
|
|
from passerelle.apps.sector.models import Sectorization, SectorResource
|
|
from passerelle.base.models import AccessRight
|
|
from tests.test_manager import login
|
|
|
|
CSV = """street_id,street_name,parity,min_housenumber,max_housenumber,sector_id,sector_name
|
|
75114_1912,rue du Château,P,,, gs-moulin, Groupe Scolaire Moulin
|
|
75114_1912,rue du Château,I,0,999999,gs-zola,Groupe Scolaire Zola
|
|
75114_1913,rue des Moulins ,N,0,999999,ecole-hugo,École Hugo
|
|
75114_1914,rue du Vent,,,10,ecole-hugo, École Hugo
|
|
|
|
75114_1914,rue du Vent,,11,, ecole-hugo2, École Hugo 2
|
|
75114_1915, ,,,,ecole-hugo2 , École Hugo 2
|
|
"""
|
|
|
|
CSV_BOM = force_str(force_str(CSV, 'utf-8').encode('utf-8-sig'))
|
|
|
|
CSV_NO_FIRST_LINE = """75114_1912,rue du Château,P,,, gs-moulin, Groupe Scolaire Moulin
|
|
75114_1912,rue du Château,I,0,999999,gs-zola,Groupe Scolaire Zola
|
|
75114_1913,,N,0,999999,ecole-hugo,École Hugo
|
|
75114_1914,rue du Vent,,,10,ecole-hugo, École Hugo
|
|
|
|
75114_1914,rue du Vent,,11,, ecole-hugo, École Hugo"""
|
|
|
|
CSV_REORDERED = """sector_id,sector_name,street_id,parity,min_housenumber,max_housenumber,foo,street_name,bar
|
|
gs-moulin, Groupe Scolaire Moulin, 75114_1912,P,,,aaa,rue du Château,bbb
|
|
gs-zola,Groupe Scolaire Zola,75114_1912,I,0,999999,xxx,rue du Château,yyy
|
|
ecole-hugo,École Hugo,75114_1913,N,0,999999,000,rue des Moulins,1
|
|
,,75114_1999,N,0,999999,,avenue 999,
|
|
"""
|
|
|
|
CSV_MISSING_COLUMN = """street_id,street_name,min_housenumber,max_housenumber,sector_id,sector_name
|
|
75114_1912,,,,foo,
|
|
,,0,999999,gs-zola,Groupe Scolaire Zola"""
|
|
|
|
CSV_MISSING_SECTOR = """street_id,street_name,parity,min_housenumber,max_housenumber,sector_id,sector_name
|
|
75114_1912,rue du Château,P,,, ,
|
|
75114_1912,rue du Château,I,0,999999,gs-zola,Groupe Scolaire Zola"""
|
|
|
|
CSV_MISSING_STREET = """street_id,street_name,parity,min_housenumber,max_housenumber,sector_id,sector_name
|
|
75114_1912,,P,,,foo,
|
|
,,I,0,999999,gs-zola,Groupe Scolaire Zola"""
|
|
|
|
pytestmark = pytest.mark.django_db
|
|
|
|
|
|
@pytest.fixture
|
|
def sector(db):
|
|
return tests.utils.setup_access_rights(
|
|
SectorResource.objects.create(
|
|
slug='test',
|
|
title='title',
|
|
csv_file=File(StringIO(CSV), 'sectorization.csv'),
|
|
)
|
|
)
|
|
|
|
|
|
def test_sector_creation(sector):
|
|
assert '%s' % sector == 'title [test]'
|
|
assert sector.sector_set.count() == 4
|
|
rue1913 = Sectorization.objects.get(street_id='75114_1913')
|
|
assert rue1913.street_name == 'rue des Moulins'
|
|
assert (
|
|
'%s' % rue1913
|
|
== '75114_1913 (rue des Moulins), parity:all, min:0, max:999999 → title [test] > École Hugo [ecole-hugo]'
|
|
)
|
|
rue1915 = Sectorization.objects.get(street_id='75114_1915')
|
|
assert rue1915.street_name == ''
|
|
assert (
|
|
'%s' % rue1915
|
|
== '75114_1915, parity:all, min:0, max:999999 → title [test] > École Hugo 2 [ecole-hugo2]'
|
|
)
|
|
|
|
hugo = sector.sector_set.get(slug='ecole-hugo')
|
|
assert Sectorization.objects.filter(sector=hugo).count() == 2
|
|
hugo2 = sector.sector_set.get(slug='gs-zola')
|
|
assert Sectorization.objects.filter(sector=hugo2).count() == 1
|
|
sector.clean()
|
|
sector.save()
|
|
assert sector.sector_set.count() == 4 # no change
|
|
# forced reset
|
|
sector.sector_set.all().delete()
|
|
assert sector.sector_set.count() == 0
|
|
sector.save()
|
|
assert sector.sector_set.count() == 4
|
|
|
|
|
|
def test_sector_creation_bom(sector):
|
|
# forced reset
|
|
sector.sector_set.all().delete()
|
|
assert sector.sector_set.count() == 0
|
|
sector.csv_file = File(StringIO(CSV_BOM), 'sectorization.csv')
|
|
sector.clean()
|
|
sector.save()
|
|
assert sector.sector_set.count() == 4
|
|
|
|
|
|
def test_sector_creation_nofirstline(sector):
|
|
sector.csv_file = File(StringIO(CSV_NO_FIRST_LINE), 'sectorization.csv')
|
|
with pytest.raises(ValidationError, match='Invalid CSV file:.*missing column'):
|
|
sector.clean()
|
|
assert sector.sector_set.count() == 4 # nothing change from initial creation
|
|
sector.titles_in_first_line = False
|
|
sector.save()
|
|
assert sector.sector_set.count() == 3
|
|
|
|
|
|
def test_sector_reordered(sector):
|
|
assert sector.sector_set.count() == 4
|
|
sector.csv_file = File(StringIO(CSV_REORDERED), 'sectorization.csv')
|
|
sector.save()
|
|
assert sector.sector_set.count() == 3
|
|
|
|
|
|
def test_sector_empty_file(sector):
|
|
sector.csv_file = File(StringIO(''), 'sectorization.csv')
|
|
with pytest.raises(ValidationError, match='Invalid CSV file:.*failed to read CSV'):
|
|
sector.clean()
|
|
with pytest.raises(ValidationError, match='failed to read CSV'):
|
|
sector.save()
|
|
assert sector.sector_set.count() == 4 # nothing change
|
|
|
|
|
|
def test_sector_missing_sector(sector):
|
|
sector.csv_file = File(StringIO(CSV_MISSING_SECTOR), 'sectorization.csv')
|
|
with pytest.raises(ValidationError, match='Invalid CSV file:.*missing sector_id, line 2'):
|
|
sector.clean()
|
|
with pytest.raises(ValidationError, match='missing sector_id, line 2'):
|
|
sector.save()
|
|
assert sector.sector_set.count() == 4 # nothing change
|
|
|
|
|
|
def test_sector_missing_street(sector):
|
|
sector.csv_file = File(StringIO(CSV_MISSING_STREET), 'sectorization.csv')
|
|
with pytest.raises(ValidationError, match='Invalid CSV file:.*missing street_id, line 3'):
|
|
sector.clean()
|
|
with pytest.raises(ValidationError, match='missing street_id, line 3'):
|
|
sector.save()
|
|
assert sector.sector_set.count() == 4 # nothing change
|
|
|
|
|
|
def test_sector_missing_column(sector):
|
|
sector.csv_file = File(StringIO(CSV_MISSING_COLUMN), 'sectorization.csv')
|
|
with pytest.raises(ValidationError, match=r'Invalid CSV file:.*missing column.*: parity\.'):
|
|
sector.clean()
|
|
with pytest.raises(ValidationError, match=r'missing column.*: parity\.'):
|
|
sector.save()
|
|
assert sector.sector_set.count() == 4 # nothing change
|
|
|
|
|
|
def test_sector_endpoint_sectors(app, sector):
|
|
url = reverse(
|
|
'generic-endpoint',
|
|
kwargs={
|
|
'connector': 'sector',
|
|
'slug': sector.slug,
|
|
'endpoint': 'sectors',
|
|
},
|
|
)
|
|
result = app.get(url).json
|
|
assert result['err'] == 0
|
|
assert len(result['data']) == 4
|
|
assert {'id': 'ecole-hugo', 'text': 'École Hugo'} in result['data']
|
|
|
|
result = app.get(url, params={'id': 'ecole-hugo'}).json
|
|
assert result['err'] == 0
|
|
assert len(result['data']) == 1
|
|
assert {'id': 'ecole-hugo', 'text': 'École Hugo'} in result['data']
|
|
|
|
result = app.get(url, params={'q': 'hugo'}).json
|
|
assert result['err'] == 0
|
|
assert len(result['data']) == 2
|
|
assert {'id': 'ecole-hugo', 'text': 'École Hugo'} in result['data']
|
|
assert {'id': 'ecole-hugo2', 'text': 'École Hugo 2'} in result['data']
|
|
|
|
result = app.get(url, params={'q': 'foobar'}).json
|
|
assert result['err'] == 0
|
|
assert len(result['data']) == 0
|
|
|
|
# search a sector by street and house number
|
|
result = app.get(url, params={'street_id': '75114_1915'}).json
|
|
assert result['err'] == 0
|
|
assert result['data'] == [{'id': 'ecole-hugo2', 'text': 'École Hugo 2'}]
|
|
result = app.get(url, params={'street_id': '75114_1915', 'house_number': '123'}).json
|
|
assert result['err'] == 0
|
|
assert result['data'] == [{'id': 'ecole-hugo2', 'text': 'École Hugo 2'}]
|
|
result = app.get(url, params={'street_id': '75114_1912', 'house_number': '12'}).json # even
|
|
assert result['err'] == 0
|
|
assert result['data'] == [{'id': 'gs-moulin', 'text': 'Groupe Scolaire Moulin'}]
|
|
result = app.get(url, params={'street_id': '75114_1912', 'house_number': '13'}).json # odd
|
|
assert result['err'] == 0
|
|
assert result['data'] == [{'id': 'gs-zola', 'text': 'Groupe Scolaire Zola'}]
|
|
result = app.get(url, params={'street_id': '75114_1914', 'house_number': '5'}).json # 5 <= 10
|
|
assert result['err'] == 0
|
|
assert result['data'] == [{'id': 'ecole-hugo', 'text': 'École Hugo'}]
|
|
result = app.get(url, params={'street_id': '75114_1914', 'house_number': '20'}).json # 20 >= 11
|
|
assert result['err'] == 0
|
|
assert result['data'] == [{'id': 'ecole-hugo2', 'text': 'École Hugo 2'}]
|
|
# bad searches
|
|
result = app.get(url, params={'street_id': '75114_1915', 'house_number': 'abc'}, status=400).json
|
|
assert result['err'] == 1
|
|
assert result['err_desc'] == 'invalid value for parameter "house_number"' # not an integer
|
|
result = app.get(url, params={'house_number': '123'}, status=400).json
|
|
assert result['err'] == 1
|
|
assert result['err_desc'] == 'house_number requires a street_id'
|
|
|
|
# access right is needed
|
|
AccessRight.objects.all().delete()
|
|
result = app.get(url, status=403).json
|
|
assert result['err'] == 1
|
|
assert 'PermissionDenied' in result['err_class']
|
|
assert result['data'] is None
|
|
|
|
|
|
def test_sector_endpoint_export(app, sector):
|
|
url = reverse(
|
|
'generic-endpoint',
|
|
kwargs={
|
|
'connector': 'sector',
|
|
'slug': sector.slug,
|
|
'endpoint': 'export',
|
|
},
|
|
)
|
|
resp = app.get(url)
|
|
assert resp.headers['content-type'] == 'text/csv'
|
|
assert resp.text.startswith(
|
|
'"street_id","street_name","parity","min_housenumber","max_housenumber","sector_id","sector_name"'
|
|
)
|
|
assert len(resp.text.splitlines()) == 7
|
|
sector.titles_in_first_line = False
|
|
sector.save(import_csv=False)
|
|
resp = app.get(url)
|
|
assert resp.headers['content-type'] == 'text/csv'
|
|
assert 'street_id' not in resp.text
|
|
assert len(resp.text.splitlines()) == 6
|
|
sector.titles_in_first_line = True
|
|
sector.save()
|
|
|
|
resp = app.get(url)
|
|
assert resp.headers['content-type'] == 'text/csv'
|
|
# "street_id","parity","min_housenumber","max_housenumber","sector_id","sector_name"
|
|
# "75114_1913","all","","","ecole-hugo","École Hugo"
|
|
# "75114_1914","all","","10","",""
|
|
# "75114_1915","all","","","ecole-hugo2","École Hugo 2"
|
|
# "75114_1914","all","11","","",""
|
|
# "75114_1912","odd","","","gs-zola","Groupe Scolaire Zola"
|
|
# "75114_1912","even","","","gs-moulin","Groupe Scolaire Moulin"
|
|
assert len(resp.text.splitlines()) == 7
|
|
assert resp.text.count('"all"') == 4
|
|
assert resp.text.count('"odd"') == 1
|
|
assert resp.text.count('"even"') == 1
|
|
assert resp.text.count('"ecole-hugo"') == 1
|
|
assert resp.text.count('"0"') == 0
|
|
assert resp.text.count('"999999"') == 0
|
|
|
|
# import -> export again
|
|
initial_export = resp.text
|
|
sector.csv_file = File(StringIO(initial_export), 'data.csv')
|
|
sector.save()
|
|
resp = app.get(url)
|
|
assert resp.text == initial_export
|
|
|
|
# modify export format
|
|
resp = app.get(
|
|
url, params={'odd': 'IMPAIRS', 'even': 'PAIRS', 'mix': 'TOUS', 'repeat': 'true', 'limits': 'true'}
|
|
)
|
|
assert len(resp.text.splitlines()) == 7
|
|
assert resp.text.count('"TOUS"') == 4
|
|
assert resp.text.count('"IMPAIRS"') == 1
|
|
assert resp.text.count('"PAIRS"') == 1
|
|
assert resp.text.count('"ecole-hugo"') == 2 # repeat
|
|
assert resp.text.count('"0"') == 5 # limits
|
|
assert resp.text.count('"999999"') == 5
|
|
|
|
# access right is needed
|
|
AccessRight.objects.all().delete()
|
|
result = app.get(url, status=403).json
|
|
assert result['err'] == 1
|
|
assert 'PermissionDenied' in result['err_class']
|
|
assert result['data'] is None
|
|
|
|
|
|
def test_sector_endpoint_update(app, sector):
|
|
url = reverse(
|
|
'generic-endpoint',
|
|
kwargs={
|
|
'connector': 'sector',
|
|
'slug': sector.slug,
|
|
'endpoint': 'update',
|
|
},
|
|
)
|
|
assert sector.sector_set.count() == 4
|
|
result = app.put(url, params=CSV_REORDERED, headers={'Content-Type': 'text/csv'}).json
|
|
assert sector.sector_set.count() == 3
|
|
assert result['err'] == 0
|
|
assert len(result['data']) == 3
|
|
assert result['updated'] == 'sector/test/api-uploaded-file.csv'
|
|
|
|
result = app.put(url, params=CSV_MISSING_COLUMN, headers={'Content-Type': 'text/csv'}, status=400).json
|
|
assert result['err'] == 1
|
|
assert "missing column" in result['err_desc']
|
|
|
|
result = app.put(url, params=CSV_REORDERED, headers={}, status=400).json
|
|
assert result['err'] == 1
|
|
assert "can't guess filename extension" in result['err_desc']
|
|
|
|
# access right is needed
|
|
AccessRight.objects.all().delete()
|
|
result = app.put(url, params=CSV_REORDERED, headers={'Content-Type': 'text/csv'}, status=403).json
|
|
assert result['err'] == 1
|
|
assert 'PermissionDenied' in result['err_class']
|
|
|
|
|
|
@pytest.mark.parametrize('remove_files', [False, True])
|
|
def test_daily_clean(settings, remove_files, sector):
|
|
settings.SECTOR_REMOVE_ON_CLEAN = remove_files
|
|
|
|
sectordata_dir = os.path.dirname(sector.csv_file.path)
|
|
other_dir = os.path.join(settings.MEDIA_ROOT, 'foo', sector.slug)
|
|
os.makedirs(other_dir)
|
|
|
|
# create additional file in sector dir
|
|
with open(os.path.join(sectordata_dir, 'csv-file.csv'), 'w'):
|
|
pass
|
|
os.makedirs(os.path.join(sectordata_dir, 'not-a-file'))
|
|
# create additional file in other dir
|
|
with open(os.path.join(other_dir, 'csv-file.csv'), 'w'):
|
|
pass
|
|
|
|
call_command('cron', 'daily')
|
|
|
|
# not changed
|
|
assert os.listdir(other_dir) == ['csv-file.csv']
|
|
# too soon to be removed
|
|
dir_list = os.listdir(sectordata_dir)
|
|
dir_list.sort()
|
|
assert dir_list == ['csv-file.csv', 'not-a-file', 'sectorization.csv']
|
|
|
|
orig_os_stat = os.stat
|
|
|
|
def _fake_stat(arg, delta):
|
|
faked = list(orig_os_stat(arg))
|
|
faked[ST_MTIME] = (now() + delta).timestamp()
|
|
return stat_result(faked)
|
|
|
|
try:
|
|
# 1 week ago but one minute too soon
|
|
os.stat = lambda arg: _fake_stat(arg, datetime.timedelta(days=-7, minutes=1))
|
|
call_command('cron', 'daily')
|
|
|
|
# not changed
|
|
assert os.listdir(other_dir) == ['csv-file.csv']
|
|
# still too soon to be removed
|
|
dir_list = os.listdir(sectordata_dir)
|
|
dir_list.sort()
|
|
assert dir_list == ['csv-file.csv', 'not-a-file', 'sectorization.csv']
|
|
|
|
# 1 week ago
|
|
os.stat = lambda arg: _fake_stat(arg, datetime.timedelta(days=-7))
|
|
call_command('cron', 'daily')
|
|
|
|
# not changed
|
|
assert os.listdir(other_dir) == ['csv-file.csv']
|
|
# removed or moved
|
|
dir_list = os.listdir(sectordata_dir)
|
|
dir_list.sort()
|
|
if remove_files:
|
|
assert dir_list == ['not-a-file', 'sectorization.csv']
|
|
else:
|
|
assert dir_list == ['not-a-file', 'sectorization.csv', 'unused-files']
|
|
assert os.listdir(os.path.join(sectordata_dir, 'unused-files')) == ['csv-file.csv']
|
|
|
|
# wrong storage directory, do nothing
|
|
with open(os.path.join(other_dir, 'bar.csv'), 'w'):
|
|
pass
|
|
sector.csv_file.name = 'foo/%s/csv-file.csv' % sector.slug
|
|
sector.save()
|
|
assert sorted(os.listdir(other_dir)) == ['bar.csv', 'csv-file.csv']
|
|
call_command('cron', 'daily')
|
|
assert sorted(os.listdir(other_dir)) == ['bar.csv', 'csv-file.csv']
|
|
|
|
# unknown file
|
|
sector.csv_file.name = 'sector/%s/bar.csv' % sector.slug
|
|
sector.save()
|
|
call_command('cron', 'daily')
|
|
finally:
|
|
os.stat = orig_os_stat
|
|
|
|
|
|
def test_sector_manage_create(app, admin_user):
|
|
app = login(app)
|
|
response = app.get(reverse('create-connector', kwargs={'connector': 'sector'}))
|
|
response.form.set('title', 'test title')
|
|
response.form.set('slug', 'test-slug')
|
|
response.form.set('description', 'test description')
|
|
response.form.set('csv_file', webtest.Upload('test.csv', CSV.encode('utf-8'), 'application/octet-stream'))
|
|
response = response.form.submit()
|
|
assert response.location
|
|
response = response.follow()
|
|
assert 'test title' in response
|
|
assert 'test description' in response
|
|
assert SectorResource.objects.count() == 1
|
|
resource = SectorResource.objects.get()
|
|
assert resource.title == 'test title'
|
|
assert resource.slug == 'test-slug'
|
|
assert resource.description == 'test description'
|
|
assert resource.csv_file.read() == CSV.encode('utf-8')
|
|
assert resource.sector_set.count() == 4
|