272 lines
8.3 KiB
Python
272 lines
8.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
|
|
import pytest
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.core.files import File
|
|
from django.core.management import call_command
|
|
from django.test import Client
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
from django.utils.encoding import force_bytes, force_text
|
|
from django.utils.six import BytesIO, StringIO
|
|
|
|
from passerelle.apps.base_adresse.models import BaseAdresse
|
|
from passerelle.apps.ovh.models import OVHSMSGateway
|
|
from passerelle.base.models import AccessRight, ApiUser
|
|
from passerelle.compat import json_loads
|
|
from passerelle.utils import export_site, import_site
|
|
|
|
# Sample CSV payload: 20 rows of five ';'-separated fields, no header row.
# NOTE(review): the field order presumably matches the default
# ``columns_keynames`` of the ``setup`` fixture (fam,id,lname,fname,sex) —
# confirm against the tests that consume it.
data = """121;69981;DELANOUE;Eliot;H
525;6;DANIEL WILLIAMS;Shanone;F
253;67742;MARTIN;Sandra;F
511;38142;MARCHETTI;Lucie;F
235;22;MARTIN;Sandra;F
620;52156;ARNAUD;Mathis;H
902;36;BRIGAND;Coline;F
2179;48548;THEBAULT;Salima;F
3420;46;WILSON-LUZAYADIO;Anaëlle;F
1421;55486;WONE;Fadouma;F
2841;51;FIDJI;Zakia;F
2431;59;BELICARD;Sacha;H
4273;60;GOUBERT;Adrien;H
4049;64;MOVSESSIAN;Dimitri;H
4605;67;ABDOU BACAR;Kyle;H
4135;22231;SAVERIAS;Marius;H
4809;75;COROLLER;Maelys;F
5427;117;KANTE;Aliou;H
116642;118;ZAHMOUM;Yaniss;H
216352;38;Dupont;Benoît;H
"""
|
|
|
|
# Same rows as ``data``, encoded to bytes with the 'utf-8-sig' codec, i.e.
# UTF-8 prefixed with a byte order mark.
data_bom = force_text(data, 'utf-8').encode('utf-8-sig')
|
|
|
|
from passerelle.apps.bdp.models import Bdp
|
|
from passerelle.apps.csvdatasource.models import CsvDataSource, Query
|
|
|
|
# Apply the ``django_db`` marker to every test of this module so they may
# use the database.
pytestmark = pytest.mark.django_db
|
|
|
|
# Directory, next to this module, holding the sample files read by
# get_file_content().
TEST_BASE_DIR = os.path.join(os.path.dirname(__file__), 'data', 'csvdatasource')
|
|
|
|
|
|
def get_file_content(filename):
    """Return the raw bytes of a sample file stored under TEST_BASE_DIR.

    :param filename: file name, relative to TEST_BASE_DIR.
    :returns: the whole content of the file, read in binary mode.
    :raises IOError/OSError: if the file cannot be opened.
    """
    # the context manager guarantees the descriptor is closed, even when
    # read() fails, instead of relying on garbage collection
    with open(os.path.join(TEST_BASE_DIR, filename), 'rb') as fd:
        return fd.read()
|
|
|
|
|
|
@pytest.fixture
def setup():
    """Provide a factory creating a CsvDataSource together with its access right.

    The returned callable creates an ApiUser named ``all``, a CsvDataSource
    with slug ``test`` whose file is built from ``data``, grants the user
    ``can_access`` on it, and returns ``(datasource, url)`` where ``url`` is
    the reversed ``generic-endpoint`` route of the ``data`` endpoint.
    """

    def maker(
        columns_keynames='fam,id,lname,fname,sex', filename='data.csv', sheet_name='Feuille2', data=b''
    ):
        api_user = ApiUser.objects.create(username='all', keytype='', key='')

        # wrap the raw bytes in a Django File so they can feed the file field
        uploaded = File(BytesIO(data), filename)
        datasource = CsvDataSource.objects.create(
            csv_file=uploaded,
            sheet_name=sheet_name,
            columns_keynames=columns_keynames,
            slug='test',
            title='a title',
            description='a description',
        )

        AccessRight.objects.create(
            codename='can_access',
            apiuser=api_user,
            resource_type=ContentType.objects.get_for_model(datasource),
            resource_pk=datasource.pk,
        )

        route_kwargs = {
            'connector': 'csvdatasource',
            'slug': datasource.slug,
            'endpoint': 'data',
        }
        return datasource, reverse('generic-endpoint', kwargs=route_kwargs)

    return maker
|
|
|
|
|
|
def parse_response(response):
    """Decode the JSON body of ``response`` and return its ``data`` member."""
    payload = json_loads(response.content)
    return payload['data']
|
|
|
|
|
|
@pytest.fixture
def client():
    """Provide a new Django test ``Client`` instance."""
    test_client = Client()
    return test_client
|
|
|
|
|
|
@pytest.fixture(params=['data.csv', 'data.ods', 'data.xls', 'data.xlsx'])
def filetype(request):
    """Parametrized fixture returning each sample file name in turn."""
    current_name = request.param
    return current_name
|
|
|
|
|
|
def get_output_of_command(command, *args, **kwargs):
    """Run a management command and return everything it wrote to stdout.

    :param command: name of the management command, passed to call_command.
    :param args: positional arguments forwarded to the command.
    :param kwargs: options forwarded to the command.
    :returns: the captured standard output, as a string.

    Any exception raised by the command propagates to the caller.
    """
    old_stdout = sys.stdout
    output = sys.stdout = StringIO()
    try:
        call_command(command, *args, **kwargs)
    finally:
        # always put the real stdout back: a failing command must not leave
        # the rest of the test session writing into our buffer
        sys.stdout = old_stdout
    return output.getvalue()
|
|
|
|
|
|
def clear():
    """Delete all Query, CsvDataSource and ApiUser rows, in that order."""
    for model in (Query, CsvDataSource, ApiUser):
        model.objects.all().delete()
|
|
|
|
|
|
def test_export_csvdatasource(app, setup, filetype):
    """Round-trip a CsvDataSource and its Query through export and import.

    Exercises both the Python API (export_site / import_site) and the
    ``export_site`` / ``import_site`` management commands, then checks how
    the ``overwrite`` and ``clean`` options affect existing queries.
    """

    def forget_file_name(site):
        # we don't care about file names
        site['resources'][0]['csv_file']['name'] = 'whocares'

    def import_dump(dump, **options):
        # hand the JSON dump to the import_site command through a temp file
        with tempfile.NamedTemporaryFile() as dump_file:
            dump_file.write(force_bytes(dump))
            dump_file.flush()
            call_command('import_site', dump_file.name, import_users=True, **options)

    def add_query(slug, resource):
        new_query = Query(slug=slug, resource=resource, structure='array')
        new_query.projections = '\n'.join(['id:int(id)', 'prenom:prenom'])
        new_query.save()

    datasource, _url = setup(
        'id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype)
    )
    add_query('query-1_', datasource)

    # python API round trip
    first = export_site()
    clear()
    import_site(first, import_users=True)
    second = export_site()

    forget_file_name(first)
    forget_file_name(second)
    assert first == second

    # the export_site command must print the same content
    output = get_output_of_command('export_site')
    third = json_loads(output)
    forget_file_name(third)
    assert first == third

    # import_site command on an empty database
    clear()
    import_dump(output)
    fourth = export_site()
    forget_file_name(fourth)
    assert first == fourth

    Query.objects.all().delete()

    # without overwrite the deleted query does not come back
    import_dump(output)
    assert Query.objects.count() == 0

    # with overwrite it is restored
    import_dump(output, overwrite=True)
    assert Query.objects.count() == 1

    add_query('query-2_', CsvDataSource.objects.get())

    assert Query.objects.count() == 2

    # clean + overwrite leaves only the query present in the dump
    import_dump(output, clean=True, overwrite=True)
    assert Query.objects.count() == 1
    assert Query.objects.filter(slug='query-1_').exists()
|
|
|
|
|
|
def test_import_export_empty_filefield(app, setup, filetype):
    """Check that a connector whose file field is empty survives a round trip.

    A Bdp is created without ``keystore``; it is exported, removed,
    re-imported and exported again. Both exports must be equal and must
    carry ``None`` for ``keystore``.
    """
    Bdp.objects.create(service_url='https://bdp.example.com/')

    first = export_site()
    clear()
    # clear() only removes Query, CsvDataSource and ApiUser rows, so the Bdp
    # would still be there and the import below would not have to recreate
    # it; drop it explicitly so the empty file field really goes through
    # import_site()
    Bdp.objects.all().delete()
    assert Bdp.objects.count() == 0
    import_site(first)
    assert Bdp.objects.count() == 1
    second = export_site()

    assert first == second
    assert first['resources'][0]['keystore'] is None
    assert second['resources'][0]['keystore'] is None
|
|
|
|
|
|
def test_export_to_file(app, setup, filetype):
    """Check that ``export_site --output`` writes a dump import_site can load.

    The connector is exported to a temporary file, deleted, then restored
    from the content of that file.
    """
    bdp = Bdp.objects.create(service_url='https://bdp.example.com/')

    with tempfile.NamedTemporaryFile() as f:
        call_command('export_site', output=f.name)
        assert Bdp.objects.count() == 1
        bdp.delete()
        assert Bdp.objects.count() == 0
        # read the dump back through a context manager so the extra file
        # handle is closed before the temporary file is removed
        with open(f.name) as dump_file:
            site = json.load(dump_file)
        import_site(site, overwrite=True)
        assert Bdp.objects.count() == 1
|
|
|
|
|
|
def test_export_log_level(app, setup):
    """Check that the log level of a connector is kept by export/import."""
    connector = Bdp.objects.create(service_url='https://bdp.example.com/')
    connector.set_log_level('DEBUG')

    before = export_site()
    Bdp.objects.all().delete()
    import_site(before)
    after = export_site()
    assert before == after

    restored = Bdp.objects.all().first()
    assert restored.log_level == 'DEBUG'
|
|
|
|
|
|
def test_export_access_rights(app, setup):
    """Check that access rights and API users are kept by export/import."""
    connector = Bdp.objects.create(service_url='https://bdp.example.com/')
    api_user = ApiUser.objects.create(username='all', keytype='', key='')
    AccessRight.objects.create(
        codename='can_access',
        apiuser=api_user,
        resource_type=ContentType.objects.get_for_model(connector),
        resource_pk=connector.pk,
    )

    before = export_site()

    # wipe everything the export is supposed to bring back
    for model in (Bdp, AccessRight, ApiUser):
        model.objects.all().delete()

    import_site(before, import_users=True)
    after = export_site()
    assert before == after
|
|
|
|
|
|
def test_export_by_slug():
    """Check that ``export_site --slugs`` only exports the listed connectors."""
    connectors = (
        ('bdp', 'https://bdp.example.com/'),
        ('bdp2', 'https://bdp2.example.com/'),
        ('bdp3', 'https://bdp3.example.com/'),
    )
    for slug, service_url in connectors:
        Bdp.objects.create(service_url=service_url, slug=slug)

    output = get_output_of_command('export_site', '--slugs', 'bdp', 'bdp3')
    export = json.loads(output)
    resources = export['resources']

    assert len(resources) == 2
    exported_slugs = [res['slug'] for res in resources]
    assert 'bdp2' not in exported_slugs
|
|
|
|
|
|
def test_export_base_adresse():
    """Check that importing the export of a BaseAdresse yields an equal object."""
    settings = {
        'service_url': 'https://api-adresse.data.gouv.fr/',
        'api_geo_url': 'https://geo.api.gouv.fr/',
        'zipcode': '75013',
        'latitude': 0.2,
        'longitude': 1.4,
    }
    ba = BaseAdresse.objects.create(**settings)

    # the original is still in the database, hence overwrite=True
    new_ba = BaseAdresse.import_json(ba.export_json(), overwrite=True)
    assert ba == new_ba
|
|
|
|
|
|
def test_export_ovh():
    """Check which OVHSMSGateway fields come back after export/import.

    The test asserts that ``alert_emails`` is preserved while, on the
    imported gateway, ``credit_alert_timestamp`` is ``None`` and
    ``credit_left`` is ``0``.
    """
    settings = dict(
        slug='test-ovh',
        title='Test OVH',
        account='sms-test42',
        application_key='RHrTdU2oTsrVC0pu',
        application_secret='CLjtS69tTcPgCKxedeoZlgMSoQGSiXMa',
        consumer_key='iF0zi0MJrbjNcI3hvuvwkhNk8skrigxz',
        credit_threshold_alert=100,
        credit_left=102,
        credit_alert_timestamp=timezone.now(),
        alert_emails=['test@entrouvert.org'],
    )
    ovh = OVHSMSGateway.objects.create(**settings)

    exported = ovh.export_json()
    ovh.delete()
    imported = OVHSMSGateway.import_json(exported)

    assert imported.alert_emails == ovh.alert_emails
    assert imported.credit_alert_timestamp is None
    assert imported.credit_left == 0
|