forms: extend json autocomplete support for be asynchronous if possible (#31492)
This commit is contained in:
parent
45a374ae50
commit
065247ae13
|
@ -4785,6 +4785,138 @@ def test_items_field_with_disabled_items(http_requests, pub):
|
|||
assert formdef.data_class().select()[0].data['0'] == ['2']
|
||||
assert formdef.data_class().select()[0].data['0_display'] == 'world'
|
||||
|
||||
def test_item_field_autocomplete_json_source(http_requests, pub):
|
||||
user = create_user(pub)
|
||||
formdef = create_formdef()
|
||||
formdef.data_class().wipe()
|
||||
|
||||
NamedDataSource.wipe()
|
||||
data_source = NamedDataSource(name='foobar')
|
||||
data_source.data_source = {'type': 'json', 'value': 'http://remote.example.net/json'}
|
||||
data_source.store()
|
||||
|
||||
formdef.fields = [
|
||||
fields.ItemField(id='0', label='string', type='item',
|
||||
data_source={'type': 'foobar'},
|
||||
display_mode='autocomplete',
|
||||
),
|
||||
]
|
||||
formdef.store()
|
||||
|
||||
with mock.patch('qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'},
|
||||
{'id': '2', 'text': 'world', 'extra': 'bar'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
|
||||
resp = get_app(pub).get('/test/')
|
||||
assert 'data-autocomplete="true"' in resp.body
|
||||
resp.form['f0'] = '2'
|
||||
resp = resp.form.submit('submit') # -> validation page
|
||||
resp = resp.form.submit('submit') # -> submit
|
||||
assert formdef.data_class().select()[0].data['0'] == '2'
|
||||
assert formdef.data_class().select()[0].data['0_display'] == 'world'
|
||||
assert formdef.data_class().select()[0].data['0_structured'] == data['data'][1]
|
||||
|
||||
# check with possibility of remote query
|
||||
data_source.query_parameter = 'q'
|
||||
data_source.id_parameter = 'id'
|
||||
data_source.store()
|
||||
|
||||
formdef.data_class().wipe()
|
||||
|
||||
app = get_app(pub)
|
||||
with mock.patch('qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'},
|
||||
{'id': '2', 'text': 'world', 'extra': 'bar'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
|
||||
resp = app.get('/test/')
|
||||
assert urlopen.call_count == 0
|
||||
pq = resp.pyquery.remove_namespaces()
|
||||
select2_url = pq('select').attr['data-select2-url']
|
||||
|
||||
with mock.patch('qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
|
||||
resp2 = app.get(select2_url + '?q=hell')
|
||||
assert urlopen.call_count == 1
|
||||
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?q=hell'
|
||||
assert resp2.json == data
|
||||
|
||||
# check unauthorized access
|
||||
resp2 = get_app(pub).get(select2_url + '?q=hell', status=403)
|
||||
|
||||
# simulate select2 mode, with qommon.forms.js adding an extra hidden widget
|
||||
resp.form.fields['f0_display'] = Hidden(form=resp.form, tag='input', name='f0_display', pos=10)
|
||||
resp.form['f0'].force_value('1')
|
||||
resp.form.fields['f0_display'].force_value('hello')
|
||||
|
||||
with mock.patch('qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
|
||||
resp = resp.form.submit('submit') # -> validation page
|
||||
assert urlopen.call_count == 1
|
||||
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?id=1'
|
||||
assert resp.form['f0'].value == '1'
|
||||
assert resp.form['f0_label'].value == 'hello'
|
||||
|
||||
with mock.patch('qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
|
||||
resp = resp.form.submit('submit') # -> submit
|
||||
assert urlopen.call_count == 1
|
||||
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?id=1'
|
||||
assert formdef.data_class().select()[0].data['0'] == '1'
|
||||
assert formdef.data_class().select()[0].data['0_display'] == 'hello'
|
||||
assert formdef.data_class().select()[0].data['0_structured'] == data['data'][0]
|
||||
|
||||
# same thing with signed URLs
|
||||
formdef.data_class().wipe()
|
||||
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write('''\
|
||||
[wscall-secrets]
|
||||
remote.example.net = 1234
|
||||
''')
|
||||
|
||||
app = get_app(pub)
|
||||
with mock.patch('qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'},
|
||||
{'id': '2', 'text': 'world', 'extra': 'bar'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
|
||||
resp = app.get('/test/')
|
||||
assert urlopen.call_count == 0
|
||||
pq = resp.pyquery.remove_namespaces()
|
||||
select2_url = pq('select').attr['data-select2-url']
|
||||
|
||||
with mock.patch('qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
|
||||
resp2 = app.get(select2_url + '?q=hell')
|
||||
assert urlopen.call_count == 1
|
||||
assert urlopen.call_args[0][0].startswith('http://remote.example.net/json?q=hell&orig=example.net&')
|
||||
assert resp2.json == data
|
||||
|
||||
# simulate select2 mode, with qommon.forms.js adding an extra hidden widget
|
||||
resp.form.fields['f0_display'] = Hidden(form=resp.form, tag='input', name='f0_display', pos=10)
|
||||
resp.form['f0'].force_value('1')
|
||||
resp.form.fields['f0_display'].force_value('hello')
|
||||
|
||||
with mock.patch('qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
|
||||
resp = resp.form.submit('submit') # -> validation page
|
||||
assert urlopen.call_count == 1
|
||||
assert urlopen.call_args[0][0].startswith('http://remote.example.net/json?id=1&orig=example.net&')
|
||||
assert resp.form['f0'].value == '1'
|
||||
assert resp.form['f0_label'].value == 'hello'
|
||||
|
||||
with mock.patch('qommon.misc.urlopen') as urlopen:
|
||||
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]}
|
||||
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
|
||||
resp = resp.form.submit('submit') # -> submit
|
||||
assert urlopen.call_count == 1
|
||||
assert urlopen.call_args[0][0].startswith('http://remote.example.net/json?id=1&orig=example.net&')
|
||||
assert formdef.data_class().select()[0].data['0'] == '1'
|
||||
assert formdef.data_class().select()[0].data['0_display'] == 'hello'
|
||||
assert formdef.data_class().select()[0].data['0_structured'] == data['data'][0]
|
||||
|
||||
def test_item_field_autocomplete_jsonp_source(http_requests, pub):
|
||||
user = create_user(pub)
|
||||
formdef = create_formdef()
|
||||
|
|
|
@ -56,6 +56,26 @@ class NamedDataSourceUI(object):
|
|||
'data-dynamic-display-child-of': 'data_source$type',
|
||||
'data-dynamic-display-value': _('JSON URL'),
|
||||
})
|
||||
form.add(StringWidget, 'query_parameter',
|
||||
value=self.datasource.query_parameter,
|
||||
title=_('Query Parameter'),
|
||||
hint=_('Name of the parameter to use for querying source (typically, q)'),
|
||||
required=False,
|
||||
advanced=False,
|
||||
attrs={
|
||||
'data-dynamic-display-child-of': 'data_source$type',
|
||||
'data-dynamic-display-value': _('JSON URL'),
|
||||
})
|
||||
form.add(StringWidget, 'id_parameter',
|
||||
value=self.datasource.id_parameter,
|
||||
title=_('Id Parameter'),
|
||||
hint=_('Name of the parameter to use to get a given entry from data source (typically, id)'),
|
||||
required=False,
|
||||
advanced=False,
|
||||
attrs={
|
||||
'data-dynamic-display-child-of': 'data_source$type',
|
||||
'data-dynamic-display-value': _('JSON URL'),
|
||||
})
|
||||
if self.datasource.slug:
|
||||
form.add(StringWidget, 'slug',
|
||||
value=self.datasource.slug,
|
||||
|
@ -88,6 +108,8 @@ class NamedDataSourceUI(object):
|
|||
self.datasource.description = form.get_widget('description').parse()
|
||||
self.datasource.data_source = form.get_widget('data_source')
|
||||
self.datasource.cache_duration = form.get_widget('cache_duration').parse()
|
||||
self.datasource.query_parameter = form.get_widget('query_parameter').parse()
|
||||
self.datasource.id_parameter = form.get_widget('id_parameter').parse()
|
||||
if self.datasource.slug:
|
||||
self.datasource.slug = slug
|
||||
self.datasource.store()
|
||||
|
|
22
wcs/api.py
22
wcs/api.py
|
@ -19,9 +19,11 @@ import re
|
|||
import time
|
||||
import sys
|
||||
|
||||
from quixote import get_request, get_publisher, get_response
|
||||
from quixote import get_request, get_publisher, get_response, get_session
|
||||
from quixote.directory import Directory
|
||||
|
||||
from django.utils.six.moves.urllib import parse as urllib
|
||||
|
||||
from qommon import _
|
||||
from qommon import misc
|
||||
from qommon.evalutils import make_datetime
|
||||
|
@ -31,11 +33,12 @@ from qommon.form import ComputedExpressionWidget, ConditionWidget
|
|||
|
||||
from wcs.categories import Category
|
||||
from wcs.conditions import Condition, ValidationError
|
||||
from wcs.data_sources import NamedDataSource
|
||||
from wcs.formdef import FormDef
|
||||
from wcs.roles import Role, logged_users_role
|
||||
from wcs.forms.common import FormStatusPage
|
||||
import wcs.qommon.storage as st
|
||||
from wcs.api_utils import is_url_signed, get_user_from_api_query_string
|
||||
from wcs.api_utils import sign_url_auto_orig, is_url_signed, get_user_from_api_query_string
|
||||
|
||||
from backoffice.management import FormPage as BackofficeFormPage
|
||||
from backoffice.management import ManagementDirectory
|
||||
|
@ -741,9 +744,21 @@ class ApiTrackingCodeDirectory(Directory):
|
|||
return json.dumps(data)
|
||||
|
||||
|
||||
class AutocompleteDirectory(Directory):
|
||||
def _q_lookup(self, component):
|
||||
url = get_session().get_data_source_query_url_from_token(component)
|
||||
if not url:
|
||||
raise AccessForbiddenError()
|
||||
url += urllib.quote(get_request().form['q'])
|
||||
unsigned_url = url
|
||||
url = sign_url_auto_orig(url)
|
||||
get_response().set_content_type('application/json')
|
||||
return misc.urlopen(url).read()
|
||||
|
||||
|
||||
class ApiDirectory(Directory):
|
||||
_q_exports = ['forms', 'roles', ('reverse-geocoding', 'reverse_geocoding'),
|
||||
'formdefs', 'categories', 'user', 'users', 'code',
|
||||
'formdefs', 'categories', 'user', 'users', 'code', 'autocomplete',
|
||||
('validate-expression', 'validate_expression'),
|
||||
('validate-condition', 'validate_condition')]
|
||||
|
||||
|
@ -753,6 +768,7 @@ class ApiDirectory(Directory):
|
|||
user = ApiUserDirectory()
|
||||
users = ApiUsersDirectory()
|
||||
code = ApiTrackingCodeDirectory()
|
||||
autocomplete = AutocompleteDirectory()
|
||||
|
||||
def reverse_geocoding(self):
|
||||
get_response().set_content_type('application/json')
|
||||
|
|
|
@ -21,7 +21,7 @@ import xml.etree.ElementTree as ET
|
|||
from django.utils.six.moves.urllib import parse as urllib
|
||||
from django.utils.six.moves.urllib import parse as urlparse
|
||||
|
||||
from quixote import get_publisher, get_request
|
||||
from quixote import get_publisher, get_request, get_session
|
||||
from quixote.html import TemplateIO
|
||||
|
||||
from qommon import _
|
||||
|
@ -236,10 +236,14 @@ class NamedDataSource(XmlStorableObject):
|
|||
description = None
|
||||
data_source = None
|
||||
cache_duration = None
|
||||
query_parameter = None
|
||||
id_parameter = None
|
||||
|
||||
# declarations for serialization
|
||||
XML_NODES = [('name', 'str'), ('slug', 'str'), ('description', 'str'),
|
||||
('cache_duration', 'str'),
|
||||
('query_parameter', 'str'),
|
||||
('id_parameter', 'str'),
|
||||
('data_source', 'data_source'),
|
||||
]
|
||||
|
||||
|
@ -254,6 +258,8 @@ class NamedDataSource(XmlStorableObject):
|
|||
def can_jsonp(self):
|
||||
if self.type == 'jsonp':
|
||||
return True
|
||||
if self.type == 'json' and self.query_parameter:
|
||||
return True
|
||||
return False
|
||||
|
||||
def migrate(self):
|
||||
|
@ -304,11 +310,70 @@ class NamedDataSource(XmlStorableObject):
|
|||
return objects[0]
|
||||
raise KeyError("data source '%s' does not exist" % slug)
|
||||
|
||||
def get_json_query_url(self):
|
||||
url = self.data_source.get('value').strip()
|
||||
if Template.is_template_string(url):
|
||||
vars = get_publisher().substitutions.get_context_variables(mode='lazy')
|
||||
url = get_variadic_url(url, vars)
|
||||
if not '?' in url:
|
||||
url += '?' + self.query_parameter + '='
|
||||
else:
|
||||
url += '&' + self.query_parameter + '='
|
||||
return url
|
||||
|
||||
def get_jsonp_url(self):
|
||||
if self.type == 'jsonp':
|
||||
return self.data_source.get('value')
|
||||
if self.type == 'json' and self.query_parameter:
|
||||
return '/api/autocomplete/%s' % (
|
||||
get_session().get_data_source_query_url_token(self.get_json_query_url()))
|
||||
return None
|
||||
|
||||
def load_json(self, param_name, param_value):
|
||||
url = self.data_source.get('value').strip()
|
||||
if Template.is_template_string(url):
|
||||
vars = get_publisher().substitutions.get_context_variables(mode='lazy')
|
||||
url = get_variadic_url(url, vars)
|
||||
|
||||
if not '?' in url:
|
||||
url += '?'
|
||||
else:
|
||||
url += '&'
|
||||
url += param_name + '=' + urllib.quote(param_value)
|
||||
|
||||
request = get_request()
|
||||
if hasattr(request, 'datasources_cache') and url in request.datasources_cache:
|
||||
return request.datasources_cache[url]
|
||||
|
||||
unsigned_url = url
|
||||
url = sign_url_auto_orig(url)
|
||||
resp = qommon.misc.urlopen(url).read()
|
||||
if hasattr(request, 'datasources_cache'):
|
||||
request.datasources_cache[unsigned_url] = resp
|
||||
return resp
|
||||
|
||||
def get_display_value(self, option_id):
|
||||
value = self.get_structured_value(option_id)
|
||||
if value:
|
||||
return value.get('text')
|
||||
return None
|
||||
|
||||
def get_structured_value(self, option_id):
|
||||
value = None
|
||||
if self.type == 'json' and self.id_parameter:
|
||||
resp = self.load_json(self.id_parameter, option_id)
|
||||
response = qommon.misc.json_loads(resp)
|
||||
if response['data']:
|
||||
value = response['data'][0]
|
||||
else:
|
||||
for item in get_structured_items(self.data_source, mode='lazy'):
|
||||
if str(item['id']) == str(option_id):
|
||||
value = item
|
||||
break
|
||||
if value is None:
|
||||
return None
|
||||
return value
|
||||
|
||||
@classmethod
|
||||
def get_substitution_variables(cls):
|
||||
return {'data_source': DataSourcesSubstitutionProxy()}
|
||||
|
|
|
@ -1271,19 +1271,7 @@ class ItemField(WidgetField):
|
|||
return get_session().jsonp_display_values.get(
|
||||
'%s_%s' % (data_source.get_jsonp_url(), value))
|
||||
|
||||
label_value = str(value or '')
|
||||
if type(kwargs['options'][0]) in (tuple, list):
|
||||
if len(kwargs['options'][0]) == 2:
|
||||
for key, value in kwargs['options']:
|
||||
if str(key) == str(real_value):
|
||||
label_value = value
|
||||
break
|
||||
elif len(kwargs['options'][0]) == 3:
|
||||
for key, value, key2 in kwargs['options']:
|
||||
if str(key) == str(real_value):
|
||||
label_value = value
|
||||
break
|
||||
return label_value
|
||||
return data_source.get_display_value(value)
|
||||
|
||||
def add_to_view_form(self, form, value = None):
|
||||
real_value = value
|
||||
|
@ -1319,16 +1307,7 @@ class ItemField(WidgetField):
|
|||
get_session().jsonp_display_values[
|
||||
'%s_%s' % (real_data_source.get('value'), value)] = display_value
|
||||
return display_value
|
||||
elif type(kwargs['options'][0]) in (tuple, list):
|
||||
if len(kwargs['options'][0]) == 2:
|
||||
for key, option_value in kwargs['options']:
|
||||
if str(key) == str(value):
|
||||
return option_value
|
||||
elif len(kwargs['options'][0]) == 3:
|
||||
for key, option_value, key_repeat in kwargs['options']:
|
||||
if str(key) == str(value):
|
||||
return option_value
|
||||
return str(value)
|
||||
return self.get_display_value(value)
|
||||
|
||||
def store_structured_value(self, data, field_id):
|
||||
data_source = data_sources.get_object(self.data_source)
|
||||
|
@ -1338,15 +1317,10 @@ class ItemField(WidgetField):
|
|||
if data_source.type == 'jsonp':
|
||||
return
|
||||
|
||||
structured_options = data_sources.get_structured_items(self.data_source)
|
||||
if not structured_options:
|
||||
value = data_source.get_structured_value(data.get(field_id))
|
||||
if value is None or set(value.keys()) == set(['id', 'text']):
|
||||
return
|
||||
if not set(structured_options[0].keys()) != set(['id', 'text']):
|
||||
return
|
||||
for structured_option in structured_options:
|
||||
if str(structured_option.get('id')) == str(value):
|
||||
return structured_option
|
||||
return None
|
||||
return value
|
||||
|
||||
def fill_admin_form(self, form):
|
||||
WidgetField.fill_admin_form(self, form)
|
||||
|
|
|
@ -14,8 +14,8 @@
|
|||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import random
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import qommon.sessions
|
||||
from qommon.sessions import Session
|
||||
|
@ -25,10 +25,14 @@ class BasicSession(Session):
|
|||
magictokens = None
|
||||
anonymous_formdata_keys = None
|
||||
visiting_objects = None
|
||||
data_source_query_url_tokens = None
|
||||
|
||||
def has_info(self):
|
||||
return (self.anonymous_formdata_keys or
|
||||
self.magictokens or self.visiting_objects or Session.has_info(self))
|
||||
self.magictokens or
|
||||
self.visiting_objects or
|
||||
self.data_source_query_url_tokens or
|
||||
Session.has_info(self))
|
||||
is_dirty = has_info
|
||||
|
||||
def add_magictoken(self, token, data):
|
||||
|
@ -120,5 +124,21 @@ class BasicSession(Session):
|
|||
del session.visiting_objects[object_key]
|
||||
session.store()
|
||||
|
||||
def get_data_source_query_url_token(self, url):
|
||||
if not self.data_source_query_url_tokens:
|
||||
self.data_source_query_url_tokens = {}
|
||||
for key, value in self.data_source_query_url_tokens.items():
|
||||
if value == url:
|
||||
return key
|
||||
key = str(uuid.uuid4())
|
||||
self.data_source_query_url_tokens[key] = url
|
||||
self.store()
|
||||
return key
|
||||
|
||||
def get_data_source_query_url_from_token(self, token):
|
||||
if not self.data_source_query_url_tokens:
|
||||
return None
|
||||
return self.data_source_query_url_tokens.get(token)
|
||||
|
||||
qommon.sessions.BasicSession = BasicSession
|
||||
StorageSessionManager = qommon.sessions.StorageSessionManager
|
||||
|
|
Loading…
Reference in New Issue