datasources: check item id (#46031)

This commit is contained in:
Lauréline Guérin 2020-09-01 14:34:52 +02:00
parent aa5ffe8cc4
commit fa4a58429d
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
2 changed files with 32 additions and 3 deletions

View File

@ -832,6 +832,26 @@ def test_named_datasource_id_parameter(requests_pub):
assert datasource.get_structured_value('1') is None
assert urlopen.call_count == 1
# ws badly configured, return all items
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}]
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
assert datasource.get_structured_value('2') == value[1]
assert urlopen.call_count == 1
# try again, get from request.datasources_cache
assert datasource.get_structured_value('2') == value[1]
assert urlopen.call_count == 1 # no new call
get_request().datasources_cache = {}
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}]
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value}))
assert datasource.get_structured_value('3') is None
assert urlopen.call_count == 1
# try again, get from request.datasources_cache
assert datasource.get_structured_value('3') is None
assert urlopen.call_count == 1 # no new call
def test_named_datasource_in_formdef():
from wcs.formdef import FormDef

View File

@ -432,25 +432,34 @@ class NamedDataSource(XmlStorableObject):
vars = get_publisher().substitutions.get_context_variables(mode='lazy')
url = get_variadic_url(url, vars)
if not '?' in url:
if '?' not in url:
url += '?'
else:
url += '&'
url += param_name + '=' + urllib.quote(param_value)
def find_item(items, name, value):
from wcs.logged_errors import LoggedError
for item in items:
if item.get(name) == value:
return item
# not found
LoggedError.record(_('Could not find element by id "%s"') % value)
return None
request = get_request()
if hasattr(request, 'datasources_cache') and url in request.datasources_cache:
items = request.datasources_cache[url]
if not items: # cache may contains empty list from get_structured_items
return None
return items[0]
return find_item(items, param_name, param_value)
items = request_json_items(url, self.data_source)
if not items: # None or empty list are not valid
return None
if hasattr(request, 'datasources_cache'):
request.datasources_cache[url] = items
return items[0]
return find_item(items, param_name, param_value)
def get_display_value(self, option_id):
value = self.get_structured_value(option_id)