misc: use transient data objects for autocomplete contexts (#76943) #277
|
@ -29,6 +29,7 @@ from wcs.qommon.misc import ConnectionError
|
|||
from wcs.qommon.substitution import CompatibilityNamesDict
|
||||
from wcs.qommon.template import Template
|
||||
from wcs.roles import logged_users_role
|
||||
from wcs.sql import TransientData
|
||||
from wcs.tracking_code import TrackingCode
|
||||
from wcs.wf.create_formdata import JournalAssignationErrorPart, Mapping
|
||||
from wcs.wf.form import WorkflowFormFieldsFormDef
|
||||
|
@ -5331,7 +5332,7 @@ def test_form_string_field_autocomplete_named_datasource(pub):
|
|||
def test_form_autocomplete_named_datasource_expired_token(pub):
|
||||
CardDef.wipe()
|
||||
FormDef.wipe()
|
||||
pub.token_class.wipe()
|
||||
TransientData.wipe()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test'
|
||||
|
@ -5353,13 +5354,11 @@ def test_form_autocomplete_named_datasource_expired_token(pub):
|
|||
data_source.store()
|
||||
|
||||
resp = get_app(pub).get('/test/')
|
||||
assert pub.token_class.count() == 1
|
||||
token = pub.token_class.select()[0]
|
||||
assert TransientData.count() == 1
|
||||
token = TransientData.select()[0]
|
||||
assert '/api/autocomplete/%s' % token.id in resp.text
|
||||
token.set_expiration_delay(-1) # expired
|
||||
token.store()
|
||||
|
||||
# check a new token is generated
|
||||
# new session, check a new token is generated
|
||||
resp = get_app(pub).get('/test/')
|
||||
assert '/api/autocomplete/%s' % token.id not in resp.text
|
||||
|
||||
|
@ -5368,7 +5367,7 @@ def test_form_autocomplete_named_datasource_expired_token(pub):
|
|||
def test_form_autocomplete_named_datasource_cache_duration(pub, sign):
|
||||
CardDef.wipe()
|
||||
FormDef.wipe()
|
||||
pub.token_class.wipe()
|
||||
TransientData.wipe()
|
||||
|
||||
formdef = FormDef()
|
||||
formdef.name = 'test'
|
||||
|
@ -5400,8 +5399,8 @@ remote.example.net = 1234
|
|||
|
||||
app = get_app(pub)
|
||||
resp = app.get('/test/')
|
||||
assert pub.token_class.count() == 1
|
||||
token = pub.token_class.select()[0]
|
||||
assert TransientData.count() == 1
|
||||
token = TransientData.select()[0]
|
||||
assert '/api/autocomplete/%s' % token.id in resp.text
|
||||
|
||||
with responses.RequestsMock() as rsps:
|
||||
|
|
18
wcs/api.py
18
wcs/api.py
|
@ -1173,12 +1173,8 @@ class AutocompleteDirectory(Directory):
|
|||
def _q_lookup(self, component):
|
||||
get_request().ignore_session = True
|
||||
try:
|
||||
token = get_publisher().token_class.get(component)
|
||||
if token.type != 'autocomplete':
|
||||
raise KeyError()
|
||||
if token.context['session_id'] != get_session().id:
|
||||
raise KeyError()
|
||||
if token.context.get('url') == '':
|
||||
autocomplete_context = get_session().get_autocomplete_token(component)
|
||||
if autocomplete_context.data.get('url') == '':
|
||||
# this is a datasource without a json url
|
||||
# (typically a Python source)
|
||||
raise KeyError()
|
||||
|
@ -1186,7 +1182,7 @@ class AutocompleteDirectory(Directory):
|
|||
raise AccessForbiddenError()
|
||||
get_response().set_content_type('application/json')
|
||||
|
||||
info = token.context
|
||||
info = autocomplete_context.data
|
||||
|
||||
if 'url' in info:
|
||||
named_data_source = None
|
||||
|
@ -1237,14 +1233,10 @@ class GeoJsonDirectory(Directory):
|
|||
data_source = get_data_source_object({'type': component}, ignore_errors=False)
|
||||
except KeyError:
|
||||
try:
|
||||
token = get_publisher().token_class.get(component)
|
||||
if token.type != 'autocomplete':
|
||||
raise KeyError()
|
||||
if token.context['session_id'] != get_session().id:
|
||||
raise KeyError()
|
||||
autocomplete_context = get_session().get_autocomplete_token(component)
|
||||
except KeyError:
|
||||
raise TraversalError()
|
||||
info = token.context
|
||||
info = autocomplete_context.data
|
||||
try:
|
||||
data_source = get_data_source_object({'type': info['slug']}, ignore_errors=False)
|
||||
except KeyError:
|
||||
|
|
|
@ -880,9 +880,7 @@ class NamedDataSource(XmlStorableObject):
|
|||
|
||||
if token_context:
|
||||
token_context['session_id'] = get_session().id
|
||||
token, dummy = get_publisher().token_class.get_or_create(
|
||||
type='autocomplete', context=token_context
|
||||
)
|
||||
token = get_session().create_autocomplete_token(token_context)
|
||||
return '/api/autocomplete/%s' % token.id
|
||||
|
||||
return None
|
||||
|
@ -893,9 +891,7 @@ class NamedDataSource(XmlStorableObject):
|
|||
new_url = self.get_variadic_url()
|
||||
if new_url != url:
|
||||
token_context = {'session_id': get_session().id, 'url': new_url, 'slug': self.slug}
|
||||
token, dummy = get_publisher().token_class.get_or_create(
|
||||
type='autocomplete', context=token_context
|
||||
)
|
||||
token = get_session().create_autocomplete_token(token_context)
|
||||
return '/api/geojson/%s' % token.id
|
||||
return '/api/geojson/%s' % self.slug
|
||||
|
||||
|
|
16
wcs/sql.py
16
wcs/sql.py
|
@ -16,6 +16,7 @@
|
|||
|
||||
import copy
|
||||
import datetime
|
||||
import hashlib
|
||||
import io
|
||||
import itertools
|
||||
import json
|
||||
|
@ -3653,6 +3654,21 @@ class Session(SqlMixin, wcs.sessions.BasicSession):
|
|||
def remove_form_token(self, token):
|
||||
TransientData.remove_object(token)
|
||||
|
||||
def create_autocomplete_token(self, autocomplete_context):
|
||||
token_id = hashlib.sha1(repr(autocomplete_context).encode()).hexdigest()
|
||||
try:
|
||||
token = self.get_autocomplete_token(token_id)
|
||||
except KeyError:
|
||||
token = TransientData(id=token_id, session_id=self.id, data=autocomplete_context)
|
||||
token.store()
|
||||
return token
|
||||
|
||||
def get_autocomplete_token(self, token_id):
|
||||
tokens = TransientData.select([Equal('id', token_id), Equal('session_id', self.id)])
|
||||
if not tokens:
|
||||
raise KeyError(token_id)
|
||||
return tokens[0]
|
||||
|
||||
|
||||
class TrackingCode(SqlMixin, wcs.tracking_code.TrackingCode):
|
||||
_table_name = 'tracking_codes'
|
||||
|
|
Loading…
Reference in New Issue