import json import os import sys import tempfile from io import BytesIO, StringIO import pytest from django.contrib.contenttypes.models import ContentType from django.core.files import File from django.core.management import call_command from django.test import Client from django.urls import reverse from django.utils import timezone from django.utils.encoding import force_bytes, force_str from passerelle.apps.base_adresse.models import BaseAdresse from passerelle.apps.bdp.models import Bdp from passerelle.apps.csvdatasource.models import CsvDataSource, Query from passerelle.apps.ovh.models import OVHSMSGateway from passerelle.apps.sector.models import SectorResource from passerelle.base.models import AccessRight, ApiUser from passerelle.utils import ImportSiteError, export_site, import_site data = """121;69981;DELANOUE;Eliot;H 525;6;DANIEL WILLIAMS;Shanone;F 253;67742;MARTIN;Sandra;F 511;38142;MARCHETTI;Lucie;F 235;22;MARTIN;Sandra;F 620;52156;ARNAUD;Mathis;H 902;36;BRIGAND;Coline;F 2179;48548;THEBAULT;Salima;F 3420;46;WILSON-LUZAYADIO;Anaëlle;F 1421;55486;WONE;Fadouma;F 2841;51;FIDJI;Zakia;F 2431;59;BELICARD;Sacha;H 4273;60;GOUBERT;Adrien;H 4049;64;MOVSESSIAN;Dimitri;H 4605;67;ABDOU BACAR;Kyle;H 4135;22231;SAVERIAS;Marius;H 4809;75;COROLLER;Maelys;F 5427;117;KANTE;Aliou;H 116642;118;ZAHMOUM;Yaniss;H 216352;38;Dupont;Benoît;H """ data_bom = force_str(data, 'utf-8').encode('utf-8-sig') SECTOR_CSV = """street_id,street_name,parity,min_housenumber,max_housenumber,sector_id,sector_name 75114_1912,rue du Château,P,,, gs-moulin, Groupe Scolaire Moulin""" pytestmark = pytest.mark.django_db TEST_BASE_DIR = os.path.join(os.path.dirname(__file__), 'data', 'csvdatasource') def get_file_content(filename): return open(os.path.join(TEST_BASE_DIR, filename), 'rb').read() @pytest.fixture def setup(): def maker( columns_keynames='fam,id,lname,fname,sex', filename='data.csv', sheet_name='Feuille2', data=b'' ): api = ApiUser.objects.create(username='all', keytype='', key='') csv = CsvDataSource.objects.create( csv_file=File(BytesIO(data), filename), sheet_name=sheet_name, columns_keynames=columns_keynames, slug='test', title='a title', description='a description', ) obj_type = ContentType.objects.get_for_model(csv) AccessRight.objects.create( codename='can_access', apiuser=api, resource_type=obj_type, resource_pk=csv.pk ) url = reverse( 'generic-endpoint', kwargs={ 'connector': 'csvdatasource', 'slug': csv.slug, 'endpoint': 'data', }, ) return csv, url return maker def parse_response(response): return json.loads(response.content)['data'] @pytest.fixture def client(): return Client() @pytest.fixture(params=['data.csv', 'data.ods', 'data.xls', 'data.xlsx']) def filetype(request): return request.param def get_output_of_command(command, *args, **kwargs): old_stdout = sys.stdout output = sys.stdout = StringIO() call_command(command, *args, **kwargs) sys.stdout = old_stdout return output.getvalue() def clear(): Query.objects.all().delete() CsvDataSource.objects.all().delete() ApiUser.objects.all().delete() BaseAdresse.objects.all().delete() SectorResource.objects.all().delete() def test_export_csvdatasource(app, setup, filetype): csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype)) query = Query(slug='query-1_', resource=csvdata, structure='array') query.projections = '\n'.join(['id:int(id)', 'prenom:prenom']) query.save() first = export_site() clear() import_site(first, import_users=True) second = export_site() # we don't care about file names first['resources'][0]['csv_file']['name'] = 'whocares' second['resources'][0]['csv_file']['name'] = 'whocares' assert first == second output = get_output_of_command('export_site') third = json.loads(output) third['resources'][0]['csv_file']['name'] = 'whocares' assert first == third clear() with tempfile.NamedTemporaryFile() as f: f.write(force_bytes(output)) f.flush() call_command('import_site', f.name, import_users=True) fourth = export_site() fourth['resources'][0]['csv_file']['name'] = 'whocares' assert first == fourth Query.objects.all().delete() with tempfile.NamedTemporaryFile() as f: f.write(force_bytes(output)) f.flush() call_command('import_site', f.name, import_users=True) assert Query.objects.count() == 0 with tempfile.NamedTemporaryFile() as f: f.write(force_bytes(output)) f.flush() call_command('import_site', f.name, overwrite=True, import_users=True) assert Query.objects.count() == 1 query = Query(slug='query-2_', resource=CsvDataSource.objects.get(), structure='array') query.projections = '\n'.join(['id:int(id)', 'prenom:prenom']) query.save() assert Query.objects.count() == 2 with tempfile.NamedTemporaryFile() as f: f.write(force_bytes(output)) f.flush() call_command('import_site', f.name, clean=True, overwrite=True, import_users=True) assert Query.objects.count() == 1 assert Query.objects.filter(slug='query-1_').exists() def test_import_export_empty_filefield(app, setup, filetype): Bdp.objects.create(service_url='https://bdp.example.com/') first = export_site() clear() import_site(first) second = export_site() assert first == second assert first['resources'][0]['keystore'] is None assert second['resources'][0]['keystore'] is None def test_export_to_file(app, setup, filetype): bdp = Bdp.objects.create(service_url='https://bdp.example.com/') with tempfile.NamedTemporaryFile() as f: call_command('export_site', output=f.name) assert Bdp.objects.count() == 1 bdp.delete() assert Bdp.objects.count() == 0 with open(f.name) as fd: import_site(json.load(fd), overwrite=True) assert Bdp.objects.count() == 1 def test_export_log_level(app, setup): bdp = Bdp.objects.create(service_url='https://bdp.example.com/') bdp.set_log_level('DEBUG') first = export_site() Bdp.objects.all().delete() import_site(first) second = export_site() assert first == second assert Bdp.objects.all().first().log_level == 'DEBUG' def test_export_access_rights(app, setup): bdp = Bdp.objects.create(service_url='https://bdp.example.com/') obj_type = ContentType.objects.get_for_model(bdp) api = ApiUser.objects.create(username='all', keytype='', key='') AccessRight.objects.create(codename='can_access', apiuser=api, resource_type=obj_type, resource_pk=bdp.pk) first = export_site() Bdp.objects.all().delete() AccessRight.objects.all().delete() ApiUser.objects.all().delete() import_site(first, import_users=True) second = export_site() assert first == second # partial import, with missing ApiUser del first['apiusers'] Bdp.objects.all().delete() AccessRight.objects.all().delete() ApiUser.objects.all().delete() with pytest.raises(ImportSiteError) as excinfo: import_site(first, import_users=True) assert '%s' % excinfo.value == 'Api user "all" not found' def test_export_by_slug(): Bdp.objects.create(service_url='https://bdp.example.com/', slug='bdp') Bdp.objects.create(service_url='https://bdp2.example.com/', slug='bdp2') Bdp.objects.create(service_url='https://bdp3.example.com/', slug='bdp3') output = get_output_of_command('export_site', '--slugs', 'bdp', 'bdp3') export = json.loads(output) assert len(export['resources']) == 2 assert 'bdp2' not in [res['slug'] for res in export['resources']] def test_export_base_adresse(): ba = BaseAdresse.objects.create( slug='ba', service_url='https://api-adresse.data.gouv.fr/', api_geo_url='https://geo.api.gouv.fr/', zipcode='75013', latitude=0.2, longitude=1.4, ) ba_export = ba.export_json() new_ba = BaseAdresse.import_json(ba_export, overwrite=True) assert ba == new_ba # test sectors relationships assert new_ba.sectors.count() == 0 assert ba_export['sectors'] == [] sector = SectorResource.objects.create( slug='sector1', title='sector 1', csv_file=File(StringIO(SECTOR_CSV), 'sectorization.csv'), ) ba.sectors.add(sector) ba_export = ba.export_json() assert ba_export['sectors'] == ['sector1'] new_ba = BaseAdresse.import_json(ba_export, overwrite=True) assert new_ba.sectors.count() == 1 assert new_ba.sectors.get() == sector new_ba = BaseAdresse.import_json(ba_export, overwrite=True) assert new_ba.sectors.count() == 1 assert new_ba.sectors.get() == sector # export all site: handle dependencies export = export_site() clear() assert SectorResource.objects.count() == BaseAdresse.objects.count() == 0 ba = [d for d in export['resources'] if d['slug'] == 'ba'][0] sector1 = [d for d in export['resources'] if d['slug'] == 'sector1'][0] export['resources'] = [ba, sector1] # import must be clever: create sector1 before ba import_site(export, import_users=True) ba = BaseAdresse.objects.get() assert ba.sectors.get() == SectorResource.objects.get() def test_export_ovh(): ovh = OVHSMSGateway.objects.create( slug='test-ovh', title='Test OVH', account='sms-test42', application_key='RHrTdU2oTsrVC0pu', application_secret='CLjtS69tTcPgCKxedeoZlgMSoQGSiXMa', consumer_key='iF0zi0MJrbjNcI3hvuvwkhNk8skrigxz', credit_threshold_alert=100, credit_left=102, credit_alert_timestamp=timezone.now(), alert_emails=['test@entrouvert.org'], ) ovh_export = ovh.export_json() ovh.delete() new_ovh = OVHSMSGateway.import_json(ovh_export) assert new_ovh.alert_emails == ovh.alert_emails assert new_ovh.credit_alert_timestamp is None assert new_ovh.credit_left == 0 assert len(export_site(slugs=['test-ovh'])['resources']) == 1