232 lines
7.0 KiB
Python
232 lines
7.0 KiB
Python
import json
|
|
|
|
import pytest
|
|
|
|
from wcs import fields
|
|
from wcs.formdef import FormDef
|
|
from wcs.qommon.http_request import HTTPRequest
|
|
from wcs.qommon.template import Template
|
|
from wcs.wscalls import NamedWsCall
|
|
|
|
from .utilities import clean_temporary_pub, create_temporary_pub, get_app
|
|
|
|
|
|
@pytest.fixture
def pub():
    """Return a temporary publisher wired to a fake request for ``example.net``."""
    publisher = create_temporary_pub()

    # Minimal environment handed to the request object.
    environ = {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'}
    request = HTTPRequest(None, environ)

    publisher.set_app_dir(request)
    publisher._set_request(request)
    publisher.load_site_options()
    return publisher
|
|
|
|
|
|
def teardown_module(module):
    """Module-level teardown hook: clean up the temporary publisher."""
    clean_temporary_pub()
|
|
|
|
|
|
def test_named_wscall(pub):
    """Exercise storing, loading, slug de-duplication and slug change of NamedWsCall."""
    NamedWsCall.wipe()

    # Store a first object and check the slug it was given.
    first = NamedWsCall()
    first.name = 'Hello'
    first.request = {'url': 'http://example.net', 'qs_data': {'a': 'b'}}
    first.store()
    assert first.slug == 'hello'

    # Load it back by slug and check the stored attributes.
    loaded = NamedWsCall.get('hello')
    assert loaded.name == 'Hello'
    assert loaded.request.get('url') == 'http://example.net'
    assert loaded.request.get('qs_data') == {'a': 'b'}

    # A second object with the same name should get a different slug.
    duplicate = NamedWsCall()
    duplicate.name = 'Hello'
    duplicate.request = {'url': 'http://example.net'}
    duplicate.store()
    assert duplicate.slug == 'hello_1'

    # Changing the slug and storing again should replace the old key with the new one.
    duplicate.slug = 'bye'
    duplicate.store()
    assert 'bye' in NamedWsCall.keys()
    assert 'hello_1' not in NamedWsCall.keys()
|
|
|
|
|
|
def test_webservice_substitution_variable(http_requests, pub):
    """Check a stored call is reachable as ``webservice.<slug>`` in the context variables."""
    NamedWsCall.wipe()

    named_call = NamedWsCall()
    named_call.name = 'Hello world'
    named_call.request = {'url': 'http://remote.example.net/json'}
    named_call.store()
    assert named_call.slug == 'hello_world'

    context = pub.substitutions.get_context_variables()
    webservices = context['webservice']
    assert webservices.hello_world == {'foo': 'bar'}
|
|
|
|
|
|
def test_webservice_auto_sign(http_requests, pub):
    """Check when the outgoing URL carries ``signature=`` and ``orig=`` parameters."""
    NamedWsCall.wipe()

    named_call = NamedWsCall()
    named_call.name = 'Hello world'

    def attempt_call():
        # Only the recorded outgoing URL matters here; any error raised by the
        # call itself is deliberately ignored.
        try:
            named_call.call()
        except Exception:
            pass

    def last_url():
        return http_requests.get_last('url')

    # This host is expected to be called without a signature.
    named_call.request = {'url': 'http://blah.example.net'}
    attempt_call()
    assert 'signature=' not in last_url()

    # This host is expected to be signed, with example.net as the origin.
    named_call.request = {'url': 'http://idp.example.net'}
    attempt_call()
    assert 'orig=example.net' in last_url()
    assert 'signature=' in last_url()

    # Same host with an erroneous leading space in the URL.
    named_call.request = {'url': ' http://idp.example.net'}
    attempt_call()
    assert 'orig=example.net' in last_url()
    assert 'signature=' in last_url()

    # With an explicit signature key the URL is still signed, but without
    # the example.net origin.
    named_call.request['request_signature_key'] = 'blah'
    attempt_call()
    assert 'orig=example.net' not in last_url()
    assert 'signature=' in last_url()
|
|
|
|
|
|
def test_webservice_post_with_no_payload(http_requests, pub):
    """Check a POST call configured without any post data is sent with no body."""
    NamedWsCall.wipe()

    named_call = NamedWsCall()
    named_call.name = 'Hello world'
    named_call.request = {'method': 'POST', 'url': 'http://remote.example.net/json'}
    named_call.call()

    sent_body = http_requests.get_last('body')
    assert sent_body is None
|
|
|
|
|
|
def test_wscall_ezt(http_requests, pub):
    """Render webservice results through both the ``{{ ... }}`` and ``[...]`` syntaxes."""
    NamedWsCall.wipe()

    named_call = NamedWsCall()
    named_call.name = 'Hello world'
    named_call.request = {'url': 'http://remote.example.net/json'}
    named_call.store()
    assert named_call.slug == 'hello_world'

    variables = pub.substitutions.get_context_variables()

    # (template source, expected rendering), checked in order.
    expectations = (
        # defined webservice
        ('<p>{{ webservice.hello_world.foo }}</p>', '<p>bar</p>'),
        ('<p>[webservice.hello_world.foo]</p>', '<p>bar</p>'),
        # undefined webservice: empty output for {{ }}, source left as-is for [ ]
        ('<p>{{ webservice.hello.foo }}</p>', '<p></p>'),
        ('<p>[webservice.hello.foo]</p>', '<p>[webservice.hello.foo]</p>'),
    )
    for source, expected in expectations:
        template = Template(source)
        assert template.render(variables) == expected
|
|
|
|
|
|
def test_webservice_post_put_patch(http_requests, pub):
    """Check POST, PUT and PATCH calls send the configured URL, method and JSON body."""
    NamedWsCall.wipe()

    for method in ('POST', 'PUT', 'PATCH'):
        named_call = NamedWsCall()
        named_call.name = 'Hello world'
        named_call.request = {
            'method': method,
            'post_data': {'toto': 'coin'},
            'url': 'http://remote.example.net/json',
        }

        # Only the recorded outgoing request is checked; errors raised by the
        # call itself are deliberately ignored.
        try:
            named_call.call()
        except Exception:
            pass

        assert http_requests.get_last('url') == named_call.request['url']
        assert http_requests.get_last('method') == named_call.request['method']
        sent_payload = json.loads(http_requests.get_last('body'))
        assert sent_payload == named_call.request['post_data']
|
|
|
|
|
|
def test_webservice_delete(http_requests, pub):
    """Check a DELETE call reaches the configured URL with the DELETE method."""
    NamedWsCall.wipe()

    named_call = NamedWsCall()
    named_call.name = 'Hello world'
    named_call.request = {
        'method': 'DELETE',
        'post_data': {'toto': 'coin'},
        'url': 'http://remote.example.net/json',
    }

    # Only the recorded outgoing request is checked; errors raised by the
    # call itself are deliberately ignored.
    try:
        named_call.call()
    except Exception:
        pass

    assert http_requests.get_last('url') == named_call.request['url']
    assert http_requests.get_last('method') == 'DELETE'
|
|
|
|
|
|
@pytest.mark.parametrize('notify_on_errors', [True, False])
@pytest.mark.parametrize('record_on_errors', [True, False])
def test_webservice_on_error_with_sql(http_requests, emails, notify_on_errors, record_on_errors):
    """Check error emails and logged errors follow the call's on-error flags.

    A form whose comment field references ``webservice.hello_world`` is
    displayed while the call points at various remote URLs. URLs in the first
    group must produce neither an email nor a logged error; URLs in the second
    group must produce one of each, but only when the matching flag
    (``notify_on_errors`` / ``record_on_errors``) is set.
    """
    publisher = create_temporary_pub(sql_mode=True)
    publisher.cfg['debug'] = {'error_email': 'errors@localhost.invalid'}
    publisher.write_cfg()

    NamedWsCall.wipe()
    publisher.loggederror_class.wipe()
    FormDef.wipe()

    named_call = NamedWsCall()
    named_call.name = 'Hello world'
    named_call.notify_on_errors = notify_on_errors
    named_call.record_on_errors = record_on_errors
    named_call.store()
    assert named_call.slug == 'hello_world'

    comment = fields.CommentField(id='0', label='Foo Bar {{ webservice.hello_world }}', type='comment')
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.fields = [comment]
    formdef.store()

    # First group: nothing should be reported.
    for url_part in ('json', 'json-err0', 'json-errheader0'):
        named_call.request = {'url': 'http://remote.example.net/%s' % url_part}
        named_call.store()
        resp = get_app(publisher).get('/foobar/')
        assert 'Foo Bar ' in resp.text
        assert emails.count() == 0
        assert publisher.loggederror_class.count() == 0

    # Second group: errors, reported according to the two flags.
    for url_part in ('400', '400-json', '404', '404-json', '500', 'json-err1', 'json-errheader1'):
        # "json*" parts are reported with status 200; otherwise the status is
        # the three-character prefix of the URL part.
        status_code = 200 if url_part.startswith('json') else url_part[:3]

        named_call.request = {'url': 'http://remote.example.net/%s' % url_part}
        named_call.store()
        resp = get_app(publisher).get('/foobar/')
        assert 'Foo Bar ' in resp.text

        if not notify_on_errors:
            assert emails.count() == 0
        else:
            assert emails.count() == 1
            expected_subject = "[ERROR] [WSCALL] Exception: %s whatever" % status_code
            assert expected_subject in emails.emails
            # Reset so the next iteration starts from zero.
            emails.empty()

        if not record_on_errors:
            assert publisher.loggederror_class.count() == 0
        else:
            assert publisher.loggederror_class.count() == 1
            # Reset so the next iteration starts from zero.
            publisher.loggederror_class.wipe()
|