wcs/tests/test_datasource.py

915 lines
38 KiB
Python

# -*- coding: utf-8 -*-
import codecs
import pytest
import os
import json
import shutil
from django.utils.six import StringIO
from django.utils.six.moves.urllib import parse as urlparse
from quixote import cleanup
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.form import get_request, Form
from wcs import fields, data_sources
from wcs.data_sources import NamedDataSource, register_data_source_function
import mock
from test_widgets import MockHtmlForm, mock_form_submission
from utilities import create_temporary_pub
def setup_module(module):
cleanup()
global pub
pub = create_temporary_pub()
pub.cfg['debug'] = {'logger': True}
pub.write_cfg()
pub.set_config()
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write('''
[wscall-secrets]
api.example.com = 1234
''')
def teardown_module(module):
shutil.rmtree(pub.APP_DIR)
@pytest.fixture
def no_request_pub(request):
pub._request = None
@pytest.fixture
def requests_pub(request):
req = HTTPRequest(None, {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''})
pub._set_request(req)
return req
def test_item_field_python_datasource(requests_pub):
req = get_request()
req.environ['REQUEST_METHOD'] = 'POST'
field = fields.ItemField()
field.id = 1
field.data_source = {
'type': 'formula',
'value': '''[('1', 'un'), ('2', 'deux')]'''
}
form = Form()
field.add_to_form(form)
widget = form.get_widget('f1')
assert widget is not None
assert widget.options == [('1', 'un', '1'), ('2', 'deux', '2')]
form = MockHtmlForm(widget)
mock_form_submission(req, widget, {'f1': ['1']})
assert widget.parse() == '1'
form = Form()
field.add_to_view_form(form, value='1')
widget = form.get_widget('f1')
form = MockHtmlForm(widget)
mock_form_submission(req, widget)
assert widget.parse() == '1'
def test_python_datasource():
plain_list = [('1', 'foo'), ('2', 'bar')]
datasource = {'type': 'formula', 'value': repr(plain_list)}
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo'}),
('2', 'bar', '2', {'id': '2', 'text': 'bar'})]
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]
# invalid python expression
datasource = {'type': 'formula', 'value': 'foobar'}
assert data_sources.get_items(datasource) == []
# expression not iterable
datasource = {'type': 'formula', 'value': '2'}
assert data_sources.get_items(datasource) == []
# three-item tuples
plain_list = [('1', 'foo', 'a'), ('2', 'bar', 'b')]
datasource = {'type': 'formula', 'value': repr(plain_list)}
assert data_sources.get_items(datasource) == [
('1', 'foo', 'a', {'id': '1', 'key': 'a', 'text': 'foo'}),
('2', 'bar', 'b', {'id': '2', 'key': 'b', 'text': 'bar'})]
# single-item tuples
plain_list = [('foo', ), ('bar', )]
datasource = {'type': 'formula', 'value': repr(plain_list)}
assert data_sources.get_items(datasource) == [
('foo', 'foo', 'foo', {'id': 'foo', 'text': 'foo'}),
('bar', 'bar', 'bar', {'id': 'bar', 'text': 'bar'})]
# list of strings
plain_list = ['foo', 'bar']
datasource = {'type': 'formula', 'value': repr(plain_list)}
assert data_sources.get_items(datasource) == [
('foo', 'foo', 'foo', {'id': 'foo', 'text': 'foo'}),
('bar', 'bar', 'bar', {'id': 'bar', 'text': 'bar'})]
# list of dicts
plain_list = [{'id': 'foo', 'text': 'Foo'}, {'id': 'bar', 'text': 'Bar', 'disabled': True}]
datasource = {'type': 'formula', 'value': repr(plain_list)}
assert data_sources.get_items(datasource) == [
('foo', 'Foo', 'foo', {'id': 'foo', 'text': 'Foo'})]
assert data_sources.get_items(datasource, include_disabled=True) == [
('foo', 'Foo', 'foo', {'id': 'foo', 'text': 'Foo'}),
('bar', 'Bar', 'bar', {'id': 'bar', 'text': 'Bar', 'disabled': True})]
def test_python_datasource_with_evalutils():
plain_list = [
{'id': 'foo', 'text': 'Foo', 'value': '2017-01-01'},
{'id': 'bar', 'text': 'Bar', 'value': '2015-01-01'}]
datasource = {'type': 'formula', 'value': '[x for x in %s if date(x["value"]) > date("2016-01-01")]' % repr(plain_list)}
assert data_sources.get_items(datasource) == [
('foo', 'Foo', 'foo', {'id': 'foo', 'text': 'Foo', 'value': '2017-01-01'})]
def test_json_datasource(requests_pub, http_requests):
req = get_request()
get_request().datasources_cache = {}
datasource = {'type': 'json', 'value': ''}
assert data_sources.get_items(datasource) == []
# missing file
get_request().datasources_cache = {}
json_file_path = os.path.join(pub.app_dir, 'test.json')
datasource = {'type': 'json', 'value': 'file://%s' % json_file_path}
assert data_sources.get_items(datasource) == []
# invalid json file
get_request().datasources_cache = {}
json_file = open(json_file_path, 'wb')
json_file.write(codecs.encode(b'foobar', 'zlib_codec'))
json_file.close()
assert data_sources.get_items(datasource) == []
# empty json file
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({}, json_file)
json_file.close()
assert data_sources.get_items(datasource) == []
# unrelated json file
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump('foobar', json_file)
json_file.close()
assert data_sources.get_items(datasource) == []
# another unrelated json file
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': 'foobar'}, json_file)
json_file.close()
assert data_sources.get_items(datasource) == []
# json file not using dictionaries
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': [['1', 'foo'], ['2', 'bar']]}, json_file)
json_file.close()
assert data_sources.get_items(datasource) == []
# a good json file
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
json_file.close()
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo'}),
('2', 'bar', '2', {'id': '2', 'text': 'bar'})]
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]
# a json file with additional keys
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': [{'id': '1', 'text': 'foo', 'more': 'xxx'},
{'id': '2', 'text': 'bar', 'more': 'yyy'}]}, json_file)
json_file.close()
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo', 'more': 'xxx'}),
('2', 'bar', '2', {'id': '2', 'text': 'bar', 'more': 'yyy'})]
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo', 'more': 'xxx'},
{'id': '2', 'text': 'bar', 'more': 'yyy'}]
# json specified with a variadic url
get_request().datasources_cache = {}
class JsonUrlPath(object):
def get_substitution_variables(self):
return {'json_url': 'file://%s' % json_file_path}
pub.substitutions.feed(JsonUrlPath())
datasource = {'type': 'json', 'value': '[json_url]'}
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo', 'more': 'xxx'}),
('2', 'bar', '2', {'id': '2', 'text': 'bar', 'more': 'yyy'})]
# same with django templated url
get_request().datasources_cache = {}
class JsonUrlPath(object):
def get_substitution_variables(self):
return {'json_url': 'file://%s' % json_file_path}
pub.substitutions.feed(JsonUrlPath())
datasource = {'type': 'json', 'value': '{{ json_url }}'}
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo', 'more': 'xxx'}),
('2', 'bar', '2', {'id': '2', 'text': 'bar', 'more': 'yyy'})]
# json specified with a variadic url with an erroneous space
get_request().datasources_cache = {}
class JsonUrlPath(object):
def get_substitution_variables(self):
return {'json_url': 'file://%s' % json_file_path}
pub.substitutions.feed(JsonUrlPath())
datasource = {'type': 'json', 'value': ' [json_url]'}
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo', 'more': 'xxx'}),
('2', 'bar', '2', {'id': '2', 'text': 'bar', 'more': 'yyy'})]
# same with django templated url
get_request().datasources_cache = {}
class JsonUrlPath(object):
def get_substitution_variables(self):
return {'json_url': 'file://%s' % json_file_path}
pub.substitutions.feed(JsonUrlPath())
datasource = {'type': 'json', 'value': ' {{ json_url }}'}
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo', 'more': 'xxx'}),
('2', 'bar', '2', {'id': '2', 'text': 'bar', 'more': 'yyy'})]
# a json file with integer as 'id'
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': [{'id': 1, 'text': 'foo'}, {'id': 2, 'text': 'bar'}]}, json_file)
json_file.close()
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': 1, 'text': 'foo'}),
('2', 'bar', '2', {'id': 2, 'text': 'bar'})]
assert data_sources.get_structured_items(datasource) == [
{'id': 1, 'text': 'foo'}, {'id': 2, 'text': 'bar'}]
# a json file with empty or no text values
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': [{'id': '1', 'text': ''}, {'id': '2'}]}, json_file)
json_file.close()
assert data_sources.get_items(datasource) == [
('1', '', '1', {'id': '1', 'text': ''}),
('2', '2', '2', {'id': '2', 'text': '2'})]
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': ''},
{'id': '2', 'text': '2'}]
# a json file with empty or no id
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': [{'id': '', 'text': 'foo'}, {'text': 'bar'}, {'id': None}]}, json_file)
json_file.close()
assert data_sources.get_items(datasource) == []
assert data_sources.get_structured_items(datasource) == []
# specify data_attribute
datasource = {'type': 'json', 'value': ' {{ json_url }}', 'data_attribute': 'results'}
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'results': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
json_file.close()
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]
datasource = {'type': 'json', 'value': ' {{ json_url }}', 'data_attribute': 'data'}
get_request().datasources_cache = {}
assert data_sources.get_structured_items(datasource) == []
# specify id_attribute
datasource = {'type': 'json', 'value': ' {{ json_url }}', 'id_attribute': 'pk'}
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': [{'pk': '1', 'text': 'foo'}, {'pk': '2', 'text': 'bar'}]}, json_file)
json_file.close()
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo', 'pk': '1'}, {'id': '2', 'text': 'bar', 'pk': '2'}]
datasource = {'type': 'json', 'value': ' {{ json_url }}', 'id_attribute': 'id'}
get_request().datasources_cache = {}
assert data_sources.get_structured_items(datasource) == []
# specify text_attribute
datasource = {'type': 'json', 'value': ' {{ json_url }}', 'text_attribute': 'label'}
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': [{'id': '1', 'label': 'foo'}, {'id': '2', 'label': 'bar'}]}, json_file)
json_file.close()
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo', 'label': 'foo'}, {'id': '2', 'text': 'bar', 'label': 'bar'}]
datasource = {'type': 'json', 'value': ' {{ json_url }}', 'text_attribute': 'text'}
get_request().datasources_cache = {}
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': '1', 'label': 'foo'}, {'id': '2', 'text': '2', 'label': 'bar'}]
def test_json_datasource_bad_url(http_requests, caplog):
datasource = {'type': 'json', 'value': 'http://remote.example.net/404'}
assert data_sources.get_items(datasource) == []
assert 'Error loading JSON data source' in caplog.records[-1].message
assert 'status: 404' in caplog.records[-1].message
datasource = {'type': 'json', 'value': 'http://remote.example.net/xml'}
assert data_sources.get_items(datasource) == []
assert 'Error reading JSON data source output' in caplog.records[-1].message
assert 'Expecting value:' in caplog.records[-1].message
datasource = {'type': 'json', 'value': 'http://remote.example.net/connection-error'}
assert data_sources.get_items(datasource) == []
assert 'Error loading JSON data source' in caplog.records[-1].message
assert 'error' in caplog.records[-1].message
datasource = {'type': 'json', 'value': 'http://remote.example.net/json-list-err1'}
assert data_sources.get_items(datasource) == []
assert 'Error reading JSON data source output (err 1)' in caplog.records[-1].message
def test_json_datasource_bad_url_scheme(caplog):
datasource = {'type': 'json', 'value': ''}
assert data_sources.get_items(datasource) == []
assert caplog.records[-1].message == 'Empty URL in JSON data source'
datasource = {'type': 'json', 'value': 'foo://bar'}
assert data_sources.get_items(datasource) == []
assert 'Error loading JSON data source' in caplog.records[-1].message
assert 'invalid scheme in URL' in caplog.records[-1].message
datasource = {'type': 'json', 'value': '/bla/blo'}
assert data_sources.get_items(datasource) == []
assert 'Error loading JSON data source' in caplog.records[-1].message
assert 'invalid scheme in URL' in caplog.records[-1].message
def test_geojson_datasource(requests_pub, http_requests):
get_request()
get_request().datasources_cache = {}
datasource = {'type': 'geojson', 'value': ''}
assert data_sources.get_items(datasource) == []
# missing file
get_request().datasources_cache = {}
geojson_file_path = os.path.join(pub.app_dir, 'test.geojson')
datasource = {'type': 'geojson', 'value': 'file://%s' % geojson_file_path}
assert data_sources.get_items(datasource) == []
# invalid geojson file
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'wb')
geojson_file.write(codecs.encode(b'foobar', 'zlib_codec'))
geojson_file.close()
assert data_sources.get_items(datasource) == []
# empty geojson file
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump({}, geojson_file)
geojson_file.close()
assert data_sources.get_items(datasource) == []
# unrelated geojson file
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump('foobar', geojson_file)
geojson_file.close()
assert data_sources.get_items(datasource) == []
# another unrelated geojson file
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump({'features': 'foobar'}, geojson_file)
geojson_file.close()
assert data_sources.get_items(datasource) == []
# a good geojson file
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump({'features': [
{'properties': {'id': '1', 'text': 'foo'}},
{'properties': {'id': '2', 'text': 'bar'}}]}, geojson_file)
geojson_file.close()
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo', 'properties': {'id': '1', 'text': 'foo'}}),
('2', 'bar', '2', {'id': '2', 'text': 'bar', 'properties': {'id': '2', 'text': 'bar'}})]
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo', 'properties': {'id': '1', 'text': 'foo'}},
{'id': '2', 'text': 'bar', 'properties': {'id': '2', 'text': 'bar'}}]
# a geojson file with additional keys
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump({'features': [
{'properties': {'id': '1', 'text': 'foo', 'more': 'xxx'}},
{'properties': {'id': '2', 'text': 'bar', 'more': 'yyy'}}]}, geojson_file)
geojson_file.close()
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo', 'properties': {'id': '1', 'text': 'foo', 'more': 'xxx'}}),
('2', 'bar', '2', {'id': '2', 'text': 'bar', 'properties': {'id': '2', 'text': 'bar', 'more': 'yyy'}})]
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo', 'properties': {'id': '1', 'text': 'foo', 'more': 'xxx'}},
{'id': '2', 'text': 'bar', 'properties': {'id': '2', 'text': 'bar', 'more': 'yyy'}}]
# geojson specified with a variadic url
get_request().datasources_cache = {}
class GeoJSONUrlPath(object):
def get_substitution_variables(self):
return {'geojson_url': 'file://%s' % geojson_file_path}
pub.substitutions.feed(GeoJSONUrlPath())
datasource = {'type': 'geojson', 'value': '[geojson_url]'}
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo', 'properties': {'id': '1', 'text': 'foo', 'more': 'xxx'}}),
('2', 'bar', '2', {'id': '2', 'text': 'bar', 'properties': {'id': '2', 'text': 'bar', 'more': 'yyy'}})]
# same with django templated url
get_request().datasources_cache = {}
class GeoJSONUrlPath(object):
def get_substitution_variables(self):
return {'geojson_url': 'file://%s' % geojson_file_path}
pub.substitutions.feed(GeoJSONUrlPath())
datasource = {'type': 'geojson', 'value': '{{ geojson_url }}'}
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo', 'properties': {'id': '1', 'text': 'foo', 'more': 'xxx'}}),
('2', 'bar', '2', {'id': '2', 'text': 'bar', 'properties': {'id': '2', 'text': 'bar', 'more': 'yyy'}})]
# geojson specified with a variadic url with an erroneous space
get_request().datasources_cache = {}
class GeoJSONUrlPath(object):
def get_substitution_variables(self):
return {'geojson_url': 'file://%s' % geojson_file_path}
pub.substitutions.feed(GeoJSONUrlPath())
datasource = {'type': 'geojson', 'value': ' [geojson_url]'}
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo', 'properties': {'id': '1', 'text': 'foo', 'more': 'xxx'}}),
('2', 'bar', '2', {'id': '2', 'text': 'bar', 'properties': {'id': '2', 'text': 'bar', 'more': 'yyy'}})]
# same with django templated url
get_request().datasources_cache = {}
class GeoJSONUrlPath(object):
def get_substitution_variables(self):
return {'geojson_url': 'file://%s' % geojson_file_path}
pub.substitutions.feed(GeoJSONUrlPath())
datasource = {'type': 'geojson', 'value': ' {{ geojson_url }}'}
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo', 'properties': {'id': '1', 'text': 'foo', 'more': 'xxx'}}),
('2', 'bar', '2', {'id': '2', 'text': 'bar', 'properties': {'id': '2', 'text': 'bar', 'more': 'yyy'}})]
# a geojson file with integer as 'id'
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump({'features': [
{'properties': {'id': 1, 'text': 'foo'}},
{'properties': {'id': 2, 'text': 'bar'}}]}, geojson_file)
geojson_file.close()
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': 1, 'text': 'foo', 'properties': {'id': 1, 'text': 'foo'}}),
('2', 'bar', '2', {'id': 2, 'text': 'bar', 'properties': {'id': 2, 'text': 'bar'}})]
assert data_sources.get_structured_items(datasource) == [
{'id': 1, 'text': 'foo', 'properties': {'id': 1, 'text': 'foo'}},
{'id': 2, 'text': 'bar', 'properties': {'id': 2, 'text': 'bar'}}]
# a geojson file with empty or no text values
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump({'features': [
{'properties': {'id': '1', 'text': ''}},
{'properties': {'id': '2'}}]}, geojson_file)
geojson_file.close()
assert data_sources.get_items(datasource) == [
('1', '1', '1', {'id': '1', 'text': '1', 'properties': {'id': '1', 'text': ''}}),
('2', '2', '2', {'id': '2', 'text': '2', 'properties': {'id': '2'}})]
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': '1', 'properties': {'id': '1', 'text': ''}},
{'id': '2', 'text': '2', 'properties': {'id': '2'}}]
# a geojson file with empty or no id
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump({'features': [
{'properties': {'id': '', 'text': 'foo'}},
{'properties': {'text': 'bar'}},
{'properties': {'id': None}}]}, geojson_file)
geojson_file.close()
assert data_sources.get_items(datasource) == []
assert data_sources.get_structured_items(datasource) == []
# specify id_property
datasource = {'type': 'geojson', 'value': ' {{ geojson_url }}', 'id_property': 'gid'}
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump({'features': [
{'properties': {'gid': '1', 'text': 'foo'}},
{'properties': {'gid': '2', 'text': 'bar'}}]}, geojson_file)
geojson_file.close()
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo', 'properties': {'gid': '1', 'text': 'foo'}},
{'id': '2', 'text': 'bar', 'properties': {'gid': '2', 'text': 'bar'}}]
datasource = {'type': 'geojson', 'value': ' {{ geojson_url }}', 'id_property': 'id'}
get_request().datasources_cache = {}
assert data_sources.get_structured_items(datasource) == []
# specify label_template_property
datasource = {'type': 'geojson', 'value': ' {{ geojson_url }}', 'label_template_property': '{{ id }}: {{ text }}'}
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump({'features': [
{'properties': {'id': '1', 'text': 'foo'}},
{'properties': {'id': '2', 'text': 'bar'}}]}, geojson_file)
geojson_file.close()
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': '1: foo', 'properties': {'id': '1', 'text': 'foo'}},
{'id': '2', 'text': '2: bar', 'properties': {'id': '2', 'text': 'bar'}}]
# wrong template
get_request().datasources_cache = {}
datasource = {'type': 'geojson', 'value': ' {{ geojson_url }}', 'label_template_property': '{{ text }'}
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': '{{ text }', 'properties': {'id': '1', 'text': 'foo'}},
{'id': '2', 'text': '{{ text }', 'properties': {'id': '2', 'text': 'bar'}}]
get_request().datasources_cache = {}
datasource = {'type': 'geojson', 'value': ' {{ geojson_url }}', 'label_template_property': 'text'}
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'text', 'properties': {'id': '1', 'text': 'foo'}},
{'id': '2', 'text': 'text', 'properties': {'id': '2', 'text': 'bar'}}]
# unknown property or empty value
datasource = {'type': 'geojson', 'value': ' {{ geojson_url }}', 'label_template_property': '{{ label }}'}
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump({'features': [
{'properties': {'id': '1', 'text': 'foo', 'label': ''}},
{'properties': {'id': '2', 'text': 'bar'}}]}, geojson_file)
geojson_file.close()
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': '1', 'properties': {'id': '1', 'text': 'foo', 'label': ''}},
{'id': '2', 'text': '2', 'properties': {'id': '2', 'text': 'bar'}}]
def test_geojson_datasource_bad_url(http_requests, caplog):
datasource = {'type': 'geojson', 'value': 'http://remote.example.net/404'}
assert data_sources.get_items(datasource) == []
assert 'Error loading GeoJSON data source' in caplog.records[-1].message
assert 'status: 404' in caplog.records[-1].message
datasource = {'type': 'geojson', 'value': 'http://remote.example.net/xml'}
assert data_sources.get_items(datasource) == []
assert 'Error reading GeoJSON data source output' in caplog.records[-1].message
assert 'Expecting value:' in caplog.records[-1].message
datasource = {'type': 'geojson', 'value': 'http://remote.example.net/connection-error'}
assert data_sources.get_items(datasource) == []
assert 'Error loading GeoJSON data source' in caplog.records[-1].message
assert 'error' in caplog.records[-1].message
datasource = {'type': 'geojson', 'value': 'http://remote.example.net/json-list-err1'}
assert data_sources.get_items(datasource) == []
assert 'Error reading GeoJSON data source output (err 1)' in caplog.records[-1].message
def test_geojson_datasource_bad_url_scheme(caplog):
datasource = {'type': 'geojson', 'value': ''}
assert data_sources.get_items(datasource) == []
assert caplog.records[-1].message == 'Empty URL in GeoJSON data source'
datasource = {'type': 'geojson', 'value': 'foo://bar'}
assert data_sources.get_items(datasource) == []
assert 'Error loading GeoJSON data source' in caplog.records[-1].message
assert 'invalid scheme in URL' in caplog.records[-1].message
datasource = {'type': 'geojson', 'value': '/bla/blo'}
assert data_sources.get_items(datasource) == []
assert 'Error loading GeoJSON data source' in caplog.records[-1].message
assert 'invalid scheme in URL' in caplog.records[-1].message
def test_item_field_named_python_datasource():
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'formula',
'value': repr([('1', 'un'), ('2', 'deux')])}
data_source.store()
field = fields.ItemField()
field.id = 1
field.data_source = {
'type': 'foobar', # use the named data source defined earlier
}
form = Form()
field.add_to_form(form)
widget = form.get_widget('f1')
assert widget is not None
assert widget.options == [('1', 'un', '1'), ('2', 'deux', '2')]
def test_register_data_source_function():
def xxx():
return [('1', 'foo'), ('2', 'bar')]
register_data_source_function(xxx)
datasource = {'type': 'formula', 'value': 'xxx()'}
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo'}),
('2', 'bar', '2', {'id': '2', 'text': 'bar'})]
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]
def test_data_source_substitution_variables():
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'formula', 'value': repr(['un', 'deux'])}
data_source.store()
context = pub.substitutions.get_context_variables()
assert context.get('data_source').foobar == [
{'id': 'un', 'text': 'un'}, {'id': 'deux', 'text': 'deux'}]
def test_data_source_slug_name():
NamedDataSource.wipe()
data_source = NamedDataSource(name='foo bar')
data_source.store()
assert data_source.slug == 'foo_bar'
def test_data_source_new_id():
NamedDataSource.wipe()
data_source = NamedDataSource(name='foo bar')
data_source.store()
assert data_source.id == '1'
data_source = NamedDataSource(name='foo bar2')
data_source.store()
assert data_source.id == '2'
data_source.remove_self()
data_source = NamedDataSource(name='foo bar3')
data_source.store()
assert data_source.id == '3'
NamedDataSource.wipe()
data_source = NamedDataSource(name='foo bar4')
data_source.store()
assert data_source.id == '1'
def test_optional_item_field_with_data_source():
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'formula',
'value': repr([('1', 'un'), ('2', 'deux')])}
data_source.store()
field = fields.ItemField()
field.id = 1
field.required = False
field.data_source = {
'type': 'foobar', # use the named data source defined earlier
}
form = Form()
field.add_to_form(form)
widget = form.get_widget('f1')
assert widget is not None
assert widget.options == [('1', 'un', '1'), ('2', 'deux', '2')]
def test_data_source_unicode():
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'formula', 'value': "['uné', 'deux']"}
data_source.store()
data_source2 = NamedDataSource.select()[0]
assert data_source2.data_source == data_source.data_source
assert data_sources.get_items({'type': 'foobar'}) == [
('uné', 'uné', 'uné', {'id': 'uné', 'text': 'uné'}),
('deux', 'deux', 'deux', {'id': 'deux', 'text': 'deux'}),
]
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'json', 'value': "https://whatever.com/json"}
data_source.store()
data_source2 = NamedDataSource.select()[0]
assert data_source2.data_source == data_source.data_source
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
'{"data": [{"id": 0, "text": "zéro"}, {"id": 1, "text": "uné"}, {"id": 2, "text": "deux"}]}')
assert data_sources.get_items({'type': 'foobar'}) == [
('0', 'zéro', '0', {"id": 0, "text": "zéro"}),
('1', 'uné', '1', {"id": 1, "text": "uné"}),
('2', 'deux', '2', {"id": 2, "text": "deux"}),
]
def test_data_source_signed(no_request_pub):
NamedDataSource.wipe()
data_source = NamedDataSource(name='foobar')
data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json"}
data_source.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
'{"data": [{"id": 0, "text": "zero"}]}')
assert len(data_sources.get_items({'type': 'foobar'})) == 1
signed_url = urlopen.call_args[0][0]
assert signed_url.startswith('https://api.example.com/json?')
parsed = urlparse.urlparse(signed_url)
querystring = urlparse.parse_qs(parsed.query)
# stupid simple (but sufficient) signature test:
assert querystring['algo'] == ['sha256']
assert querystring['orig'] == ['example.net']
assert querystring['nonce'][0]
assert querystring['timestamp'][0]
assert querystring['signature'][0]
data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json?foo=bar"}
data_source.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
'{"data": [{"id": 0, "text": "zero"}]}')
assert len(data_sources.get_items({'type': 'foobar'})) == 1
signed_url = urlopen.call_args[0][0]
assert signed_url.startswith('https://api.example.com/json?')
parsed = urlparse.urlparse(signed_url)
querystring = urlparse.parse_qs(parsed.query)
assert querystring['algo'] == ['sha256']
assert querystring['orig'] == ['example.net']
assert querystring['nonce'][0]
assert querystring['timestamp'][0]
assert querystring['signature'][0]
assert querystring['foo'][0] == 'bar'
data_source.data_source = {'type': 'json', 'value': "https://no-secret.example.com/json"}
data_source.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
'{"data": [{"id": 0, "text": "zero"}]}')
assert len(data_sources.get_items({'type': 'foobar'})) == 1
unsigned_url = urlopen.call_args[0][0]
assert unsigned_url == 'https://no-secret.example.com/json'
def test_named_datasource_json_cache(requests_pub):
NamedDataSource.wipe()
datasource = NamedDataSource(name='foobar')
datasource.data_source = {'type': 'json', 'value': 'http://whatever/'}
datasource.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
json.dumps({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}))
assert data_sources.get_structured_items({'type': 'foobar'}) == [
{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]
assert urlopen.call_count == 1
get_request().datasources_cache = {}
assert data_sources.get_structured_items({'type': 'foobar'}) == [
{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]
assert urlopen.call_count == 2
datasource.cache_duration = '60'
datasource.store()
# will cache
get_request().datasources_cache = {}
assert data_sources.get_structured_items({'type': 'foobar'}) == [
{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]
assert urlopen.call_count == 3
# will get from cache
get_request().datasources_cache = {}
assert data_sources.get_structured_items({'type': 'foobar'}) == [
{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]
assert urlopen.call_count == 3
def test_named_datasource_id_parameter(requests_pub):
NamedDataSource.wipe()
datasource = NamedDataSource(name='foobar')
datasource.data_source = {'type': 'json', 'value': 'http://whatever/'}
datasource.id_parameter = 'id'
datasource.store()
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
value = [{'id': '1', 'text': 'foo'}]
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
assert datasource.get_structured_value('1') == value[0]
assert urlopen.call_count == 1
assert urlopen.call_args[0][0] == 'http://whatever/?id=1'
# try again, get from request.datasources_cache
assert datasource.get_structured_value('1') == value[0]
assert urlopen.call_count == 1 # no new call
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}]
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
assert datasource.get_structured_value('1') == value[0]
assert urlopen.call_count == 1
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': []})) # empty list
assert datasource.get_structured_value('1') is None
assert urlopen.call_count == 1
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
value = [{'id': '1', 'text': 'foo'}]
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value, 'err': 0}))
assert datasource.get_structured_value('1') == value[0]
assert urlopen.call_count == 1
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
value = [{'id': '1', 'text': 'foo'}]
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value, 'err': 1}))
assert datasource.get_structured_value('1') is None
assert urlopen.call_count == 1
# no cache for errors
assert datasource.get_structured_value('1') is None
assert urlopen.call_count == 2 # called again
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
value = {'id': '1', 'text': 'foo'} # not a list
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
assert datasource.get_structured_value('1') is None
assert urlopen.call_count == 1
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO('not json')
assert datasource.get_structured_value('1') is None
assert urlopen.call_count == 1
# ws badly configured, return all items
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}]
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
assert datasource.get_structured_value('2') == value[1]
assert urlopen.call_count == 1
# try again, get from request.datasources_cache
assert datasource.get_structured_value('2') == value[1]
assert urlopen.call_count == 1 # no new call
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}]
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
assert datasource.get_structured_value('3') is None
assert urlopen.call_count == 1
# try again, get from request.datasources_cache
assert datasource.get_structured_value('3') is None
assert urlopen.call_count == 1 # no new call
def test_named_datasource_in_formdef():
from wcs.formdef import FormDef
NamedDataSource.wipe()
datasource = NamedDataSource(name='foobar')
datasource.data_source = {'type': 'json', 'value': 'http://whatever/'}
datasource.store()
assert datasource.slug == 'foobar'
formdef = FormDef()
assert not datasource.is_used_in_formdef(formdef)
formdef.fields = [
fields.ItemField(id='0', label='string', type='item',
data_source={'type': 'foobar'}),
]
assert datasource.is_used_in_formdef(formdef)
datasource.slug = 'barfoo'
assert not datasource.is_used_in_formdef(formdef)