# passerelle/tests/test_sector.py
#
# 440 lines
# 17 KiB
# Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2021 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import os
from io import StringIO
from posix import stat_result
from stat import ST_MTIME
import pytest
import webtest
from django.core.exceptions import ValidationError
from django.core.files import File
from django.core.management import call_command
from django.urls import reverse
from django.utils.encoding import force_str
from django.utils.timezone import now
import tests.utils
from passerelle.apps.sector.models import Sectorization, SectorResource
from passerelle.base.models import AccessRight
from tests.test_manager import login
# Reference sectorization file, with a header line. The tests below expect
# 4 sectors after importing it. Several cells carry stray spaces around
# their values (street name, sector id, sector name).
CSV = """street_id,street_name,parity,min_housenumber,max_housenumber,sector_id,sector_name
75114_1912,rue du Château,P,,, gs-moulin, Groupe Scolaire Moulin
75114_1912,rue du Château,I,0,999999,gs-zola,Groupe Scolaire Zola
75114_1913,rue des Moulins ,N,0,999999,ecole-hugo,École Hugo
75114_1914,rue du Vent,,,10,ecole-hugo, École Hugo
75114_1914,rue du Vent,,11,, ecole-hugo2, École Hugo 2
75114_1915, ,,,,ecole-hugo2 , École Hugo 2
"""
# Same content, round-tripped through the 'utf-8-sig' codec and back to text;
# presumably this leaves a leading byte-order mark in the string -- confirm
# against force_str's decoding.
CSV_BOM = force_str(force_str(CSV, 'utf-8').encode('utf-8-sig'))
# Data rows only: there is no header line naming the columns.
CSV_NO_FIRST_LINE = """75114_1912,rue du Château,P,,, gs-moulin, Groupe Scolaire Moulin
75114_1912,rue du Château,I,0,999999,gs-zola,Groupe Scolaire Zola
75114_1913,,N,0,999999,ecole-hugo,École Hugo
75114_1914,rue du Vent,,,10,ecole-hugo, École Hugo
75114_1914,rue du Vent,,11,, ecole-hugo, École Hugo"""
# Columns in a different order, with two extra columns (foo, bar) and a last
# row whose sector_id and sector_name are empty. The tests expect 3 sectors
# after importing it.
CSV_REORDERED = """sector_id,sector_name,street_id,parity,min_housenumber,max_housenumber,foo,street_name,bar
gs-moulin, Groupe Scolaire Moulin, 75114_1912,P,,,aaa,rue du Château,bbb
gs-zola,Groupe Scolaire Zola,75114_1912,I,0,999999,xxx,rue du Château,yyy
ecole-hugo,École Hugo,75114_1913,N,0,999999,000,rue des Moulins,1
,,75114_1999,N,0,999999,,avenue 999,
"""
# Header without the "parity" column.
CSV_MISSING_COLUMN = """street_id,street_name,min_housenumber,max_housenumber,sector_id,sector_name
75114_1912,,,,foo,
,,0,999999,gs-zola,Groupe Scolaire Zola"""
# Line 2 (first data row) has a blank sector_id.
CSV_MISSING_SECTOR = """street_id,street_name,parity,min_housenumber,max_housenumber,sector_id,sector_name
75114_1912,rue du Château,P,,, ,
75114_1912,rue du Château,I,0,999999,gs-zola,Groupe Scolaire Zola"""
# Line 3 (second data row) has an empty street_id.
CSV_MISSING_STREET = """street_id,street_name,parity,min_housenumber,max_housenumber,sector_id,sector_name
75114_1912,,P,,,foo,
,,I,0,999999,gs-zola,Groupe Scolaire Zola"""
# Apply the django_db marker to every test of this module.
pytestmark = pytest.mark.django_db
@pytest.fixture
def sector(db):
    """Create the ``test`` sector resource from ``CSV`` and set up its access rights.

    Returns whatever ``tests.utils.setup_access_rights`` returns for the
    newly created resource.
    """
    csv_upload = File(StringIO(CSV), 'sectorization.csv')
    resource = SectorResource.objects.create(slug='test', title='title', csv_file=csv_upload)
    return tests.utils.setup_access_rights(resource)
def test_sector_creation(sector):
    """Check what the initial import of ``CSV`` created, then re-import it."""
    assert str(sector) == 'title [test]'
    assert sector.sector_set.count() == 4

    # a street with a name shows it between parentheses
    moulins = Sectorization.objects.get(street_id='75114_1913')
    assert moulins.street_name == 'rue des Moulins'
    expected_repr = (
        '75114_1913 (rue des Moulins), parity:all, min:0, max:999999 → title [test] > École Hugo [ecole-hugo]'
    )
    assert str(moulins) == expected_repr

    # a street without a name is shown by its id only
    unnamed = Sectorization.objects.get(street_id='75114_1915')
    assert unnamed.street_name == ''
    expected_repr = '75114_1915, parity:all, min:0, max:999999 → title [test] > École Hugo 2 [ecole-hugo2]'
    assert str(unnamed) == expected_repr

    # number of sectorization rows attached to a given sector
    for slug, expected_rows in (('ecole-hugo', 2), ('gs-zola', 1)):
        target = sector.sector_set.get(slug=slug)
        assert Sectorization.objects.filter(sector=target).count() == expected_rows

    # validating and saving the same file again changes nothing
    sector.clean()
    sector.save()
    assert sector.sector_set.count() == 4

    # forced reset: saving re-creates the deleted sectors
    sector.sector_set.all().delete()
    assert sector.sector_set.count() == 0
    sector.save()
    assert sector.sector_set.count() == 4
def test_sector_creation_bom(sector):
    """Re-import the sectors from the ``CSV_BOM`` variant of the file."""
    # forced reset, so that the 4 sectors found below come from CSV_BOM
    sector.sector_set.all().delete()
    assert sector.sector_set.count() == 0

    bom_file = File(StringIO(CSV_BOM), 'sectorization.csv')
    sector.csv_file = bom_file
    sector.clean()
    sector.save()
    assert sector.sector_set.count() == 4
def test_sector_creation_nofirstline(sector):
    """A header-less CSV is rejected until ``titles_in_first_line`` is turned off."""
    headerless_file = File(StringIO(CSV_NO_FIRST_LINE), 'sectorization.csv')
    sector.csv_file = headerless_file
    with pytest.raises(ValidationError, match='Invalid CSV file:.*missing column'):
        sector.clean()
    # nothing changed since the initial creation
    assert sector.sector_set.count() == 4

    sector.titles_in_first_line = False
    sector.save()
    assert sector.sector_set.count() == 3
def test_sector_reordered(sector):
    """Import ``CSV_REORDERED`` (shuffled and extra columns) over the initial data."""
    initial_count = sector.sector_set.count()
    assert initial_count == 4

    reordered_file = File(StringIO(CSV_REORDERED), 'sectorization.csv')
    sector.csv_file = reordered_file
    sector.save()

    assert sector.sector_set.count() == 3
def test_sector_empty_file(sector):
    """An empty CSV file is refused by both ``clean()`` and ``save()``."""
    sector.csv_file = File(StringIO(''), 'sectorization.csv')

    attempts = (
        (sector.clean, 'Invalid CSV file:.*failed to read CSV'),
        (sector.save, 'failed to read CSV'),
    )
    for action, message in attempts:
        with pytest.raises(ValidationError, match=message):
            action()

    # existing sectors are left untouched
    assert sector.sector_set.count() == 4
def test_sector_missing_sector(sector):
    """A row without ``sector_id`` is reported with its line number."""
    sector.csv_file = File(StringIO(CSV_MISSING_SECTOR), 'sectorization.csv')

    attempts = (
        (sector.clean, 'Invalid CSV file:.*missing sector_id, line 2'),
        (sector.save, 'missing sector_id, line 2'),
    )
    for action, message in attempts:
        with pytest.raises(ValidationError, match=message):
            action()

    # existing sectors are left untouched
    assert sector.sector_set.count() == 4
def test_sector_missing_street(sector):
    """A row without ``street_id`` is reported with its line number."""
    sector.csv_file = File(StringIO(CSV_MISSING_STREET), 'sectorization.csv')

    attempts = (
        (sector.clean, 'Invalid CSV file:.*missing street_id, line 3'),
        (sector.save, 'missing street_id, line 3'),
    )
    for action, message in attempts:
        with pytest.raises(ValidationError, match=message):
            action()

    # existing sectors are left untouched
    assert sector.sector_set.count() == 4
def test_sector_missing_column(sector):
    """A CSV lacking the ``parity`` column is refused, naming the missing column."""
    sector.csv_file = File(StringIO(CSV_MISSING_COLUMN), 'sectorization.csv')

    attempts = (
        (sector.clean, r'Invalid CSV file:.*missing column.*: parity\.'),
        (sector.save, r'missing column.*: parity\.'),
    )
    for action, message in attempts:
        with pytest.raises(ValidationError, match=message):
            action()

    # existing sectors are left untouched
    assert sector.sector_set.count() == 4
def test_sector_endpoint_sectors(app, sector):
    """Exercise the ``sectors`` endpoint: listing, filters, street lookups, errors."""
    url = reverse(
        'generic-endpoint',
        kwargs={'connector': 'sector', 'slug': sector.slug, 'endpoint': 'sectors'},
    )
    hugo = {'id': 'ecole-hugo', 'text': 'École Hugo'}
    hugo2 = {'id': 'ecole-hugo2', 'text': 'École Hugo 2'}
    moulin = {'id': 'gs-moulin', 'text': 'Groupe Scolaire Moulin'}
    zola = {'id': 'gs-zola', 'text': 'Groupe Scolaire Zola'}

    def fetch(params=None, status=None):
        """GET the endpoint and return the decoded JSON payload."""
        return app.get(url, params=params, status=status).json

    # full listing
    result = fetch()
    assert result['err'] == 0
    assert len(result['data']) == 4
    assert hugo in result['data']

    # filter on sector id
    result = fetch({'id': 'ecole-hugo'})
    assert result['err'] == 0
    assert len(result['data']) == 1
    assert hugo in result['data']

    # text search
    result = fetch({'q': 'hugo'})
    assert result['err'] == 0
    assert len(result['data']) == 2
    assert hugo in result['data']
    assert hugo2 in result['data']

    result = fetch({'q': 'foobar'})
    assert result['err'] == 0
    assert len(result['data']) == 0

    # search a sector by street and house number
    street_lookups = (
        ({'street_id': '75114_1915'}, hugo2),
        ({'street_id': '75114_1915', 'house_number': '123'}, hugo2),
        ({'street_id': '75114_1912', 'house_number': '12'}, moulin),  # even
        ({'street_id': '75114_1912', 'house_number': '13'}, zola),  # odd
        ({'street_id': '75114_1914', 'house_number': '5'}, hugo),  # 5 <= 10
        ({'street_id': '75114_1914', 'house_number': '20'}, hugo2),  # 20 >= 11
    )
    for params, expected_sector in street_lookups:
        result = fetch(params)
        assert result['err'] == 0
        assert result['data'] == [expected_sector]

    # bad searches
    result = fetch({'street_id': '75114_1915', 'house_number': 'abc'}, status=400)
    assert result['err'] == 1
    assert result['err_desc'] == 'invalid value for parameter "house_number"'  # not an integer

    result = fetch({'house_number': '123'}, status=400)
    assert result['err'] == 1
    assert result['err_desc'] == 'house_number requires a street_id'

    # access right is needed
    AccessRight.objects.all().delete()
    result = fetch(status=403)
    assert result['err'] == 1
    assert 'PermissionDenied' in result['err_class']
    assert result['data'] is None
def test_sector_endpoint_export(app, sector):
    """Exercise the ``export`` endpoint: default output, round-trip, format options."""
    url = reverse(
        'generic-endpoint',
        kwargs={'connector': 'sector', 'slug': sector.slug, 'endpoint': 'export'},
    )

    def check_counts(text, expected_counts):
        """Assert how many times each token occurs in the exported text."""
        for token, expected in expected_counts.items():
            assert text.count(token) == expected

    header = '"street_id","street_name","parity","min_housenumber","max_housenumber","sector_id","sector_name"'
    resp = app.get(url)
    assert resp.headers['content-type'] == 'text/csv'
    assert resp.text.startswith(header)
    assert len(resp.text.splitlines()) == 7

    # without titles, the header line is not exported
    sector.titles_in_first_line = False
    sector.save(import_csv=False)
    resp = app.get(url)
    assert resp.headers['content-type'] == 'text/csv'
    assert 'street_id' not in resp.text
    assert len(resp.text.splitlines()) == 6

    sector.titles_in_first_line = True
    sector.save()
    resp = app.get(url)
    assert resp.headers['content-type'] == 'text/csv'
    # sketch of the default export (the street_name column is left out here):
    # "street_id","parity","min_housenumber","max_housenumber","sector_id","sector_name"
    # "75114_1913","all","","","ecole-hugo","École Hugo"
    # "75114_1914","all","","10","",""
    # "75114_1915","all","","","ecole-hugo2","École Hugo 2"
    # "75114_1914","all","11","","",""
    # "75114_1912","odd","","","gs-zola","Groupe Scolaire Zola"
    # "75114_1912","even","","","gs-moulin","Groupe Scolaire Moulin"
    assert len(resp.text.splitlines()) == 7
    check_counts(
        resp.text,
        {
            '"all"': 4,
            '"odd"': 1,
            '"even"': 1,
            '"ecole-hugo"': 1,
            '"0"': 0,
            '"999999"': 0,
        },
    )

    # import -> export again
    initial_export = resp.text
    sector.csv_file = File(StringIO(initial_export), 'data.csv')
    sector.save()
    resp = app.get(url)
    assert resp.text == initial_export

    # modify export format
    format_options = {'odd': 'IMPAIRS', 'even': 'PAIRS', 'mix': 'TOUS', 'repeat': 'true', 'limits': 'true'}
    resp = app.get(url, params=format_options)
    assert len(resp.text.splitlines()) == 7
    check_counts(
        resp.text,
        {
            '"TOUS"': 4,
            '"IMPAIRS"': 1,
            '"PAIRS"': 1,
            '"ecole-hugo"': 2,  # repeat
            '"0"': 5,  # limits
            '"999999"': 5,
        },
    )

    # access right is needed
    AccessRight.objects.all().delete()
    result = app.get(url, status=403).json
    assert result['err'] == 1
    assert 'PermissionDenied' in result['err_class']
    assert result['data'] is None
def test_sector_endpoint_update(app, sector):
    """Exercise the ``update`` endpoint: valid upload, bad CSV, missing content type."""
    url = reverse(
        'generic-endpoint',
        kwargs={'connector': 'sector', 'slug': sector.slug, 'endpoint': 'update'},
    )
    csv_headers = {'Content-Type': 'text/csv'}

    def upload(body, headers, status=None):
        """PUT ``body`` on the endpoint and return the decoded JSON payload."""
        return app.put(url, params=body, headers=headers, status=status).json

    assert sector.sector_set.count() == 4

    # valid upload replaces the sectors
    result = upload(CSV_REORDERED, csv_headers)
    assert sector.sector_set.count() == 3
    assert result['err'] == 0
    assert len(result['data']) == 3
    assert result['updated'] == 'sector/test/api-uploaded-file.csv'

    # invalid CSV content
    result = upload(CSV_MISSING_COLUMN, csv_headers, status=400)
    assert result['err'] == 1
    assert 'missing column' in result['err_desc']

    # no content type given
    result = upload(CSV_REORDERED, {}, status=400)
    assert result['err'] == 1
    assert "can't guess filename extension" in result['err_desc']

    # access right is needed
    AccessRight.objects.all().delete()
    result = upload(CSV_REORDERED, csv_headers, status=403)
    assert result['err'] == 1
    assert 'PermissionDenied' in result['err_class']
@pytest.mark.parametrize('remove_files', [False, True])
def test_daily_clean(settings, remove_files, sector):
    """Check the daily cron clean-up of files left in the sector storage directory.

    An extra file in the resource's directory must be kept while its
    modification time is less than one week old. Once it is one week old it is
    removed when ``SECTOR_REMOVE_ON_CLEAN`` is true, or moved to an
    ``unused-files`` sub-directory otherwise. Sub-directories, the file in use
    and files stored in another directory are never touched.
    """
    settings.SECTOR_REMOVE_ON_CLEAN = remove_files

    sectordata_dir = os.path.dirname(sector.csv_file.path)
    other_dir = os.path.join(settings.MEDIA_ROOT, 'foo', sector.slug)
    os.makedirs(other_dir)

    # create additional file in sector dir
    with open(os.path.join(sectordata_dir, 'csv-file.csv'), 'w'):
        pass
    os.makedirs(os.path.join(sectordata_dir, 'not-a-file'))
    # create additional file in other dir
    with open(os.path.join(other_dir, 'csv-file.csv'), 'w'):
        pass

    call_command('cron', 'daily')

    # not changed
    assert os.listdir(other_dir) == ['csv-file.csv']
    # too soon to be removed
    assert sorted(os.listdir(sectordata_dir)) == ['csv-file.csv', 'not-a-file', 'sectorization.csv']

    orig_os_stat = os.stat

    def _aged_stat(delta):
        """Build an ``os.stat`` replacement reporting ``now() + delta`` as mtime."""

        def _fake_stat(path, *args, **kwargs):
            # Forward any extra argument (dir_fd, follow_symlinks, ...) to the
            # real os.stat: while the patch is active, any caller may pass
            # them, and a one-argument fake would raise TypeError.
            faked = list(orig_os_stat(path, *args, **kwargs))
            faked[ST_MTIME] = (now() + delta).timestamp()
            return stat_result(faked)

        return _fake_stat

    try:
        # 1 week ago but one minute too soon
        os.stat = _aged_stat(datetime.timedelta(days=-7, minutes=1))
        call_command('cron', 'daily')

        # not changed
        assert os.listdir(other_dir) == ['csv-file.csv']
        # still too soon to be removed
        assert sorted(os.listdir(sectordata_dir)) == ['csv-file.csv', 'not-a-file', 'sectorization.csv']

        # 1 week ago
        os.stat = _aged_stat(datetime.timedelta(days=-7))
        call_command('cron', 'daily')

        # not changed
        assert os.listdir(other_dir) == ['csv-file.csv']
        # removed or moved
        dir_list = sorted(os.listdir(sectordata_dir))
        if remove_files:
            assert dir_list == ['not-a-file', 'sectorization.csv']
        else:
            assert dir_list == ['not-a-file', 'sectorization.csv', 'unused-files']
            assert os.listdir(os.path.join(sectordata_dir, 'unused-files')) == ['csv-file.csv']

        # wrong storage directory, do nothing
        with open(os.path.join(other_dir, 'bar.csv'), 'w'):
            pass
        sector.csv_file.name = 'foo/%s/csv-file.csv' % sector.slug
        sector.save()
        assert sorted(os.listdir(other_dir)) == ['bar.csv', 'csv-file.csv']
        call_command('cron', 'daily')
        assert sorted(os.listdir(other_dir)) == ['bar.csv', 'csv-file.csv']

        # unknown file: nothing is asserted afterwards, presumably this only
        # checks that the cron job copes with it
        sector.csv_file.name = 'sector/%s/bar.csv' % sector.slug
        sector.save()
        call_command('cron', 'daily')
    finally:
        # always restore the real os.stat, even when an assertion fails
        os.stat = orig_os_stat
def test_sector_manage_create(app, admin_user):
    """Create a sector connector through the management form and check the result."""
    app = login(app)
    csv_bytes = CSV.encode('utf-8')

    response = app.get(reverse('create-connector', kwargs={'connector': 'sector'}))
    form = response.form
    text_fields = (
        ('title', 'test title'),
        ('slug', 'test-slug'),
        ('description', 'test description'),
    )
    for field_name, value in text_fields:
        form.set(field_name, value)
    form.set('csv_file', webtest.Upload('test.csv', csv_bytes, 'application/octet-stream'))

    # a successful creation redirects to a page showing the new connector
    response = form.submit()
    assert response.location
    response = response.follow()
    for expected_text in ('test title', 'test description'):
        assert expected_text in response

    assert SectorResource.objects.count() == 1
    resource = SectorResource.objects.get()
    assert resource.title == 'test title'
    assert resource.slug == 'test-slug'
    assert resource.description == 'test description'
    assert resource.csv_file.read() == csv_bytes
    assert resource.sector_set.count() == 4