# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2021 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import datetime
import os
from io import StringIO
from posix import stat_result
from stat import ST_MTIME

import pytest
import webtest
from django.core.exceptions import ValidationError
from django.core.files import File
from django.core.management import call_command
from django.urls import reverse
from django.utils.encoding import force_str
from django.utils.timezone import now

import tests.utils
from passerelle.apps.sector.models import Sectorization, SectorResource
from passerelle.base.models import AccessRight
from tests.test_manager import login

# Reference file: six data rows spread over four distinct sectors, with stray
# spaces around some cells.
CSV = """street_id,street_name,parity,min_housenumber,max_housenumber,sector_id,sector_name
75114_1912,rue du Château,P,,, gs-moulin, Groupe Scolaire Moulin
75114_1912,rue du Château,I,0,999999,gs-zola,Groupe Scolaire Zola
75114_1913,rue des Moulins ,N,0,999999,ecole-hugo,École Hugo
75114_1914,rue du Vent,,,10,ecole-hugo, École Hugo
75114_1914,rue du Vent,,11,, ecole-hugo2, École Hugo 2
75114_1915, ,,,,ecole-hugo2 , École Hugo 2
"""

# Same content, prefixed with a UTF-8 byte order mark.
CSV_BOM = force_str(force_str(CSV, 'utf-8').encode('utf-8-sig'))

# No header row.
CSV_NO_FIRST_LINE = """75114_1912,rue du Château,P,,, gs-moulin, Groupe Scolaire Moulin
75114_1912,rue du Château,I,0,999999,gs-zola,Groupe Scolaire Zola
75114_1913,,N,0,999999,ecole-hugo,École Hugo
75114_1914,rue du Vent,,,10,ecole-hugo, École Hugo
75114_1914,rue du Vent,,11,, ecole-hugo, École Hugo"""

# Columns in a different order, plus two unknown columns (foo, bar).
CSV_REORDERED = """sector_id,sector_name,street_id,parity,min_housenumber,max_housenumber,foo,street_name,bar
gs-moulin, Groupe Scolaire Moulin, 75114_1912,P,,,aaa,rue du Château,bbb
gs-zola,Groupe Scolaire Zola,75114_1912,I,0,999999,xxx,rue du Château,yyy
ecole-hugo,École Hugo,75114_1913,N,0,999999,000,rue des Moulins,1
,,75114_1999,N,0,999999,,avenue 999,
"""

# The "parity" column is absent.
CSV_MISSING_COLUMN = """street_id,street_name,min_housenumber,max_housenumber,sector_id,sector_name
75114_1912,,,,foo,
,,0,999999,gs-zola,Groupe Scolaire Zola"""

# First data row (line 2) has a blank sector_id.
CSV_MISSING_SECTOR = """street_id,street_name,parity,min_housenumber,max_housenumber,sector_id,sector_name
75114_1912,rue du Château,P,,, ,
75114_1912,rue du Château,I,0,999999,gs-zola,Groupe Scolaire Zola"""

# Second data row (line 3) has an empty street_id.
CSV_MISSING_STREET = """street_id,street_name,parity,min_housenumber,max_housenumber,sector_id,sector_name
75114_1912,,P,,,foo,
,,I,0,999999,gs-zola,Groupe Scolaire Zola"""

pytestmark = pytest.mark.django_db


@pytest.fixture
def sector(db):
    """Create a SectorResource from CSV and hand it to tests.utils.setup_access_rights."""
    return tests.utils.setup_access_rights(
        SectorResource.objects.create(
            slug='test',
            title='title',
            csv_file=File(StringIO(CSV), 'sectorization.csv'),
        )
    )


def test_sector_creation(sector):
    """Creating the resource imports the CSV; saving again rebuilds the same sectors."""
    assert '%s' % sector == 'title [test]'
    assert sector.sector_set.count() == 4
    rue1913 = Sectorization.objects.get(street_id='75114_1913')
    assert rue1913.street_name == 'rue des Moulins'
    assert (
        '%s' % rue1913
        == '75114_1913 (rue des Moulins), parity:all, min:0, max:999999 → title [test] > École Hugo [ecole-hugo]'
    )
    rue1915 = Sectorization.objects.get(street_id='75114_1915')
    assert rue1915.street_name == ''
    assert (
        '%s' % rue1915
        == '75114_1915, parity:all, min:0, max:999999 → title [test] > École Hugo 2 [ecole-hugo2]'
    )
    hugo = sector.sector_set.get(slug='ecole-hugo')
    assert Sectorization.objects.filter(sector=hugo).count() == 2
    zola = sector.sector_set.get(slug='gs-zola')
    assert Sectorization.objects.filter(sector=zola).count() == 1

    sector.clean()
    sector.save()
    assert sector.sector_set.count() == 4  # no change
    # forced reset
    sector.sector_set.all().delete()
    assert sector.sector_set.count() == 0
    sector.save()
    assert sector.sector_set.count() == 4


def test_sector_creation_bom(sector):
    """A CSV starting with a byte order mark is imported like the plain one."""
    # forced reset
    sector.sector_set.all().delete()
    assert sector.sector_set.count() == 0
    sector.csv_file = File(StringIO(CSV_BOM), 'sectorization.csv')
    sector.clean()
    sector.save()
    assert sector.sector_set.count() == 4


def test_sector_creation_nofirstline(sector):
    """A header-less CSV is rejected unless titles_in_first_line is disabled."""
    sector.csv_file = File(StringIO(CSV_NO_FIRST_LINE), 'sectorization.csv')
    with pytest.raises(ValidationError, match='Invalid CSV file:.*missing column'):
        sector.clean()
    assert sector.sector_set.count() == 4  # nothing change from initial creation
    sector.titles_in_first_line = False
    sector.save()
    assert sector.sector_set.count() == 3


def test_sector_reordered(sector):
    """Columns are located by title, so a reordered CSV with extra columns is accepted."""
    assert sector.sector_set.count() == 4
    sector.csv_file = File(StringIO(CSV_REORDERED), 'sectorization.csv')
    sector.save()
    assert sector.sector_set.count() == 3


def test_sector_empty_file(sector):
    """An empty file fails in clean() and save(), leaving existing sectors untouched."""
    sector.csv_file = File(StringIO(''), 'sectorization.csv')
    with pytest.raises(ValidationError, match='Invalid CSV file:.*failed to read CSV'):
        sector.clean()
    with pytest.raises(ValidationError, match='failed to read CSV'):
        sector.save()
    assert sector.sector_set.count() == 4  # nothing change


def test_sector_missing_sector(sector):
    """A blank sector_id on the first data row is reported with its line number."""
    sector.csv_file = File(StringIO(CSV_MISSING_SECTOR), 'sectorization.csv')
    with pytest.raises(ValidationError, match='Invalid CSV file:.*missing sector_id, line 2'):
        sector.clean()
    with pytest.raises(ValidationError, match='missing sector_id, line 2'):
        sector.save()
    assert sector.sector_set.count() == 4  # nothing change


def test_sector_missing_street(sector):
    """An empty street_id is reported with its line number."""
    sector.csv_file = File(StringIO(CSV_MISSING_STREET), 'sectorization.csv')
    with pytest.raises(ValidationError, match='Invalid CSV file:.*missing street_id, line 3'):
        sector.clean()
    with pytest.raises(ValidationError, match='missing street_id, line 3'):
        sector.save()
    assert sector.sector_set.count() == 4  # nothing change


def test_sector_missing_column(sector):
    """A CSV lacking the parity column is rejected, naming the missing column."""
    sector.csv_file = File(StringIO(CSV_MISSING_COLUMN), 'sectorization.csv')
    with pytest.raises(ValidationError, match=r'Invalid CSV file:.*missing column.*: parity\.'):
        sector.clean()
    with pytest.raises(ValidationError, match=r'missing column.*: parity\.'):
        sector.save()
    assert sector.sector_set.count() == 4  # nothing change


def test_sector_endpoint_sectors(app, sector):
    """Exercise the "sectors" endpoint: listing, id/q filters, street lookups, errors."""
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'sector',
            'slug': sector.slug,
            'endpoint': 'sectors',
        },
    )
    result = app.get(url).json
    assert result['err'] == 0
    assert len(result['data']) == 4
    assert {'id': 'ecole-hugo', 'text': 'École Hugo'} in result['data']

    result = app.get(url, params={'id': 'ecole-hugo'}).json
    assert result['err'] == 0
    assert len(result['data']) == 1
    assert {'id': 'ecole-hugo', 'text': 'École Hugo'} in result['data']

    result = app.get(url, params={'q': 'hugo'}).json
    assert result['err'] == 0
    assert len(result['data']) == 2
    assert {'id': 'ecole-hugo', 'text': 'École Hugo'} in result['data']
    assert {'id': 'ecole-hugo2', 'text': 'École Hugo 2'} in result['data']

    result = app.get(url, params={'q': 'foobar'}).json
    assert result['err'] == 0
    assert len(result['data']) == 0

    # search a sector by street and house number
    result = app.get(url, params={'street_id': '75114_1915'}).json
    assert result['err'] == 0
    assert result['data'] == [{'id': 'ecole-hugo2', 'text': 'École Hugo 2'}]
    result = app.get(url, params={'street_id': '75114_1915', 'house_number': '123'}).json
    assert result['err'] == 0
    assert result['data'] == [{'id': 'ecole-hugo2', 'text': 'École Hugo 2'}]
    result = app.get(url, params={'street_id': '75114_1912', 'house_number': '12'}).json  # even
    assert result['err'] == 0
    assert result['data'] == [{'id': 'gs-moulin', 'text': 'Groupe Scolaire Moulin'}]
    result = app.get(url, params={'street_id': '75114_1912', 'house_number': '13'}).json  # odd
    assert result['err'] == 0
    assert result['data'] == [{'id': 'gs-zola', 'text': 'Groupe Scolaire Zola'}]
    result = app.get(url, params={'street_id': '75114_1914', 'house_number': '5'}).json  # 5 <= 10
    assert result['err'] == 0
    assert result['data'] == [{'id': 'ecole-hugo', 'text': 'École Hugo'}]
    result = app.get(url, params={'street_id': '75114_1914', 'house_number': '20'}).json  # 20 >= 11
    assert result['err'] == 0
    assert result['data'] == [{'id': 'ecole-hugo2', 'text': 'École Hugo 2'}]

    # bad searches
    result = app.get(url, params={'street_id': '75114_1915', 'house_number': 'abc'}, status=400).json
    assert result['err'] == 1
    assert result['err_desc'] == 'invalid value for parameter "house_number"'  # not an integer
    result = app.get(url, params={'house_number': '123'}, status=400).json
    assert result['err'] == 1
    assert result['err_desc'] == 'house_number requires a street_id'

    # access right is needed
    AccessRight.objects.all().delete()
    result = app.get(url, status=403).json
    assert result['err'] == 1
    assert 'PermissionDenied' in result['err_class']
    assert result['data'] is None


def test_sector_endpoint_export(app, sector):
    """Exercise the "export" endpoint: default output, re-import round trip, options."""
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'sector',
            'slug': sector.slug,
            'endpoint': 'export',
        },
    )
    resp = app.get(url)
    assert resp.headers['content-type'] == 'text/csv'
    assert resp.text.startswith(
        '"street_id","street_name","parity","min_housenumber","max_housenumber","sector_id","sector_name"'
    )
    assert len(resp.text.splitlines()) == 7

    sector.titles_in_first_line = False
    sector.save(import_csv=False)
    resp = app.get(url)
    assert resp.headers['content-type'] == 'text/csv'
    assert 'street_id' not in resp.text
    assert len(resp.text.splitlines()) == 6

    sector.titles_in_first_line = True
    sector.save()
    resp = app.get(url)
    assert resp.headers['content-type'] == 'text/csv'
    # "street_id","parity","min_housenumber","max_housenumber","sector_id","sector_name"
    # "75114_1913","all","","","ecole-hugo","École Hugo"
    # "75114_1914","all","","10","",""
    # "75114_1915","all","","","ecole-hugo2","École Hugo 2"
    # "75114_1914","all","11","","",""
    # "75114_1912","odd","","","gs-zola","Groupe Scolaire Zola"
    # "75114_1912","even","","","gs-moulin","Groupe Scolaire Moulin"
    assert len(resp.text.splitlines()) == 7
    assert resp.text.count('"all"') == 4
    assert resp.text.count('"odd"') == 1
    assert resp.text.count('"even"') == 1
    assert resp.text.count('"ecole-hugo"') == 1
    assert resp.text.count('"0"') == 0
    assert resp.text.count('"999999"') == 0

    # import -> export again
    initial_export = resp.text
    sector.csv_file = File(StringIO(initial_export), 'data.csv')
    sector.save()
    resp = app.get(url)
    assert resp.text == initial_export

    # modify export format
    resp = app.get(
        url, params={'odd': 'IMPAIRS', 'even': 'PAIRS', 'mix': 'TOUS', 'repeat': 'true', 'limits': 'true'}
    )
    assert len(resp.text.splitlines()) == 7
    assert resp.text.count('"TOUS"') == 4
    assert resp.text.count('"IMPAIRS"') == 1
    assert resp.text.count('"PAIRS"') == 1
    assert resp.text.count('"ecole-hugo"') == 2  # repeat
    assert resp.text.count('"0"') == 5  # limits
    assert resp.text.count('"999999"') == 5

    # access right is needed
    AccessRight.objects.all().delete()
    result = app.get(url, status=403).json
    assert result['err'] == 1
    assert 'PermissionDenied' in result['err_class']
    assert result['data'] is None


def test_sector_endpoint_update(app, sector):
    """Exercise the "update" endpoint: valid upload, invalid CSV, missing content type."""
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'sector',
            'slug': sector.slug,
            'endpoint': 'update',
        },
    )
    assert sector.sector_set.count() == 4
    result = app.put(url, params=CSV_REORDERED, headers={'Content-Type': 'text/csv'}).json
    assert sector.sector_set.count() == 3
    assert result['err'] == 0
    assert len(result['data']) == 3
    assert result['updated'] == 'sector/test/api-uploaded-file.csv'

    result = app.put(url, params=CSV_MISSING_COLUMN, headers={'Content-Type': 'text/csv'}, status=400).json
    assert result['err'] == 1
    assert "missing column" in result['err_desc']

    result = app.put(url, params=CSV_REORDERED, headers={}, status=400).json
    assert result['err'] == 1
    assert "can't guess filename extension" in result['err_desc']

    # access right is needed
    AccessRight.objects.all().delete()
    result = app.put(url, params=CSV_REORDERED, headers={'Content-Type': 'text/csv'}, status=403).json
    assert result['err'] == 1
    assert 'PermissionDenied' in result['err_class']


@pytest.mark.parametrize('remove_files', [False, True])
def test_daily_clean(settings, remove_files, sector):
    """Run the daily cron with faked file ages and check which files it removes or moves.

    Parametrized on SECTOR_REMOVE_ON_CLEAN: the stale extra file either
    disappears from the sector directory or ends up in its "unused-files"
    subdirectory.
    """
    settings.SECTOR_REMOVE_ON_CLEAN = remove_files

    sectordata_dir = os.path.dirname(sector.csv_file.path)
    other_dir = os.path.join(settings.MEDIA_ROOT, 'foo', sector.slug)
    os.makedirs(other_dir)

    # create additional file in sector dir
    with open(os.path.join(sectordata_dir, 'csv-file.csv'), 'w'):
        pass
    os.makedirs(os.path.join(sectordata_dir, 'not-a-file'))
    # create additional file in other dir
    with open(os.path.join(other_dir, 'csv-file.csv'), 'w'):
        pass

    call_command('cron', 'daily')

    # not changed
    assert os.listdir(other_dir) == ['csv-file.csv']
    # too soon to be removed
    assert sorted(os.listdir(sectordata_dir)) == ['csv-file.csv', 'not-a-file', 'sectorization.csv']

    orig_os_stat = os.stat

    def _fake_stat(delta):
        """Return an os.stat stand-in whose mtime is always now() + delta."""

        # Forward every extra argument: os.stat() also takes dir_fd= and
        # follow_symlinks=, and callers such as pathlib.Path.stat() pass
        # them; a one-argument replacement would raise TypeError there.
        def fake(path, *args, **kwargs):
            faked = list(orig_os_stat(path, *args, **kwargs))
            faked[ST_MTIME] = (now() + delta).timestamp()
            return stat_result(faked)

        return fake

    try:
        # 1 week ago but one minute too soon
        os.stat = _fake_stat(datetime.timedelta(days=-7, minutes=1))
        call_command('cron', 'daily')

        # not changed
        assert os.listdir(other_dir) == ['csv-file.csv']
        # still too soon to be removed
        assert sorted(os.listdir(sectordata_dir)) == ['csv-file.csv', 'not-a-file', 'sectorization.csv']

        # 1 week ago
        os.stat = _fake_stat(datetime.timedelta(days=-7))
        call_command('cron', 'daily')

        # not changed
        assert os.listdir(other_dir) == ['csv-file.csv']
        # removed or moved
        dir_list = sorted(os.listdir(sectordata_dir))
        if remove_files:
            assert dir_list == ['not-a-file', 'sectorization.csv']
        else:
            assert dir_list == ['not-a-file', 'sectorization.csv', 'unused-files']
            assert os.listdir(os.path.join(sectordata_dir, 'unused-files')) == ['csv-file.csv']

        # wrong storage directory, do nothing
        with open(os.path.join(other_dir, 'bar.csv'), 'w'):
            pass
        sector.csv_file.name = 'foo/%s/csv-file.csv' % sector.slug
        sector.save()
        assert sorted(os.listdir(other_dir)) == ['bar.csv', 'csv-file.csv']
        call_command('cron', 'daily')
        assert sorted(os.listdir(other_dir)) == ['bar.csv', 'csv-file.csv']

        # unknown file
        sector.csv_file.name = 'sector/%s/bar.csv' % sector.slug
        sector.save()
        call_command('cron', 'daily')
    finally:
        # always restore the real os.stat, even when an assertion fails
        os.stat = orig_os_stat


def test_sector_manage_create(app, admin_user):
    """Create a sector connector through the management form and check what is stored."""
    app = login(app)
    response = app.get(reverse('create-connector', kwargs={'connector': 'sector'}))
    response.form.set('title', 'test title')
    response.form.set('slug', 'test-slug')
    response.form.set('description', 'test description')
    response.form.set('csv_file', webtest.Upload('test.csv', CSV.encode('utf-8'), 'application/octet-stream'))
    response = response.form.submit()
    assert response.location
    response = response.follow()
    assert 'test title' in response
    assert 'test description' in response
    assert SectorResource.objects.count() == 1
    resource = SectorResource.objects.get()
    assert resource.title == 'test title'
    assert resource.slug == 'test-slug'
    assert resource.description == 'test description'
    assert resource.csv_file.read() == CSV.encode('utf-8')
    assert resource.sector_set.count() == 4