384 lines
14 KiB
Python
384 lines
14 KiB
Python
# bijoe - BI dashboard
|
||
# Copyright (C) 2015 Entr'ouvert
|
||
#
|
||
# This program is free software: you can redistribute it and/or modify it
|
||
# under the terms of the GNU Affero General Public License as published
|
||
# by the Free Software Foundation, either version 3 of the License, or
|
||
# (at your option) any later version.
|
||
#
|
||
# This program is distributed in the hope that it will be useful,
|
||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
# GNU Affero General Public License for more details.
|
||
#
|
||
# You should have received a copy of the GNU Affero General Public License
|
||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||
|
||
import copy
|
||
import hashlib
|
||
import json
|
||
from unittest import mock
|
||
|
||
import pytest
|
||
from django.test import override_settings
|
||
from django.urls import reverse
|
||
from django.utils.encoding import force_bytes
|
||
from utils import login
|
||
from webtest import Upload
|
||
|
||
from bijoe.visualization.models import Visualization
|
||
from bijoe.visualization.signature import sign_url
|
||
|
||
|
||
@pytest.fixture
def visualization():
    """Create and return a stored visualization named 'test'.

    It renders the 'simple_count' measure of cube 'facts1' (warehouse
    'schema1') as a table, drilled down on 'date__yearmonth', with no loop
    and no filters.
    """
    parameters = {
        'cube': 'facts1',
        'warehouse': 'schema1',
        'measure': 'simple_count',
        'representation': 'table',
        'loop': '',
        'filters': {},
        'drilldown_x': 'date__yearmonth',
    }
    return Visualization.objects.create(name='test', parameters=parameters)
|
||
|
||
|
||
def test_simple_user_403(app, john_doe):
    """Once logged in, john_doe gets a 403 on the homepage and on the menu."""
    login(app, john_doe)
    for forbidden_path in ('/', '/manage/menu.json'):
        app.get(forbidden_path, status=403)
|
||
|
||
|
||
def test_superuser(app, admin):
    """The admin user can read the menu (JSON and JSONP) and the main pages."""
    login(app, admin)

    # plain JSON menu: a single 'statistics' entry
    menu = app.get('/manage/menu.json', status=200)
    entries = menu.json
    assert len(entries) == 1
    assert entries[0]['slug'] == 'statistics'

    # with a callback the menu is wrapped in a javascript function call
    jsonp_menu = app.get('/manage/menu.json?callback=foo', status=200)
    assert jsonp_menu.content_type == 'application/javascript'
    assert 'foo([{' in jsonp_menu.text

    for page in ('/', '/visualization/'):
        app.get(page, status=200)

    # logout may answer directly or through a redirect
    app.get('/accounts/logout/', status=(200, 302))
|
||
|
||
|
||
def test_visualizations_json_api(schema1, app, admin, settings):
    """Check access control and content of the visualizations JSON listing.

    The listing is served to a request signed with the secret of a known
    service and to a logged-in admin; any other request gets a 403.
    """
    # Four visualizations sharing the same name; the listing is expected to
    # expose them under the slugs test, test-2, test-3 and test-4.
    shared_parameters = {'warehouse': 'schema1', 'cube': 'test'}
    for _ in range(3):
        Visualization(name='test', parameters=dict(shared_parameters)).save()
    Visualization(name='test', parameters={'warehouse_slug': 'schema1_slug', 'cube': 'test'}).save()

    expected_slugs = {'test', 'test-2', 'test-3', 'test-4'}

    def listed_slugs(response):
        # slugs found in the JSON payload of a listing response
        return {entry['slug'] for entry in response.json}

    # declare the service whose signature is accepted
    key = 'xxx'
    orig = 'other.example.net'
    default_service = {'verif_orig': orig, 'secret': key}
    settings.KNOWN_SERVICES = {'wcs': {'default': default_service}}

    list_url = reverse('visualizations-json')

    # known origin, right key
    resp = app.get(sign_url('%s?orig=%s' % (list_url, orig), key), status=200)
    assert listed_slugs(resp) == expected_slugs

    # known origin, wrong key
    app.get(sign_url('%s?orig=%s' % (list_url, orig), 'wrong-key'), status=403)

    # unknown origin, right key
    app.get(sign_url('%s?orig=%s' % (list_url, 'wrong-orig'), key), status=403)

    # neither signature nor session
    app.get(list_url, status=403)

    # no signature, but an authenticated admin session
    login(app, admin)
    resp = app.get(list_url)
    assert listed_slugs(resp) == expected_slugs
|
||
|
||
|
||
def test_visualization_json_api(schema1, app, admin, visualization):
    """The JSON API of a visualization returns its axis, data and warnings."""
    login(app, admin)
    json_url = reverse('visualization-json', kwargs={'pk': visualization.id})
    resp = app.get(json_url)

    # values from test_schema1/test_yearmonth_drilldown
    month_labels = [
        '01/2017', '02/2017', '03/2017', '04/2017',
        '05/2017', '06/2017', '07/2017', '08/2017',
    ]
    counts = [10, 1, 1, 1, 1, 1, 1, 1]
    warnings = [
        "le champ « pouët » n'est pas bon",
        'warning2',
    ]
    expected = {
        'axis': {'x_labels': month_labels},
        'data': counts,
        'format': '1',
        'unit': None,
        'measure': 'integer',
        'warnings': warnings,
    }
    assert resp.json == expected
|
||
|
||
|
||
def test_visualization_json_api_duration(schema1, app, admin, visualization):
    """With the 'duration' measure the JSON API reports values in seconds."""
    visualization.parameters['measure'] = 'duration'
    visualization.save()

    login(app, admin)
    json_url = reverse('visualization-json', kwargs={'pk': visualization.id})
    resp = app.get(json_url)

    # values from test_schema1/test_yearmonth_drilldown
    month_labels = [
        '01/2017', '02/2017', '03/2017', '04/2017',
        '05/2017', '06/2017', '07/2017', '08/2017',
    ]
    durations = [
        536968800.0,
        539258400.0,
        541677600.0,
        544352400.0,
        546944400.0,
        549622800.0,
        552214800.0,
        554893200.0,
    ]
    warnings = [
        "le champ « pouët » n'est pas bon",
        'warning2',
    ]
    expected = {
        'axis': {'x_labels': month_labels},
        'data': durations,
        'format': '1',
        'unit': 'seconds',
        'measure': 'duration',
        'warnings': warnings,
    }
    assert resp.json == expected
|
||
|
||
|
||
def test_missing_data(schema1, app, admin, visualization):
    """A visualization pointing at a missing cube is listed as a disabled link."""
    visualization.parameters['cube'] = 'missing_cube'
    visualization.save()

    login(app, admin)
    homepage = app.get('/')

    disabled_links = homepage.pyquery('ul li a.disabled')
    assert disabled_links.text() == visualization.name
|
||
|
||
|
||
def test_visualization_creation_view(schema1, app, admin):
    """Create a visualization through the UI and check the stored warehouse slug."""
    login(app, admin)

    # homepage -> warehouse -> cube
    cube_page = app.get('/').click('schema1').click('Facts 1')

    # choose what to display, then render it
    cube_form = cube_page.form
    cube_form.set('representation', 'table')
    cube_form.set('measure', 'simple_count')
    rendered = cube_form.submit('visualize')

    # save the rendered visualization under the name 'test'
    save_form = rendered.click(href='save').form
    save_form['name'] = 'test'
    save_form.submit()

    saved = Visualization.objects.get(name='test')
    assert saved.parameters['warehouse_slug'] == 'schema1_slug'
|
||
|
||
|
||
def test_visualization_warehouse_view_errors(app, admin):
    """Requesting the page of an unknown warehouse yields a 404."""
    login(app, admin)
    unknown_warehouse_url = '/visualization/warehouse/not-a-schema/'
    app.get(unknown_warehouse_url, status=404)
|
||
|
||
|
||
def test_visualization_cube_view_errors(schema1, app, admin):
    """The cube page answers 404 for both 'not-a-schema/fact1' and 'schema1/fact1'."""
    login(app, admin)
    not_found_urls = (
        '/visualization/warehouse/not-a-schema/fact1/',
        '/visualization/warehouse/schema1/fact1/',
    )
    for url in not_found_urls:
        app.get(url, status=404)
|
||
|
||
|
||
def test_import_visualization(schema1, app, admin, visualization, settings, freezer):
    """Exercise the JSON export of visualizations and every import outcome.

    Covers an invalid upload, an empty upload, updating an existing
    visualization, creating one then several visualizations, and a global
    export followed by its re-import.
    """
    # the expected export filename carries the frozen date
    export_disposition = 'attachment; filename="export_stats_20200101.json"'

    def submit_import(payload, **homepage_kwargs):
        # Reach the import form from the homepage, upload payload as
        # export.json and return the raw response of the submission.
        import_page = app.get('/', **homepage_kwargs).click('Import')
        import_form = import_page.form
        import_form['visualizations_json'] = Upload('export.json', payload, 'application/json')
        return import_form.submit()

    freezer.move_to('2020-01-01T00:00:00Z')
    settings.LANGUAGE_CODE = 'en-us'
    login(app, admin)

    # export of a single visualization from its own page
    detail_page = app.get('/visualization/%s/' % visualization.id)
    exported = detail_page.click('Export as JSON')
    assert exported.headers['content-type'] == 'application/json'
    assert exported.headers['content-disposition'] == export_disposition
    visualization_export = exported.text
    single_payload = visualization_export.encode('utf-8')

    # invalid json: the form is shown again with an error, no redirect
    resp = submit_import(b'garbage', status=200)
    assert 'File is not in the expected JSON format.' in resp.text

    # empty json
    resp = submit_import(b'{}', status=200).follow()
    assert 'No visualizations were found.' in resp.text

    # existing visualization
    resp = submit_import(single_payload, status=200).follow()
    assert 'No visualization created. A visualization has been updated.' in resp.text
    assert Visualization.objects.count() == 1

    # new visualization
    Visualization.objects.all().delete()
    resp = submit_import(single_payload).follow()
    assert 'A visualization has been created. No visualization updated.' in resp.text
    assert Visualization.objects.count() == 1

    # multiple visualizations: the exported one plus two renamed copies
    document = json.loads(visualization_export)
    entries = document['visualizations']
    for name, slug in (('test 2', 'test-2'), ('test 3', 'test-3')):
        clone = copy.copy(entries[0])
        clone['name'] = name
        clone['slug'] = slug
        entries.append(clone)

    resp = submit_import(json.dumps(document).encode('utf-8'), status=200).follow()
    assert '2 visualizations have been created. A visualization has been updated.' in resp.text
    assert Visualization.objects.count() == 3

    # global export/import
    global_export = app.get('/').click('Export')
    assert global_export.headers['content-type'] == 'application/json'
    assert global_export.headers['content-disposition'] == export_disposition
    visualizations_export = global_export.text
    Visualization.objects.all().delete()

    resp = submit_import(visualizations_export.encode('utf-8')).follow()
    assert '3 visualizations have been created. No visualization updated.' in resp.text
    assert Visualization.objects.count() == 3
|
||
|
||
|
||
@override_settings(LANGUAGE_CODE='en-us')
def test_save_as(schema1, app, admin, visualization):
    """'Save as' proposes a '(Copy)' name and duplicates the parameters."""
    login(app, admin)

    detail_page = app.get('/visualization/%s/' % visualization.id)
    save_as_form = detail_page.click('Save as').form

    # the form is pre-filled with a name derived from the original one
    assert save_as_form['name'].value == 'test (Copy)'

    save_as_form['name'] = 'zob'
    save_as_form.submit().follow()

    assert Visualization.objects.count() == 2
    duplicate = Visualization.objects.get(name='zob')
    assert duplicate.parameters == visualization.parameters
|
||
|
||
|
||
def test_iframe_view(schema1, app, admin, visualization, settings):
    """The iframe view needs a valid signature or an authenticated session.

    The signature used here is the SHA-1 hex digest of the view URL
    concatenated with SECRET_KEY; without a valid one, anonymous requests
    are redirected to the login page.
    """
    base_url = '/visualization/%s/iframe/' % visualization.id
    login_redirect = '/accounts/login/?next='

    # using signature
    signed_material = force_bytes(base_url + settings.SECRET_KEY)
    signature = hashlib.sha1(signed_material).hexdigest()
    app.get('%s?signature=%s' % (base_url, signature), status=200)

    rejected = app.get('%s?signature=%s' % (base_url, 'no-good'), status=302)
    assert login_redirect in rejected.location

    # without signature
    anonymous = app.get(base_url, status=302)
    assert login_redirect in anonymous.location

    login(app, admin)
    app.get(base_url, status=200)
|
||
|
||
|
||
def test_ods_view(schema1, app, admin, visualization, settings):
    """The ODS view answers with the OpenDocument spreadsheet content type."""
    login(app, admin)
    ods_url = '/visualization/%s/ods/' % visualization.id
    ods_response = app.get(ods_url)
    assert ods_response.content_type == 'application/vnd.oasis.opendocument.spreadsheet'
|
||
|
||
|
||
def test_geojson_view(schema1, app, admin, visualization, settings):
    """The GeoJSON view returns a FeatureCollection holding 8 features."""
    login(app, admin)
    geojson_url = '/visualization/%s/geojson/' % visualization.id
    geojson_response = app.get(geojson_url)
    assert geojson_response.content_type == 'application/json'

    collection = geojson_response.json
    assert collection['type'] == 'FeatureCollection'
    assert len(collection['features']) == 8
|
||
|
||
|
||
@mock.patch('bijoe.views.get_idps', return_value=[{'METADATA': '...'}])
@mock.patch('bijoe.views.resolve_url', return_value='foo-url')
def test_mellon_idp_redirections(mocked_resolv_url, mocked_get_idps, app):
    """With get_idps returning one IdP, login and logout redirect to the resolved URL.

    resolve_url is patched to return 'foo-url'; a 'next' parameter given to
    the login view is forwarded, URL-quoted, in the redirection.
    """
    expected_redirections = (
        ('/accounts/login/', 'foo-url'),
        ('/accounts/login/?next=http://foo/?bar', 'foo-url?next=http%3A//foo/%3Fbar'),
        ('/accounts/logout/', 'foo-url'),
    )
    for requested_url, expected_location in expected_redirections:
        resp = app.get(requested_url, status=302)
        assert resp.location == expected_location
|
||
|
||
|
||
def test_visualization_json_api_dimension_not_found(schema1, app, admin, visualization):
    """The JSON API still answers 200 and lists one error per unknown dimension."""
    # point both drilldowns, the loop and a filter at 'foo', 'bar', 'zob' and 'coin'
    parameters = visualization.parameters
    parameters.update({'drilldown_x': 'foo', 'drilldown_y': 'bar', 'loop': 'zob'})
    parameters['filters']['coin'] = '1'
    visualization.save()

    login(app, admin)
    json_url = reverse('visualization-json', kwargs={'pk': visualization.id})
    payload = app.get(json_url, status=200).json

    expected_warnings = [
        "le champ « pouët » n'est pas bon",
        'warning2',
    ]
    expected_errors = [
        'La dimension «\xa0foo\xa0» n’existe pas.',
        'La dimension «\xa0bar\xa0» n’existe pas.',
        'La dimension «\xa0coin\xa0» n’existe pas, le filtre sera ignoré.',
        'La dimension «\xa0zob\xa0» n’existe pas.',
    ]
    assert payload['warnings'] == expected_warnings
    assert payload['errors'] == expected_errors
|
||
|
||
|
||
def test_visualization_dimension_not_found(schema1, app, admin, visualization):
    """The HTML page shows warnings and one error message per unknown dimension."""
    # point both drilldowns, the loop and a filter at 'foo', 'bar', 'zob' and 'coin'
    parameters = visualization.parameters
    parameters.update({'drilldown_x': 'foo', 'drilldown_y': 'bar', 'loop': 'zob'})
    parameters['filters']['coin'] = '1'
    visualization.save()

    login(app, admin)
    page = app.get(reverse('visualization', kwargs={'pk': visualization.id}), status=200)

    def message_texts(selector):
        # text of every element of the page matching the CSS selector
        return [item.text() for item in page.pyquery(selector).items()]

    expected_warnings = [
        "le champ « pouët » n'est pas bon",
        'warning2',
    ]
    expected_errors = [
        'La dimension «\xa0foo\xa0» n’existe pas.',
        'La dimension «\xa0bar\xa0» n’existe pas.',
        'La dimension «\xa0coin\xa0» n’existe pas, le filtre sera ignoré.',
        'La dimension «\xa0zob\xa0» n’existe pas.',
    ]
    assert message_texts('ul.messages li.warning') == expected_warnings
    assert message_texts('ul.messages li.error') == expected_errors
|