# bijoe - BI dashboard
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import copy
import hashlib
import json
from unittest import mock

import pytest
from django.test import override_settings
from django.urls import reverse
from django.utils.encoding import force_bytes
from utils import login
from webtest import Upload

from bijoe.visualization.models import Visualization
from bijoe.visualization.signature import sign_url

# X axis labels asserted by the JSON API tests drilling down on date__yearmonth.
YEARMONTH_LABELS = [
    '01/2017',
    '02/2017',
    '03/2017',
    '04/2017',
    '05/2017',
    '06/2017',
    '07/2017',
    '08/2017',
]

# Warnings asserted by every test below that inspects warnings.
EXPECTED_WARNINGS = [
    "le champ « pouët » n'est pas bon",
    'warning2',
]

# Errors asserted once a visualization is altered by _use_unknown_dimensions().
MISSING_DIMENSION_ERRORS = [
    'La dimension «\xa0foo\xa0» n’existe pas.',
    'La dimension «\xa0bar\xa0» n’existe pas.',
    'La dimension «\xa0coin\xa0» n’existe pas, le filtre sera ignoré.',
    'La dimension «\xa0zob\xa0» n’existe pas.',
]


def _upload_visualizations(app, content):
    """Submit ``content`` through the import form reached from the homepage.

    ``content`` is the raw bytes of the uploaded file. The response of the form
    submission is returned as is, without following any redirect, so that each
    caller decides whether ``.follow()`` is expected.
    """
    resp = app.get('/', status=200)
    resp = resp.click('Import')
    resp.form['visualizations_json'] = Upload('export.json', content, 'application/json')
    return resp.form.submit()


def _use_unknown_dimensions(visualization):
    """Make ``visualization`` reference the dimensions foo, bar, zob and coin, then save it."""
    visualization.parameters['drilldown_x'] = 'foo'
    visualization.parameters['drilldown_y'] = 'bar'
    visualization.parameters['loop'] = 'zob'
    visualization.parameters['filters']['coin'] = '1'
    visualization.save()


@pytest.fixture
def visualization():
    """Create a 'test' table visualization of cube facts1 in warehouse schema1."""
    return Visualization.objects.create(
        name='test',
        parameters={
            'cube': 'facts1',
            'warehouse': 'schema1',
            'measure': 'simple_count',
            'representation': 'table',
            'loop': '',
            'filters': {},
            'drilldown_x': 'date__yearmonth',
        },
    )


def test_simple_user_403(app, john_doe):
    """The homepage and the menu endpoint answer 403 when logged in as john_doe."""
    login(app, john_doe)
    app.get('/', status=403)
    app.get('/manage/menu.json', status=403)


def test_superuser(app, admin):
    """An admin gets the menu (JSON and JSONP), the homepage and the listing."""
    login(app, admin)
    resp = app.get('/manage/menu.json', status=200)
    assert len(resp.json) == 1
    assert resp.json[0]['slug'] == 'statistics'

    # JSONP variant, wrapped in the requested callback
    resp = app.get('/manage/menu.json?callback=foo', status=200)
    assert resp.content_type == 'application/javascript'
    assert 'foo([{' in resp.text

    app.get('/', status=200)
    app.get('/visualization/', status=200)
    app.get('/accounts/logout/', status=(200, 302))


def test_visualizations_json_api(schema1, app, admin, settings):
    """The visualizations list needs a valid signature or a logged in admin."""
    for _ in range(3):
        Visualization(name='test', parameters={'warehouse': 'schema1', 'cube': 'test'}).save()
    Visualization(name='test', parameters={'warehouse_slug': 'schema1_slug', 'cube': 'test'}).save()
    expected_slugs = {'test', 'test-2', 'test-3', 'test-4'}
    list_url = reverse('visualizations-json')

    # using signature
    key = 'xxx'
    orig = 'other.example.net'
    settings.KNOWN_SERVICES = {
        'wcs': {
            'default': {
                'verif_orig': orig,
                'secret': key,
            }
        }
    }
    url = sign_url('%s?orig=%s' % (list_url, orig), key)
    resp = app.get(url, status=200)
    assert {x['slug'] for x in resp.json} == expected_slugs

    url = sign_url('%s?orig=%s' % (list_url, orig), 'wrong-key')
    app.get(url, status=403)

    url = sign_url('%s?orig=%s' % (list_url, 'wrong-orig'), key)
    app.get(url, status=403)

    # without signature
    app.get(list_url, status=403)
    login(app, admin)
    resp = app.get(list_url)
    assert {x['slug'] for x in resp.json} == expected_slugs


def test_visualization_json_api(schema1, app, admin, visualization):
    """The JSON API returns the simple_count measure per year-month."""
    login(app, admin)
    resp = app.get(reverse('visualization-json', kwargs={'pk': visualization.id}))
    # values from test_schem1/test_yearmonth_drilldown
    assert resp.json == {
        'axis': {'x_labels': YEARMONTH_LABELS},
        'data': [10, 1, 1, 1, 1, 1, 1, 1],
        'format': '1',
        'unit': None,
        'measure': 'integer',
        'warnings': EXPECTED_WARNINGS,
    }


def test_visualization_json_api_duration(schema1, app, admin, visualization):
    """The JSON API reports the duration measure with a 'seconds' unit."""
    visualization.parameters['measure'] = 'duration'
    visualization.save()
    login(app, admin)
    resp = app.get(reverse('visualization-json', kwargs={'pk': visualization.id}))
    # values from test_schem1/test_yearmonth_drilldown
    assert resp.json == {
        'axis': {'x_labels': YEARMONTH_LABELS},
        'data': [
            536968800.0,
            539258400.0,
            541677600.0,
            544352400.0,
            546944400.0,
            549622800.0,
            552214800.0,
            554893200.0,
        ],
        'format': '1',
        'unit': 'seconds',
        'measure': 'duration',
        'warnings': EXPECTED_WARNINGS,
    }


def test_missing_data(schema1, app, admin, visualization):
    """A visualization pointing to a missing cube is shown as a disabled link."""
    visualization.parameters['cube'] = 'missing_cube'
    visualization.save()
    login(app, admin)
    response = app.get('/')
    assert response.pyquery('ul li a.disabled').text() == visualization.name


def test_visualization_creation_view(schema1, app, admin):
    """A visualization saved through the UI records the warehouse slug."""
    login(app, admin)
    response = app.get('/')
    response = response.click('schema1')
    response = response.click('Facts 1')

    form = response.form
    form.set('representation', 'table')
    form.set('measure', 'simple_count')
    response = form.submit('visualize')

    response = response.click(href='save')
    response.form['name'] = 'test'
    response.form.submit()

    visu = Visualization.objects.get(name='test')
    assert visu.parameters['warehouse_slug'] == 'schema1_slug'


def test_visualization_warehouse_view_errors(app, admin):
    """An unknown warehouse gives a 404."""
    login(app, admin)
    app.get('/visualization/warehouse/not-a-schema/', status=404)


def test_visualization_cube_view_errors(schema1, app, admin):
    """An unknown warehouse or an unknown cube gives a 404."""
    login(app, admin)
    app.get('/visualization/warehouse/not-a-schema/fact1/', status=404)
    app.get('/visualization/warehouse/schema1/fact1/', status=404)


def test_import_visualization(schema1, app, admin, visualization, settings, freezer):
    """Export visualizations as JSON then import them back in various situations."""
    freezer.move_to('2020-01-01T00:00:00Z')
    settings.LANGUAGE_CODE = 'en-us'
    expected_disposition = 'attachment; filename="export_stats_20200101.json"'

    login(app, admin)
    resp = app.get('/visualization/%s/' % visualization.id)
    resp = resp.click('Export as JSON')
    assert resp.headers['content-type'] == 'application/json'
    assert resp.headers['content-disposition'] == expected_disposition
    visualization_export = resp.text

    # invalid json (the form is displayed again, there is no redirect to follow)
    resp = _upload_visualizations(app, b'garbage')
    assert 'File is not in the expected JSON format.' in resp.text

    # empty json
    resp = _upload_visualizations(app, b'{}').follow()
    assert 'No visualizations were found.' in resp.text

    # existing visualization
    resp = _upload_visualizations(app, visualization_export.encode('utf-8')).follow()
    assert 'No visualization created. A visualization has been updated.' in resp.text
    assert Visualization.objects.count() == 1

    # new visualization
    Visualization.objects.all().delete()
    resp = _upload_visualizations(app, visualization_export.encode('utf-8')).follow()
    assert 'A visualization has been created. No visualization updated.' in resp.text
    assert Visualization.objects.count() == 1

    # multiple visualizations: the existing one plus two renamed duplicates
    visualizations = json.loads(visualization_export)
    for number in (2, 3):
        duplicate = copy.copy(visualizations['visualizations'][0])
        duplicate['name'] = 'test %s' % number
        duplicate['slug'] = 'test-%s' % number
        visualizations['visualizations'].append(duplicate)
    resp = _upload_visualizations(app, json.dumps(visualizations).encode('utf-8')).follow()
    assert '2 visualizations have been created. A visualization has been updated.' in resp.text
    assert Visualization.objects.count() == 3

    # global export/import
    resp = app.get('/').click('Export')
    assert resp.headers['content-type'] == 'application/json'
    assert resp.headers['content-disposition'] == expected_disposition
    visualizations_export = resp.text

    Visualization.objects.all().delete()
    resp = _upload_visualizations(app, visualizations_export.encode('utf-8')).follow()
    assert '3 visualizations have been created. No visualization updated.' in resp.text
    assert Visualization.objects.count() == 3


@override_settings(LANGUAGE_CODE='en-us')
def test_save_as(schema1, app, admin, visualization):
    """'Save as' creates a second visualization with the same parameters."""
    login(app, admin)
    resp = app.get('/visualization/%s/' % visualization.id)
    resp = resp.click('Save as')
    assert resp.form['name'].value == 'test (Copy)'
    resp.form['name'] = 'zob'
    resp.form.submit().follow()

    assert Visualization.objects.count() == 2
    new_visualization = Visualization.objects.get(name='zob')
    assert new_visualization.parameters == visualization.parameters


def test_iframe_view(schema1, app, admin, visualization, settings):
    """The iframe view needs a valid signature or a logged in admin."""
    # using signature
    base_url = '/visualization/%s/iframe/' % visualization.id
    signature = hashlib.sha1(force_bytes(base_url + settings.SECRET_KEY)).hexdigest()
    app.get('%s?signature=%s' % (base_url, signature), status=200)
    resp = app.get('%s?signature=%s' % (base_url, 'no-good'), status=302)
    assert '/accounts/login/?next=' in resp.location

    # without signature
    resp = app.get(base_url, status=302)
    assert '/accounts/login/?next=' in resp.location
    login(app, admin)
    app.get(base_url, status=200)


def test_ods_view(schema1, app, admin, visualization, settings):
    """The ODS view answers with the OpenDocument spreadsheet content type."""
    login(app, admin)
    resp = app.get('/visualization/%s/ods/' % visualization.id)
    assert resp.content_type == 'application/vnd.oasis.opendocument.spreadsheet'


def test_geojson_view(schema1, app, admin, visualization, settings):
    """The GeoJSON view answers with a FeatureCollection of 8 features."""
    login(app, admin)
    resp = app.get('/visualization/%s/geojson/' % visualization.id)
    assert resp.content_type == 'application/json'
    assert resp.json['type'] == 'FeatureCollection'
    assert len(resp.json['features']) == 8


# Decorators apply bottom-up: the resolve_url mock is the first positional argument.
@mock.patch('bijoe.views.get_idps', return_value=[{'METADATA': '...'}])
@mock.patch('bijoe.views.resolve_url', return_value='foo-url')
def test_mellon_idp_redirections(mocked_resolv_url, mocked_get_idps, app):
    """With get_idps returning an entry, login and logout redirect to the resolved URL."""
    resp = app.get('/accounts/login/', status=302)
    assert resp.location == 'foo-url'

    # the next parameter is forwarded, quoted
    resp = app.get('/accounts/login/?next=http://foo/?bar', status=302)
    assert resp.location == 'foo-url?next=http%3A//foo/%3Fbar'

    resp = app.get('/accounts/logout/', status=302)
    assert resp.location == 'foo-url'


def test_visualization_json_api_dimension_not_found(schema1, app, admin, visualization):
    """The JSON API still answers 200 and lists unknown dimensions in 'errors'."""
    _use_unknown_dimensions(visualization)
    login(app, admin)
    resp = app.get(reverse('visualization-json', kwargs={'pk': visualization.id}), status=200)
    assert resp.json['warnings'] == EXPECTED_WARNINGS
    assert resp.json['errors'] == MISSING_DIMENSION_ERRORS


def test_visualization_dimension_not_found(schema1, app, admin, visualization):
    """The HTML view still answers 200 and shows unknown dimensions as error messages."""
    _use_unknown_dimensions(visualization)
    login(app, admin)
    resp = app.get(reverse('visualization', kwargs={'pk': visualization.id}), status=200)
    warnings = [elt.text() for elt in resp.pyquery('ul.messages li.warning').items()]
    errors = [elt.text() for elt in resp.pyquery('ul.messages li.error').items()]
    assert warnings == EXPECTED_WARNINGS
    assert errors == MISSING_DIMENSION_ERRORS