misc: use transient data objects for autocomplete contexts (#76943) #277

Merged
fpeters merged 1 commits from wip/76943-use-transient-data-for-autocomplete-tokens into main 2023-05-02 08:16:37 +02:00
4 changed files with 31 additions and 28 deletions

View File

@ -29,6 +29,7 @@ from wcs.qommon.misc import ConnectionError
from wcs.qommon.substitution import CompatibilityNamesDict
from wcs.qommon.template import Template
from wcs.roles import logged_users_role
from wcs.sql import TransientData
from wcs.tracking_code import TrackingCode
from wcs.wf.create_formdata import JournalAssignationErrorPart, Mapping
from wcs.wf.form import WorkflowFormFieldsFormDef
@ -5331,7 +5332,7 @@ def test_form_string_field_autocomplete_named_datasource(pub):
def test_form_autocomplete_named_datasource_expired_token(pub):
CardDef.wipe()
FormDef.wipe()
pub.token_class.wipe()
TransientData.wipe()
formdef = FormDef()
formdef.name = 'test'
@ -5353,13 +5354,11 @@ def test_form_autocomplete_named_datasource_expired_token(pub):
data_source.store()
resp = get_app(pub).get('/test/')
assert pub.token_class.count() == 1
token = pub.token_class.select()[0]
assert TransientData.count() == 1
token = TransientData.select()[0]
assert '/api/autocomplete/%s' % token.id in resp.text
token.set_expiration_delay(-1) # expired
token.store()
# check a new token is generated
# new session, check a new token is generated
resp = get_app(pub).get('/test/')
assert '/api/autocomplete/%s' % token.id not in resp.text
@ -5368,7 +5367,7 @@ def test_form_autocomplete_named_datasource_expired_token(pub):
def test_form_autocomplete_named_datasource_cache_duration(pub, sign):
CardDef.wipe()
FormDef.wipe()
pub.token_class.wipe()
TransientData.wipe()
formdef = FormDef()
formdef.name = 'test'
@ -5400,8 +5399,8 @@ remote.example.net = 1234
app = get_app(pub)
resp = app.get('/test/')
assert pub.token_class.count() == 1
token = pub.token_class.select()[0]
assert TransientData.count() == 1
token = TransientData.select()[0]
assert '/api/autocomplete/%s' % token.id in resp.text
with responses.RequestsMock() as rsps:

View File

@ -1173,12 +1173,8 @@ class AutocompleteDirectory(Directory):
def _q_lookup(self, component):
get_request().ignore_session = True
try:
token = get_publisher().token_class.get(component)
if token.type != 'autocomplete':
raise KeyError()
if token.context['session_id'] != get_session().id:
raise KeyError()
if token.context.get('url') == '':
autocomplete_context = get_session().get_autocomplete_token(component)
if autocomplete_context.data.get('url') == '':
# this is a datasource without a json url
# (typically a Python source)
raise KeyError()
@ -1186,7 +1182,7 @@ class AutocompleteDirectory(Directory):
raise AccessForbiddenError()
get_response().set_content_type('application/json')
info = token.context
info = autocomplete_context.data
if 'url' in info:
named_data_source = None
@ -1237,14 +1233,10 @@ class GeoJsonDirectory(Directory):
data_source = get_data_source_object({'type': component}, ignore_errors=False)
except KeyError:
try:
token = get_publisher().token_class.get(component)
if token.type != 'autocomplete':
raise KeyError()
if token.context['session_id'] != get_session().id:
raise KeyError()
autocomplete_context = get_session().get_autocomplete_token(component)
except KeyError:
raise TraversalError()
info = token.context
info = autocomplete_context.data
try:
data_source = get_data_source_object({'type': info['slug']}, ignore_errors=False)
except KeyError:

View File

@ -880,9 +880,7 @@ class NamedDataSource(XmlStorableObject):
if token_context:
token_context['session_id'] = get_session().id
token, dummy = get_publisher().token_class.get_or_create(
type='autocomplete', context=token_context
)
token = get_session().create_autocomplete_token(token_context)
return '/api/autocomplete/%s' % token.id
return None
@ -893,9 +891,7 @@ class NamedDataSource(XmlStorableObject):
new_url = self.get_variadic_url()
if new_url != url:
token_context = {'session_id': get_session().id, 'url': new_url, 'slug': self.slug}
token, dummy = get_publisher().token_class.get_or_create(
type='autocomplete', context=token_context
)
token = get_session().create_autocomplete_token(token_context)
return '/api/geojson/%s' % token.id
return '/api/geojson/%s' % self.slug

View File

@ -16,6 +16,7 @@
import copy
import datetime
import hashlib
import io
import itertools
import json
@ -3653,6 +3654,21 @@ class Session(SqlMixin, wcs.sessions.BasicSession):
def remove_form_token(self, token):
TransientData.remove_object(token)
def create_autocomplete_token(self, autocomplete_context):
token_id = hashlib.sha1(repr(autocomplete_context).encode()).hexdigest()
try:
token = self.get_autocomplete_token(token_id)
except KeyError:
token = TransientData(id=token_id, session_id=self.id, data=autocomplete_context)
token.store()
return token
def get_autocomplete_token(self, token_id):
tokens = TransientData.select([Equal('id', token_id), Equal('session_id', self.id)])
if not tokens:
raise KeyError(token_id)
return tokens[0]
class TrackingCode(SqlMixin, wcs.tracking_code.TrackingCode):
_table_name = 'tracking_codes'