misc: use tokens to store autocompletion context (#60665)

This commit is contained in:
Frédéric Péters 2022-01-17 17:50:11 +01:00
parent b05cb6ed2a
commit 940c4066d6
4 changed files with 82 additions and 81 deletions

View File

@ -1147,17 +1147,21 @@ class ApiTrackingCodeDirectory(Directory):
class AutocompleteDirectory(Directory):
def _q_lookup(self, component):
info = get_session().get_data_source_query_info_from_token(component)
if not info:
try:
token = get_publisher().token_class.get(component)
if token.type != 'autocomplete':
raise KeyError()
if token.context['session_id'] != get_session().id:
raise KeyError()
if token.context.get('url') == '':
# this is a datasource without a json url
# (typically a Python source)
raise KeyError()
except KeyError:
raise AccessForbiddenError()
get_response().set_content_type('application/json')
if isinstance(info, str) and not info.startswith('carddef:'):
# legacy json source
info = {'url': info}
elif isinstance(info, str):
# legacy carddef source
info = {'carddef_ref': info}
info = token.context
if 'url' in info:
url = info['url']
@ -1180,9 +1184,13 @@ class AutocompleteDirectory(Directory):
# carddef_ref in info
carddef_ref = info['carddef_ref']
custom_view = None
if 'dynamic_custom_view' in info:
custom_view = get_publisher().custom_view_class.get(info['dynamic_custom_view'])
custom_view.filters = info['dynamic_custom_view_filters']
values = CardDef.get_data_source_items(
carddef_ref,
custom_view=info.get('dynamic_custom_view'),
custom_view=custom_view,
query=get_request().form.get('q', ''),
limit=get_request().form.get('page_limit'),
)
@ -1195,9 +1203,15 @@ class GeoJsonDirectory(Directory):
try:
data_source = get_data_source_object({'type': component}, ignore_errors=False)
except KeyError:
info = get_session().get_data_source_query_info_from_token(component)
if not info:
try:
token = get_publisher().token_class.get(component)
if token.type != 'autocomplete':
raise KeyError()
if token.context['session_id'] != get_session().id:
raise KeyError()
except KeyError:
raise TraversalError()
info = token.context
try:
data_source = get_data_source_object({'type': info['slug']}, ignore_errors=False)
except KeyError:

View File

@ -599,53 +599,51 @@ class NamedDataSource(XmlStorableObject):
if self.type == 'jsonp':
return self.data_source.get('value')
token_context = {}
if self.type == 'json' and self.query_parameter:
json_url = self.get_json_query_url()
info = None
if json_url:
info = {'url': json_url, 'data_source': self.id}
return '/api/autocomplete/%s' % (get_session().get_data_source_query_info_token(info))
token_context = {'url': json_url, 'data_source': self.id}
elif self.type and self.type.startswith('carddef:'):
token_context = {'carddef_ref': self.type}
if self.type and self.type.startswith('carddef:'):
api_url = '/api/autocomplete/%s' % (
get_session().get_data_source_query_info_token(
{
'carddef_ref': self.type,
}
)
)
parts = self.type.split(':')
if len(parts) <= 2:
return api_url
if len(parts) > 2:
# custom view, check if it's dynamic
from wcs.carddef import CardDef
from wcs.workflows import WorkflowStatusItem
# custom view, check if it's dynamic
from wcs.carddef import CardDef
from wcs.workflows import WorkflowStatusItem
custom_view = CardDef.get_data_source_custom_view(self.type)
if custom_view is None:
get_publisher().record_error(
_('Unknown custom view "%s" for CardDef "%s"') % (parts[2], parts[1]),
context='[DATASOURCE]',
notify=True,
record=True,
)
return api_url
had_template = False
for filter_key, filter_value in custom_view.filters.items():
if not Template.is_template_string(filter_value):
continue
custom_view.filters[filter_key] = WorkflowStatusItem.compute(filter_value)
had_template = True
if had_template:
# keep altered custom view in session
api_url = '/api/autocomplete/%s' % (
get_session().get_data_source_query_info_token(
{'carddef_ref': self.type, 'dynamic_custom_view': custom_view}
custom_view = CardDef.get_data_source_custom_view(self.type)
if custom_view is None:
get_publisher().record_error(
_('Unknown custom view "%s" for CardDef "%s"') % (parts[2], parts[1]),
context='[DATASOURCE]',
notify=True,
record=True,
)
)
return api_url
else:
had_template = False
for filter_key, filter_value in custom_view.filters.items():
if not Template.is_template_string(filter_value):
continue
custom_view.filters[filter_key] = WorkflowStatusItem.compute(filter_value)
had_template = True
if had_template:
# keep altered custom view in token
token_context = {
'carddef_ref': self.type,
'dynamic_custom_view': custom_view.id,
'dynamic_custom_view_filters': custom_view.filters,
}
if token_context:
token_context['session_id'] = get_session().id
token, created = get_publisher().token_class.get_or_create(
type='autocomplete', context=token_context
)
if created:
token.store()
return '/api/autocomplete/%s' % token.id
return None
@ -656,8 +654,13 @@ class NamedDataSource(XmlStorableObject):
context = get_publisher().substitutions.get_context_variables(mode='lazy')
new_url = get_variadic_url(url, context)
if new_url != url:
info = {'url': new_url, 'slug': self.slug}
return '/api/geojson/%s' % get_session().get_data_source_query_info_token(info)
token_context = {'session_id': get_session().id, 'url': new_url, 'slug': self.slug}
token, created = get_publisher().token_class.get_or_create(
type='autocomplete', context=token_context
)
if created:
token.store()
return '/api/geojson/%s' % token.id
return '/api/geojson/%s' % self.slug
def get_geojson_data(self, force_url=None):

View File

@ -19,10 +19,9 @@ import random
import string
from django.utils.timezone import make_aware, now
from quixote import get_publisher
from .storage import Less, StorableObject
from .storage import Equal, Less, StorableObject
class Token(StorableObject):
@ -49,6 +48,16 @@ class Token(StorableObject):
if not cls.has_key(id):
return id
@classmethod
def get_or_create(cls, type, context):
try:
return (cls.select(clause=[Equal('type', type), Equal('context', context)])[0], False)
except IndexError:
token = cls()
token.type = type
token.context = context
return (token, True)
def migrate(self):
if isinstance(self.expiration, (float, int)):
self.expiration = make_aware(datetime.datetime.fromtimestamp(self.expiration))

View File

@ -16,7 +16,6 @@
import os
import time
import uuid
from .qommon import sessions
from .qommon.sessions import Session
@ -27,14 +26,12 @@ class BasicSession(Session):
magictokens = None
anonymous_formdata_keys = None
visiting_objects = None
data_source_query_url_tokens = None
def has_info(self):
return (
self.anonymous_formdata_keys
or self.magictokens
or self.visiting_objects
or self.data_source_query_url_tokens
or Session.has_info(self)
)
@ -149,28 +146,6 @@ class BasicSession(Session):
del session.visiting_objects[object_key]
session.store()
def get_data_source_query_info_token(self, info):
# info can be an URL in case of remote JSON source or a cards
# reference, like carddef:foobar. It is stored in a dictionary
# with _url_ in its name for historical reasons.
if not self.data_source_query_url_tokens:
self.data_source_query_url_tokens = {}
for key, value in self.data_source_query_url_tokens.items():
if value == info:
return key
key = str(uuid.uuid4())
self.data_source_query_url_tokens[key] = info
if self.id:
# do not save if the session has not been created yet, this happens
# for API calls to create card/form data.
self.store()
return key
def get_data_source_query_info_from_token(self, token):
if not self.data_source_query_url_tokens:
return None
return self.data_source_query_url_tokens.get(token)
sessions.BasicSession = BasicSession
StorageSessionManager = sessions.StorageSessionManager