wscalls: add missing context manager to get complex query string (#87925) #1239
|
@ -7,7 +7,8 @@ import pytest
|
|||
import responses
|
||||
from quixote import cleanup, get_publisher
|
||||
|
||||
from wcs.fields import BoolField, FileField, ItemField, ItemsField, StringField
|
||||
from wcs.blocks import BlockDef
|
||||
from wcs.fields import BlockField, BoolField, FileField, ItemField, ItemsField, StringField
|
||||
from wcs.formdef import FormDef
|
||||
from wcs.qommon.http_request import HTTPRequest
|
||||
from wcs.wf.wscall import JournalWsCallErrorPart, WebserviceCallStatusItem
|
||||
|
@ -1065,6 +1066,12 @@ def test_webservice_with_complex_data_in_query_string(http_requests, pub):
|
|||
),
|
||||
}
|
||||
|
||||
BlockDef.wipe()
|
||||
block = BlockDef()
|
||||
block.name = 'foobar'
|
||||
block.fields = [StringField(id='1', label='String', varname='string')]
|
||||
block.store()
|
||||
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
formdef.name = 'baz'
|
||||
|
@ -1075,6 +1082,7 @@ def test_webservice_with_complex_data_in_query_string(http_requests, pub):
|
|||
StringField(id='4', label='4th field', varname='empty_str'),
|
||||
StringField(id='5', label='5th field', varname='none'),
|
||||
BoolField(id='6', label='6th field', varname='bool'),
|
||||
BlockField(id='7', label='7th field', varname='block', block_slug=block.slug, max_items=3),
|
||||
]
|
||||
formdef.workflow_id = wf.id
|
||||
formdef.store()
|
||||
|
@ -1091,6 +1099,17 @@ def test_webservice_with_complex_data_in_query_string(http_requests, pub):
|
|||
formdata.data['4'] = 'empty_str'
|
||||
formdata.data['5'] = None
|
||||
formdata.data['6'] = False
|
||||
formdata.data['7'] = {
|
||||
'data': [
|
||||
{
|
||||
'1': 'plop',
|
||||
},
|
||||
{
|
||||
'1': 'poulpe',
|
||||
},
|
||||
],
|
||||
'schema': {},
|
||||
}
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
|
||||
|
@ -1115,11 +1134,12 @@ def test_webservice_with_complex_data_in_query_string(http_requests, pub):
|
|||
'none': '{{ form_var_none }}',
|
||||
'bool': '{{ form_var_bool_raw }}',
|
||||
'time': '{{ "13:12"|time }}',
|
||||
'block_template': '{% for b in form_var_block %}{{ b.string }}{% endfor %}',
|
||||
}
|
||||
pub.substitutions.feed(formdata)
|
||||
with get_publisher().complex_data():
|
||||
item.perform(formdata)
|
||||
item.perform(formdata)
|
||||
assert sorted(urllib.parse.parse_qsl(urllib.parse.urlparse(http_requests.get_last('url')).query)) == [
|
||||
('block_template', 'ploppoulpe'),
|
||||
('bool', 'False'),
|
||||
('decimal', '1E+3'),
|
||||
('decimal2', '1000.1'),
|
||||
|
|
|
@ -3314,6 +3314,10 @@ class WorkflowStatusItem(XmlSerialisable):
|
|||
if expression['type'] == 'template':
|
||||
old_allow_complex_value = vars.get('allow_complex')
|
||||
vars['allow_complex'] = allow_complex
|
||||
# make sure complex data context manager is used
|
||||
assert (
|
||||
not vars['allow_complex'] or get_publisher().complex_data_cache is not None
|
||||
), 'missing complex_data context manager'
|
||||
try:
|
||||
return Template(expression['value'], raises=raises, autoescape=False).render(vars)
|
||||
except TemplateError as e:
|
||||
|
|
|
@ -114,18 +114,19 @@ def call_webservice(
|
|||
if qs_data: # merge qs_data into url
|
||||
qs = list(urllib.parse.parse_qsl(parsed.query))
|
||||
for key, value in qs_data.items():
|
||||
try:
|
||||
value = WorkflowStatusItem.compute(value, allow_complex=True, raises=True)
|
||||
except Exception as e:
|
||||
get_publisher().record_error(exception=e, notify=True)
|
||||
else:
|
||||
if value:
|
||||
value = get_publisher().get_cached_complex_data(value)
|
||||
if isinstance(value, (tuple, list, set)):
|
||||
qs.extend((key, x) for x in value)
|
||||
with get_publisher().complex_data():
|
||||
try:
|
||||
value = WorkflowStatusItem.compute(value, allow_complex=True, raises=True)
|
||||
except Exception as e:
|
||||
get_publisher().record_error(exception=e, notify=True)
|
||||
else:
|
||||
value = str(value) if value is not None else ''
|
||||
qs.append((key, value))
|
||||
if value:
|
||||
value = get_publisher().get_cached_complex_data(value)
|
||||
if isinstance(value, (tuple, list, set)):
|
||||
qs.extend((key, x) for x in value)
|
||||
else:
|
||||
value = str(value) if value is not None else ''
|
||||
qs.append((key, value))
|
||||
qs = urllib.parse.urlencode(qs)
|
||||
url = urllib.parse.urlunparse(parsed[:4] + (qs,) + parsed[5:6])
|
||||
|
||||
|
|
Loading…
Reference in New Issue