wcs/tests/test_datasource.py

506 lines
20 KiB
Python

# -*- coding: utf-8 -*-
import codecs
import pytest
import os
import json
import sys
import shutil
from django.utils import six
from django.utils.six import StringIO
from django.utils.six.moves.urllib import parse as urlparse
from quixote import cleanup
from wcs import publisher
from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.form import *
from wcs import fields, data_sources
from wcs.data_sources import NamedDataSource, register_data_source_function
import mock
from test_widgets import MockHtmlForm, mock_form_submission
from utilities import create_temporary_pub
def setup_module(module):
    """Build the temporary publisher shared by every test of this module.

    Calls quixote's ``cleanup()``, creates a temporary publisher stored in the
    module-level ``pub`` global, sets its ``debug`` configuration to
    ``{'logger': True}`` and writes a ``site-options.cfg`` file holding a
    ``[wscall-secrets]`` entry for ``api.example.com``.
    """
    global pub

    cleanup()

    pub = create_temporary_pub()
    pub.cfg['debug'] = {'logger': True}
    pub.write_cfg()
    pub.set_config()

    # A context manager flushes and closes the file deterministically instead
    # of relying on garbage collection of an anonymous file object.
    site_options_path = os.path.join(pub.app_dir, 'site-options.cfg')
    with open(site_options_path, 'w') as site_options:
        site_options.write('''
[wscall-secrets]
api.example.com = 1234
''')
def teardown_module(module):
    """Remove the publisher's ``APP_DIR`` tree once the module's tests are done."""
    app_directory = pub.APP_DIR
    shutil.rmtree(app_directory)
@pytest.fixture
def no_request_pub(request):
    """Fixture leaving the shared publisher without a current request."""
    # tests using this fixture run with ``pub._request`` unset
    pub._request = None
@pytest.fixture
def requests_pub(request):
    """Fixture installing a new HTTP request on the shared publisher.

    The request is built for server name ``example.net`` with an empty script
    name, registered through ``pub._set_request`` and returned to the test.
    """
    environ = {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''}
    http_request = HTTPRequest(None, environ)
    pub._set_request(http_request)
    return http_request
def test_item_field_python_datasource(requests_pub):
    """Check an item field fed by a formula data source, in edit and view forms."""
    current_request = get_request()
    current_request.environ['REQUEST_METHOD'] = 'POST'

    item_field = fields.ItemField()
    item_field.id = 1
    item_field.data_source = {
        'type': 'formula',
        'value': '''[('1', 'un'), ('2', 'deux')]''',
    }

    # editable form: the formula's options are offered and a posted id is parsed
    edit_form = Form()
    item_field.add_to_form(edit_form)
    edit_widget = edit_form.get_widget('f1')
    assert edit_widget is not None
    assert edit_widget.options == [('1', 'un', '1'), ('2', 'deux', '2')]

    html_form = MockHtmlForm(edit_widget)
    mock_form_submission(current_request, edit_widget, {'f1': ['1']})
    assert edit_widget.parse() == '1'

    # view form: the widget is created with value '1' and nothing is posted
    view_form = Form()
    item_field.add_to_view_form(view_form, value='1')
    view_widget = view_form.get_widget('f1')
    html_form = MockHtmlForm(view_widget)
    mock_form_submission(current_request, view_widget)
    assert view_widget.parse() == '1'
def test_python_datasource():
    """Check the item shapes accepted by formula data sources."""

    def formula(expression):
        # wrap a Python expression string as a formula data source
        return {'type': 'formula', 'value': expression}

    # two-item tuples
    pairs_source = formula(repr([('1', 'foo'), ('2', 'bar')]))
    assert data_sources.get_items(pairs_source) == [
        ('1', 'foo', '1', {'id': '1', 'text': 'foo'}),
        ('2', 'bar', '2', {'id': '2', 'text': 'bar'}),
    ]
    assert data_sources.get_structured_items(pairs_source) == [
        {'id': '1', 'text': 'foo'},
        {'id': '2', 'text': 'bar'},
    ]

    # invalid python expression
    assert data_sources.get_items(formula('foobar')) == []

    # expression not iterable
    assert data_sources.get_items(formula('2')) == []

    # three-item tuples
    triples_source = formula(repr([('1', 'foo', 'a'), ('2', 'bar', 'b')]))
    assert data_sources.get_items(triples_source) == [
        ('1', 'foo', 'a', {'id': '1', 'key': 'a', 'text': 'foo'}),
        ('2', 'bar', 'b', {'id': '2', 'key': 'b', 'text': 'bar'}),
    ]

    # single-item tuples and plain strings are expected to give the same items
    same_value_items = [
        ('foo', 'foo', 'foo', {'id': 'foo', 'text': 'foo'}),
        ('bar', 'bar', 'bar', {'id': 'bar', 'text': 'bar'}),
    ]

    # single-item tuples
    singles_source = formula(repr([('foo', ), ('bar', )]))
    assert data_sources.get_items(singles_source) == same_value_items

    # list of strings
    strings_source = formula(repr(['foo', 'bar']))
    assert data_sources.get_items(strings_source) == same_value_items

    # list of dicts
    dicts_source = formula(repr([
        {'id': 'foo', 'text': 'Foo'},
        {'id': 'bar', 'text': 'Bar', 'disabled': True},
    ]))
    enabled_item = ('foo', 'Foo', 'foo', {'id': 'foo', 'text': 'Foo'})
    disabled_item = ('bar', 'Bar', 'bar', {'id': 'bar', 'text': 'Bar', 'disabled': True})
    assert data_sources.get_items(dicts_source) == [enabled_item]
    assert data_sources.get_items(dicts_source, include_disabled=True) == [
        enabled_item,
        disabled_item,
    ]
def test_python_datasource_with_evalutils():
    """Check a formula data source can call ``date()`` to filter its entries."""
    recent_entry = {'id': 'foo', 'text': 'Foo', 'value': '2017-01-01'}
    old_entry = {'id': 'bar', 'text': 'Bar', 'value': '2015-01-01'}
    entries = [recent_entry, old_entry]

    # only entries dated after 2016-01-01 are expected to remain
    expression = '[x for x in %s if date(x["value"]) > date("2016-01-01")]' % repr(entries)
    filtered_source = {'type': 'formula', 'value': expression}

    assert data_sources.get_items(filtered_source) == [
        ('foo', 'Foo', 'foo', {'id': 'foo', 'text': 'Foo', 'value': '2017-01-01'}),
    ]
def test_json_datasource(http_requests):
    """Exercise JSON data sources read from a local ``file://`` URL.

    Scenarios, in order: empty URL, missing file, file that is not JSON,
    JSON of the wrong shape, well-formed payloads (with and without extra
    keys), URLs obtained from the ``json_url`` substitution variable (both
    ``[json_url]`` and ``{{ json_url }}`` syntaxes, with and without a
    leading space), integer ids, and entries with empty or missing
    ``text`` / ``id``.
    """
    json_file_path = os.path.join(pub.app_dir, 'test.json')

    def reset_cache():
        # empty the current request's ``datasources_cache`` before a scenario
        get_request().datasources_cache = {}

    def write_json(payload):
        # the context manager closes the file even if serialisation fails
        with open(json_file_path, 'w') as json_file:
            json.dump(payload, json_file)

    class JsonUrlPath(object):
        # substitution source exposing the test file URL as ``json_url``
        def get_substitution_variables(self):
            return {'json_url': 'file://%s' % json_file_path}

    reset_cache()
    datasource = {'type': 'json', 'value': ''}
    assert data_sources.get_items(datasource) == []

    # missing file
    reset_cache()
    datasource = {'type': 'json', 'value': 'file://%s' % json_file_path}
    assert data_sources.get_items(datasource) == []

    # invalid json file
    reset_cache()
    with open(json_file_path, 'wb') as json_file:
        json_file.write(codecs.encode(b'foobar', 'zlib_codec'))
    assert data_sources.get_items(datasource) == []

    # empty json file
    reset_cache()
    write_json({})
    assert data_sources.get_items(datasource) == []

    # unrelated json file
    reset_cache()
    write_json('foobar')
    assert data_sources.get_items(datasource) == []

    # another unrelated json file
    reset_cache()
    write_json({'data': 'foobar'})
    assert data_sources.get_items(datasource) == []

    # a good json file
    reset_cache()
    write_json({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]})
    assert data_sources.get_items(datasource) == [
        ('1', 'foo', '1', {'id': '1', 'text': 'foo'}),
        ('2', 'bar', '2', {'id': '2', 'text': 'bar'}),
    ]
    assert data_sources.get_structured_items(datasource) == [
        {'id': '1', 'text': 'foo'},
        {'id': '2', 'text': 'bar'},
    ]

    # a json file with additional keys
    reset_cache()
    write_json({'data': [{'id': '1', 'text': 'foo', 'more': 'xxx'},
                         {'id': '2', 'text': 'bar', 'more': 'yyy'}]})
    items_with_more = [
        ('1', 'foo', '1', {'id': '1', 'text': 'foo', 'more': 'xxx'}),
        ('2', 'bar', '2', {'id': '2', 'text': 'bar', 'more': 'yyy'}),
    ]
    assert data_sources.get_items(datasource) == items_with_more
    assert data_sources.get_structured_items(datasource) == [
        {'id': '1', 'text': 'foo', 'more': 'xxx'},
        {'id': '2', 'text': 'bar', 'more': 'yyy'},
    ]

    # json specified with a variadic url
    reset_cache()
    pub.substitutions.feed(JsonUrlPath())
    datasource = {'type': 'json', 'value': '[json_url]'}
    assert data_sources.get_items(datasource) == items_with_more

    # same with django templated url
    reset_cache()
    pub.substitutions.feed(JsonUrlPath())
    datasource = {'type': 'json', 'value': '{{ json_url }}'}
    assert data_sources.get_items(datasource) == items_with_more

    # json specified with a variadic url with an erroneous space
    reset_cache()
    pub.substitutions.feed(JsonUrlPath())
    datasource = {'type': 'json', 'value': ' [json_url]'}
    assert data_sources.get_items(datasource) == items_with_more

    # same with django templated url
    reset_cache()
    pub.substitutions.feed(JsonUrlPath())
    datasource = {'type': 'json', 'value': ' {{ json_url }}'}
    assert data_sources.get_items(datasource) == items_with_more

    # NOTE: the remaining scenarios keep using the templated URL data source
    # defined just above.

    # a json file with integer as 'id'
    reset_cache()
    write_json({'data': [{'id': 1, 'text': 'foo'}, {'id': 2, 'text': 'bar'}]})
    assert data_sources.get_items(datasource) == [
        ('1', 'foo', '1', {'id': 1, 'text': 'foo'}),
        ('2', 'bar', '2', {'id': 2, 'text': 'bar'}),
    ]
    assert data_sources.get_structured_items(datasource) == [
        {'id': 1, 'text': 'foo'},
        {'id': 2, 'text': 'bar'},
    ]

    # a json file with empty or no text values
    reset_cache()
    write_json({'data': [{'id': '1', 'text': ''}, {'id': '2'}]})
    assert data_sources.get_items(datasource) == [
        ('1', '', '1', {'id': '1', 'text': ''}),
        ('2', '2', '2', {'id': '2', 'text': '2'}),
    ]
    assert data_sources.get_structured_items(datasource) == [
        {'id': '1', 'text': ''},
        {'id': '2', 'text': '2'},
    ]

    # a json file with empty or no id
    reset_cache()
    write_json({'data': [{'id': '', 'text': 'foo'}, {'text': 'bar'}, {'id': None}]})
    assert data_sources.get_items(datasource) == []
    assert data_sources.get_structured_items(datasource) == []
def test_json_datasource_bad_url(http_requests, caplog):
    """Check what gets logged when a remote JSON data source cannot be used."""

    def last_message_for(url):
        # query the URL, expect no items, and hand back the latest log message
        assert data_sources.get_items({'type': 'json', 'value': url}) == []
        return caplog.records[-1].message

    # HTTP 404 answer
    logged = last_message_for('http://remote.example.net/404')
    assert 'Error loading JSON data source' in logged
    assert 'status: 404' in logged

    # answer that is not JSON; the decoder's wording differs between Python 2 and 3
    logged = last_message_for('http://remote.example.net/xml')
    assert 'Error reading JSON data source output' in logged
    decoder_message = 'No JSON object could be decoded' if six.PY2 else 'Expecting value:'
    assert decoder_message in logged

    # connection error
    logged = last_message_for('http://remote.example.net/connection-error')
    assert 'Error loading JSON data source' in logged
    assert 'error' in logged
def test_json_datasource_bad_url_scheme(caplog):
    """Check the log messages for an empty URL and for unsupported URL schemes."""
    assert data_sources.get_items({'type': 'json', 'value': ''}) == []
    assert caplog.records[-1].message == 'Empty URL in JSON data source'

    # an unknown scheme and a bare path are both reported as invalid schemes
    for bad_url in ('foo://bar', '/bla/blo'):
        assert data_sources.get_items({'type': 'json', 'value': bad_url}) == []
        logged = caplog.records[-1].message
        assert 'Error loading JSON data source' in logged
        assert 'invalid scheme in URL' in logged
def test_item_field_named_python_datasource():
    """Check an item field can take its options from a named formula data source."""
    NamedDataSource.wipe()
    named_source = NamedDataSource(name='foobar')
    named_source.data_source = {
        'type': 'formula',
        'value': repr([('1', 'un'), ('2', 'deux')]),
    }
    named_source.store()

    item_field = fields.ItemField()
    item_field.id = 1
    # use the named data source defined earlier
    item_field.data_source = {'type': 'foobar'}

    edit_form = Form()
    item_field.add_to_form(edit_form)
    item_widget = edit_form.get_widget('f1')
    assert item_widget is not None
    assert item_widget.options == [('1', 'un', '1'), ('2', 'deux', '2')]
def test_register_data_source_function():
    """Check a registered function can be called from a formula data source."""

    # the formula below refers to the function by this exact name
    def xxx():
        return [('1', 'foo'), ('2', 'bar')]

    register_data_source_function(xxx)

    calling_source = {'type': 'formula', 'value': 'xxx()'}
    assert data_sources.get_items(calling_source) == [
        ('1', 'foo', '1', {'id': '1', 'text': 'foo'}),
        ('2', 'bar', '2', {'id': '2', 'text': 'bar'}),
    ]
    assert data_sources.get_structured_items(calling_source) == [
        {'id': '1', 'text': 'foo'},
        {'id': '2', 'text': 'bar'},
    ]
def test_data_source_substitution_variables():
    """Check a named data source is reachable as ``data_source.<name>`` in the context."""
    NamedDataSource.wipe()
    named_source = NamedDataSource(name='foobar')
    named_source.data_source = {'type': 'formula', 'value': repr(['un', 'deux'])}
    named_source.store()

    variables = pub.substitutions.get_context_variables()
    exposed_sources = variables.get('data_source')
    assert exposed_sources.foobar == [
        {'id': 'un', 'text': 'un'},
        {'id': 'deux', 'text': 'deux'},
    ]
def test_data_source_slug_name():
    """Check storing a data source named 'foo bar' gives it the slug 'foo_bar'."""
    NamedDataSource.wipe()
    spaced_source = NamedDataSource(name='foo bar')
    spaced_source.store()
    expected_slug = 'foo_bar'
    assert spaced_source.slug == expected_slug
def test_optional_item_field_with_data_source():
    """Check a non-required item field still lists exactly the data source options."""
    NamedDataSource.wipe()
    named_source = NamedDataSource(name='foobar')
    named_source.data_source = {
        'type': 'formula',
        'value': repr([('1', 'un'), ('2', 'deux')]),
    }
    named_source.store()

    optional_field = fields.ItemField()
    optional_field.id = 1
    optional_field.required = False
    # use the named data source defined earlier
    optional_field.data_source = {'type': 'foobar'}

    edit_form = Form()
    optional_field.add_to_form(edit_form)
    item_widget = edit_form.get_widget('f1')
    assert item_widget is not None
    assert item_widget.options == [('1', 'un', '1'), ('2', 'deux', '2')]
def test_data_source_unicode():
    """Check named data sources handle non-ASCII text, for formula and JSON types."""

    def store_only_source(definition):
        # wipe the store, then save a data source named 'foobar'
        NamedDataSource.wipe()
        named = NamedDataSource(name='foobar')
        named.data_source = definition
        named.store()
        return named

    # formula data source with an accented value
    stored = store_only_source({'type': 'formula', 'value': "['uné', 'deux']"})
    reloaded = NamedDataSource.select()[0]
    assert reloaded.data_source == stored.data_source
    assert data_sources.get_items({'type': 'foobar'}) == [
        ('uné', 'uné', 'uné', {'id': 'uné', 'text': 'uné'}),
        ('deux', 'deux', 'deux', {'id': 'deux', 'text': 'deux'}),
    ]

    # JSON data source whose (mocked) answer carries accented texts
    stored = store_only_source({'type': 'json', 'value': "https://whatever.com/json"})
    reloaded = NamedDataSource.select()[0]
    assert reloaded.data_source == stored.data_source

    with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
        urlopen.side_effect = lambda *args: StringIO(
            '{"data": [{"id": 0, "text": "zéro"}, {"id": 1, "text": "uné"}, {"id": 2, "text": "deux"}]}')
        fetched_items = data_sources.get_items({'type': 'foobar'})
        assert fetched_items == [
            ('0', 'zéro', '0', {"id": 0, "text": "zéro"}),
            ('1', 'uné', '1', {"id": 1, "text": "uné"}),
            ('2', 'deux', '2', {"id": 2, "text": "deux"}),
        ]
def test_data_source_signed(no_request_pub):
    """Check which JSON data source URLs get signature parameters appended.

    ``api.example.com`` is the host listed under ``[wscall-secrets]`` in the
    module's ``site-options.cfg``; ``no-secret.example.com`` is not.
    """
    NamedDataSource.wipe()
    named_source = NamedDataSource(name='foobar')

    def requested_url(source_url):
        # point the named data source at source_url, fetch it through a mocked
        # urlopen and return the URL that urlopen was called with
        named_source.data_source = {'type': 'json', 'value': source_url}
        named_source.store()
        with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
            urlopen.side_effect = lambda *args: StringIO(
                '{"data": [{"id": 0, "text": "zero"}]}')
            assert len(data_sources.get_items({'type': 'foobar'})) == 1
            return urlopen.call_args[0][0]

    def checked_signature_query(url):
        # stupid simple (but sufficient) signature test:
        assert url.startswith('https://api.example.com/json?')
        query = urlparse.parse_qs(urlparse.urlparse(url).query)
        assert query['algo'] == ['sha256']
        assert query['orig'] == ['example.net']
        assert query['nonce'][0]
        assert query['timestamp'][0]
        assert query['signature'][0]
        return query

    # host with a secret: the URL is signed
    checked_signature_query(requested_url("https://api.example.com/json"))

    # an existing query string is kept next to the signature parameters
    query = checked_signature_query(requested_url("https://api.example.com/json?foo=bar"))
    assert query['foo'][0] == 'bar'

    # host without a secret: the URL is left untouched
    unsigned_url = requested_url("https://no-secret.example.com/json")
    assert unsigned_url == 'https://no-secret.example.com/json'
def test_named_datasource_json_cache(requests_pub):
    """Check ``cache_duration`` decides whether a JSON data source is fetched again."""
    NamedDataSource.wipe()
    named_source = NamedDataSource(name='foobar')
    named_source.data_source = {'type': 'json', 'value': 'http://whatever/'}
    named_source.store()

    expected_items = [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]

    def structured_items():
        return data_sources.get_structured_items({'type': 'foobar'})

    def empty_request_cache():
        get_request().datasources_cache = {}

    with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
        urlopen.side_effect = lambda *args: StringIO(
            json.dumps({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}))

        # no cache duration: the URL is opened for each lookup made with an
        # empty request cache
        assert structured_items() == expected_items
        assert urlopen.call_count == 1

        empty_request_cache()
        assert structured_items() == expected_items
        assert urlopen.call_count == 2

        named_source.cache_duration = '60'
        named_source.store()

        # will cache
        empty_request_cache()
        assert structured_items() == expected_items
        assert urlopen.call_count == 3

        # will get from cache
        empty_request_cache()
        assert structured_items() == expected_items
        assert urlopen.call_count == 3
def test_named_datasource_in_formdef():
    """Check ``is_used_in_formdef`` matches fields on the data source slug."""
    from wcs.formdef import FormDef

    # Start from an empty store like the sibling tests do, so the slug
    # assertion below does not depend on a 'foobar' data source that an
    # earlier test may have left behind.
    NamedDataSource.wipe()

    datasource = NamedDataSource(name='foobar')
    datasource.data_source = {'type': 'json', 'value': 'http://whatever/'}
    datasource.store()
    assert datasource.slug == 'foobar'

    # a form without fields does not use the data source
    formdef = FormDef()
    assert not datasource.is_used_in_formdef(formdef)

    # a field whose data source type is the slug counts as a use
    formdef.fields = [
        fields.ItemField(id='0', label='string', type='item',
                         data_source={'type': 'foobar'}),
    ]
    assert datasource.is_used_in_formdef(formdef)

    # once the slug changes, the field no longer refers to this data source
    datasource.slug = 'barfoo'
    assert not datasource.is_used_in_formdef(formdef)