general: add new catalog of webservice calls, usable in expressions (#11376)

This commit is contained in:
Frédéric Péters 2016-06-17 12:26:47 +02:00
parent cf83f44d51
commit 2dce89362e
10 changed files with 692 additions and 83 deletions

View File

@ -26,6 +26,7 @@ from wcs.qommon.http_request import HTTPRequest
from wcs.qommon.template import get_current_theme
from wcs.categories import Category
from wcs.data_sources import NamedDataSource
from wcs.wscalls import NamedWsCall
from wcs.roles import Role
from wcs.workflows import Workflow, DisplayMessageWorkflowStatusItem, WorkflowCriticalityLevel
from wcs.wf.wscall import WebserviceCallStatusItem
@ -3027,6 +3028,95 @@ def test_data_sources_edit_slug(pub):
resp = resp.forms[0].submit('submit')
assert resp.location == 'http://example.net/backoffice/settings/data-sources/1/'
def test_wscalls_new(pub):
create_superuser(pub)
NamedWsCall.wipe()
app = login(get_app(pub))
# go to the page and cancel
resp = app.get('/backoffice/settings/wscalls/')
resp = resp.click('New webservice call')
resp = resp.forms[0].submit('cancel')
assert resp.location == 'http://example.net/backoffice/settings/wscalls/'
# go to the page and add a webservice call
resp = app.get('/backoffice/settings/wscalls/')
resp = resp.click('New webservice call')
resp.form['name'] = 'a new webservice call'
resp.form['description'] = 'description'
resp.form['request$url'] = 'http://remote.example.net/json'
resp = resp.form.submit('submit')
assert resp.location == 'http://example.net/backoffice/settings/wscalls/'
resp = resp.follow()
assert 'a new webservice call' in resp.body
resp = resp.click('a new webservice call')
assert 'Webservice Call - a new webservice call' in resp.body
resp = resp.click('Edit')
assert 'Edit webservice call' in resp.body
assert NamedWsCall.get('a_new_webservice_call').name == 'a new webservice call'
def test_wscalls_view(pub):
create_superuser(pub)
NamedWsCall.wipe()
wscall = NamedWsCall(name='xxx')
wscall.description = 'description'
wscall.request = {
'url': 'http://remote.example.net/json',
'request_signature_key': 'xxx',
'qs_data': {'a': 'b'},
'method': 'POST',
'post_data': {'c': 'd'},
}
wscall.store()
app = login(get_app(pub))
resp = app.get('/backoffice/settings/wscalls/%s/' % wscall.id)
assert 'http://remote.example.net/json' in resp.body
def test_wscalls_edit(pub):
test_wscalls_view(pub)
app = login(get_app(pub))
resp = app.get('/backoffice/settings/wscalls/xxx/')
resp = resp.click(href='edit')
assert resp.form['name'].value == 'xxx'
assert 'slug' in resp.form.fields
resp.form['description'] = 'bla bla bla'
resp = resp.form.submit('submit')
assert resp.location == 'http://example.net/backoffice/settings/wscalls/xxx/'
resp = resp.follow()
assert NamedWsCall.get('xxx').description == 'bla bla bla'
resp = app.get('/backoffice/settings/wscalls/xxx/')
resp = resp.click(href='edit')
assert resp.form['name'].value == 'xxx'
assert 'slug' in resp.form.fields
resp.form['slug'] = 'yyy'
resp = resp.form.submit('submit')
assert resp.location == 'http://example.net/backoffice/settings/wscalls/yyy/'
def test_wscalls_delete(pub):
test_wscalls_view(pub)
assert NamedWsCall.count() == 1
app = login(get_app(pub))
resp = app.get('/backoffice/settings/wscalls/xxx/')
resp = resp.click(href='delete')
resp = resp.form.submit('cancel')
assert resp.location == 'http://example.net/backoffice/settings/wscalls/'
resp = app.get('/backoffice/settings/wscalls/xxx/')
resp = resp.click(href='delete')
resp = resp.form.submit('submit')
assert resp.location == 'http://example.net/backoffice/settings/wscalls/'
assert NamedWsCall.count() == 0
def test_settings_permissions(pub):
create_superuser(pub)
role1 = create_role()

62
tests/test_wscall.py Normal file
View File

@ -0,0 +1,62 @@
import json
import mock
import pytest
from wcs.qommon.http_request import HTTPRequest
from wcs.wscalls import NamedWsCall
from utilities import create_temporary_pub, clean_temporary_pub, http_requests
@pytest.fixture
def pub():
pub = create_temporary_pub()
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
pub.set_app_dir(req)
pub._set_request(req)
pub.load_site_options()
return pub
def teardown_module(module):
clean_temporary_pub()
def test_named_wscall(pub):
# create object
wscall = NamedWsCall()
wscall.name = 'Hello'
wscall.request = {'url': 'http://example.net', 'qs_data': {'a': 'b'}}
wscall.store()
assert wscall.slug == 'hello'
# get object
wscall = NamedWsCall.get('hello')
assert wscall.name == 'Hello'
assert wscall.request.get('url') == 'http://example.net'
assert wscall.request.get('qs_data') == {'a': 'b'}
# create with same name, should get a different slug
wscall = NamedWsCall()
wscall.name = 'Hello'
wscall.request = {'url': 'http://example.net'}
wscall.store()
assert wscall.slug == 'hello_1'
# change slug, shoulg get a new id
wscall.slug = 'bye'
wscall.store()
assert 'bye' in NamedWsCall.keys()
assert not 'hello_1' in NamedWsCall.keys()
def test_webservice_substitution_variable(pub):
NamedWsCall.wipe()
wscall = NamedWsCall()
wscall.name = 'Hello world'
wscall.request = {'url': 'http://remote.example.net/json'}
wscall.store()
assert wscall.slug == 'hello_world'
pub.substitutions.feed(NamedWsCall)
variables = pub.substitutions.get_context_variables()
assert variables['webservice'].hello_world == {'foo': 'bar'}

View File

@ -53,7 +53,7 @@ from fields import FieldDefPage, FieldsDirectory
from wcs.roles import Role
from .data_sources import NamedDataSourcesDirectory
from .wscalls import NamedWsCallsDirectory
class UserFormDirectory(Directory):
_q_exports = ['']
@ -387,7 +387,7 @@ class SettingsDirectory(QommonSettingsDirectory):
'session', 'download_theme', 'smstest', 'postgresql',
('admin-permissions', 'admin_permissions'),
'theme_preview', 'filetypes',
('data-sources', 'data_sources')]
('data-sources', 'data_sources'), 'wscalls']
emails = EmailsDirectory()
identification = IdentificationDirectory()
@ -396,6 +396,7 @@ class SettingsDirectory(QommonSettingsDirectory):
theme_preview = ThemePreviewDirectory()
filetypes = FileTypesDirectory()
data_sources = NamedDataSourcesDirectory()
wscalls = NamedWsCallsDirectory()
def _q_index(self):
html_top('settings', title = _('Settings'))
@ -498,6 +499,8 @@ class SettingsDirectory(QommonSettingsDirectory):
_('File Types'), _('Configure known file types'))
r += htmltext('<dt><a href="data-sources/">%s</a></dt> <dd>%s</dd>') % (
_('Data sources'), _('Configure data sources'))
r += htmltext('<dt><a href="wscalls/">%s</a></dt> <dd>%s</dd>') % (
_('Webservice calls'), _('Configure webservice calls'))
r += htmltext('</dl>')
r += htmltext('</div>')

241
wcs/admin/wscalls.py Normal file
View File

@ -0,0 +1,241 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2016 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from quixote import redirect
from quixote.directory import Directory
from quixote.html import TemplateIO, htmltext
from qommon import errors
from qommon.form import *
from qommon.backoffice.menu import html_top
from wcs.wscalls import NamedWsCall, WsCallRequestWidget
class NamedWsCallUI(object):
def __init__(self, wscall):
self.wscall = wscall
if self.wscall is None:
self.wscall = NamedWsCall()
def get_form(self):
form = Form(enctype='multipart/form-data',
advanced_label=_('Additional options'))
form.add(StringWidget, 'name', title=_('Name'), required=True, size=30,
value=self.wscall.name)
form.add(TextWidget, 'description', title=_('Description'),
cols=40, rows=5,
value=self.wscall.description)
if self.wscall.slug:
form.add(StringWidget, 'slug',
value=self.wscall.slug,
title=_('Identifier'),
hint=_('Beware it is risky to change it'),
required=True, advanced=True,
)
form.add(WsCallRequestWidget, 'request',
value=self.wscall.request,
title=_('Request'), required=True)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
return form
def submit_form(self, form):
name = form.get_widget('name').parse()
if self.wscall.slug:
slug = form.get_widget('slug').parse()
else:
slug = None
for wscall in NamedWsCall.select():
if wscall.id == self.wscall.id:
continue
if name == wscall.name:
form.get_widget('name').set_error(_('This name is already used.'))
if slug == wscall.slug:
form.get_widget('slug').set_error(_('This value is already used.'))
if form.has_errors():
raise ValueError()
self.wscall.name = name
self.wscall.description = form.get_widget('description').parse()
self.wscall.request = form.get_widget('request').parse()
if self.wscall.slug:
self.wscall.slug = slug
self.wscall.store()
class NamedWsCallPage(Directory):
_q_exports = ['', 'edit', 'delete']
def __init__(self, component):
try:
self.wscall = NamedWsCall.get(component)
except KeyError:
raise errors.TraversalError()
self.wscall_ui = NamedWsCallUI(self.wscall)
get_response().breadcrumb.append((component + '/', self.wscall.name))
def _q_index(self):
html_top('wscalls', title=self.wscall.name)
r = TemplateIO(html=True)
get_response().filter['sidebar'] = self.get_sidebar()
r += htmltext('<h2>%s - ') % _('Webservice Call')
r += self.wscall.name
r += htmltext('</h2>')
if self.wscall.description:
r += htmltext('<div class="bo-block">')
r += self.wscall.description
r += htmltext('</div>')
if self.wscall.request:
r += htmltext('<div class="bo-block">')
r += htmltext('<h3>%s</h3>') % _('Parameters')
r += htmltext('<ul>')
if self.wscall.request.get('url'):
r += htmltext('<li>%s %s</li>') % (
_('URL:'),
self.wscall.request.get('url'))
if self.wscall.request.get('request_signature_key'):
r += htmltext('<li>%s %s</li>') % (
_('Request Signature Key:'),
self.wscall.request.get('request_signature_key'))
if self.wscall.request.get('qs_data'):
r += htmltext('<li>%s<ul>') % _('Query string data:')
for k, v in self.wscall.request.get('qs_data').items():
r += htmltext('<li>%s</li>') % _('%s: %s') % (k, v)
r += htmltext('</ul></li>')
r += htmltext('<li>%s %s</li>') % (
_('Method:'),
'POST' if self.wscall.request.get('method') == 'POST' else 'GET')
if self.wscall.request.get('post_data'):
r += htmltext('<li>%s<ul>') % _('Post data:')
for k, v in self.wscall.request.get('post_data').items():
r += htmltext('<li>%s</li>') % _('%s: %s') % (k, v)
r += htmltext('</ul></li>')
r += htmltext('</ul>')
r += htmltext('</div>')
return r.getvalue()
def get_sidebar(self):
r = TemplateIO(html=True)
r += htmltext('<ul id="sidebar-actions">')
r += htmltext('<li><a href="edit">%s</a></li>') % _('Edit')
r += htmltext('<li><a href="delete" rel="popup">%s</a></li>') % _('Delete')
r += htmltext('</ul>')
return r.getvalue()
def edit(self):
form = self.wscall_ui.get_form()
if form.get_submit() == 'cancel':
return redirect('.')
if form.get_submit() == 'submit' and not form.has_errors():
try:
self.wscall_ui.submit_form(form)
except ValueError:
pass
else:
return redirect('../%s/' % self.wscall.id)
get_response().breadcrumb.append( ('edit', _('Edit')) )
html_top('wscalls', title = _('Edit webservice call'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Edit webservice call')
r += form.render()
return r.getvalue()
def delete(self):
form = Form(enctype='multipart/form-data')
form.widgets.append(HtmlWidget('<p>%s</p>' % _(
'You are about to irrevocably delete this data source.')))
form.add_submit('delete', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('..')
if not form.is_submitted() or form.has_errors():
get_response().breadcrumb.append(('delete', _('Delete')))
html_top('wscalls', title = _('Delete webservice call'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s %s</h2>') % (_('Deleting webservice call:'), self.wscall.name)
r += form.render()
return r.getvalue()
else:
self.wscall.remove_self()
return redirect('..')
class NamedWsCallsDirectory(Directory):
_q_exports = ['', 'new']
def _q_traverse(self, path):
get_response().breadcrumb.append( ('wscalls/', _('Webservice Calls')) )
return super(NamedWsCallsDirectory, self)._q_traverse(path)
def _q_index(self):
get_response().add_javascript(['jquery.js', 'jquery-ui.js', 'biglist.js',
'ckeditor/ckeditor.js', 'qommon.wysiwyg.js', 'ckeditor/adapters/jquery.js'])
get_response().filter['sidebar'] = self.get_sidebar()
html_top('wscalls', title=_('Webservice Calls'))
r = TemplateIO(html=True)
r += htmltext('<div class="bo-block">')
r += htmltext('<h2>%s</h2>') % _('Webservice Calls')
r += htmltext('<ul class="biglist" id="wscall-list">')
wscalls = NamedWsCall.select(order_by='name')
for wscall in wscalls:
r += htmltext('<li class="biglistitem" id="itemId_%s">') % wscall.id
r += htmltext('<strong class="label"><a href="%s/">%s (%s)</a></strong>') % (
wscall.id, wscall.name, wscall.slug)
r += htmltext('<p class="details">')
r += htmltext('</p></li>')
r += htmltext('</ul>')
r += htmltext('</div>')
return r.getvalue()
def get_sidebar(self):
r = TemplateIO(html=True)
r += htmltext("""<ul id="sidebar-actions">
<li><a class="new-item" href="new">%s</a></li>
</ul>""") % _('New webservice call')
return r.getvalue()
def new(self):
get_response().breadcrumb.append( ('new', _('New')) )
wscall_ui = NamedWsCallUI(None)
form = wscall_ui.get_form()
if form.get_widget('cancel').parse():
return redirect('.')
if form.get_submit() == 'submit' and not form.has_errors():
try:
wscall_ui.submit_form(form)
except ValueError:
pass
else:
return redirect('.')
html_top('wscalls', title = _('New webservice call'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('New webservice call')
r += form.render()
return r.getvalue()
def _q_lookup(self, component):
return NamedWsCallPage(component)

View File

@ -1341,6 +1341,23 @@ class WidgetDict(quixote.form.widget.WidgetDict):
if hint:
self.hint = hint
def render_content(self):
r = TemplateIO(html=True)
for name in self.element_names:
if name in ('add_element', 'added_elements'):
continue
key_widget = self.get_widget(name + 'key')
value_widget = self.get_widget(name + 'value')
r += htmltext('<div class="dict-key">%s</div>'
'<div class="dict-separator">: </div>'
'<div class="dict-value">%s</div>') % (
key_widget.render(),
value_widget.render())
r += htmltext('\n')
r += self.get_widget('add_element').render()
r += self.get_widget('added_elements').render()
return r.getvalue()
class TagsWidget(StringWidget):
def __init__(self, name, value = None, known_tags = None, **kwargs):
StringWidget.__init__(self, name, value, **kwargs)

View File

@ -1168,19 +1168,24 @@ a#filter-settings {
color: white;
}
div.WidgetDict div.content div.StringWidget {
width: 25%;
div.WsCallRequestWidget br { display: none; }
div.WidgetDict div.content div.dict-key { width: 20%; }
div.WidgetDict div.content div.dict-value { width: 70%; }
div.WidgetDict div.content div.dict-key div,
div.WidgetDict div.content div.dict-value div,
div.WidgetDict input
{
width: 100%;
}
div.WidgetDict div.content div + div.ComputedExpressionWidget,
div.WidgetDict div.content div + div.StringWidget {
width: 70%;
padding-left: 1em;
div.WidgetDict div.content div.widget {
margin-bottom: 0.5ex;
}
div.WidgetDict div.content div.content,
div.WidgetDict div.content div.content input {
width: calc(100% - 1em);
div.WidgetDict div.dict-separator {
padding: 0 1ex;
}
div.SetBackofficeFieldsTableWidget table {

View File

@ -48,6 +48,7 @@ from qommon.afterjobs import AfterJobStatusDirectory
from categories import Category
from data_sources import NamedDataSource
from wscalls import NamedWsCall
from wcs.api import ApiDirectory
from myspace import MyspaceDirectory
from forms.preview import PreviewDirectory
@ -281,17 +282,21 @@ class RootDirectory(Directory):
get_response().set_content_type('text/plain')
return json.dumps(results)
def feed_substitution_parts(self):
get_publisher().substitutions.feed(get_session())
get_publisher().substitutions.feed(get_request().user)
get_publisher().substitutions.feed(NamedDataSource)
get_publisher().substitutions.feed(NamedWsCall)
def _q_traverse(self, path):
self.feed_substitution_parts()
response = get_response()
if not hasattr(response, 'filter'):
response.filter = {}
if not hasattr(response, 'breadcrumb'):
response.breadcrumb = [ ('', _('Home')) ]
get_publisher().substitutions.feed(get_session())
get_publisher().substitutions.feed(get_request().user)
get_publisher().substitutions.feed(NamedDataSource)
if not self.admin:
self.admin = get_publisher().admin_directory_class()

View File

@ -22,20 +22,15 @@ import xml.etree.ElementTree as ET
import collections
import mimetypes
from StringIO import StringIO
import urllib
import urlparse
from quixote.html import TemplateIO, htmltext
from qommon.errors import ConnectionError
from qommon.form import *
from qommon.misc import (http_get_page, http_post_request, get_variadic_url,
JSONEncoder, json_loads, site_encode)
from qommon.misc import json_loads, site_encode
from wcs.workflows import (WorkflowStatusItem, register_item_class,
AbortActionException, AttachmentEvolutionPart)
from wcs.api_utils import sign_url
TIMEOUT = 30
from wcs.wscalls import call_webservice
class JournalWsCallErrorPart: #pylint: disable=C1001
content = None
@ -239,68 +234,16 @@ class WebserviceCallStatusItem(WorkflowStatusItem):
if not self.url:
# misconfigured action
return
url = self.url
if '[' in url:
variables = get_publisher().substitutions.get_context_variables()
url = get_variadic_url(url, variables)
if self.qs_data: # merge qs_data into url
publisher = get_publisher()
parsed = urlparse.urlparse(url)
qs = list(urlparse.parse_qsl(parsed.query))
for key, value in self.qs_data.iteritems():
try:
value = self.compute(value, raises=True)
value = str(value)
except:
get_publisher().notify_of_exception(sys.exc_info())
else:
key = publisher.sitecharset2utf8(key)
value = publisher.sitecharset2utf8(value)
qs.append((key, value))
qs = urllib.urlencode(qs)
url = urlparse.urlunparse(parsed[:4] + (qs,) + parsed[5:6])
if self.request_signature_key:
signature_key = self.compute(self.request_signature_key)
if signature_key:
url = sign_url(url, signature_key)
headers = {'Content-type': 'application/json',
'Accept': 'application/json'}
post_data = None # payload
# if self.post_data exists, post_data is a dict built from it
if self.method == 'POST' and self.post_data:
post_data = {}
for (key, value) in self.post_data.items():
try:
post_data[key] = self.compute(value, raises=True)
except:
get_publisher().notify_of_exception(sys.exc_info())
# if formdata has to be sent, it's the payload. If post_data exists,
# it's added in formdata['extra']
if self.method == 'POST' and self.post:
formdata_dict = formdata.get_json_export_dict()
if post_data is not None:
formdata_dict['extra'] = post_data
post_data = formdata_dict
try:
if self.method == 'POST':
if post_data:
post_data = json.dumps(post_data, cls=JSONEncoder,
encoding=get_publisher().site_charset)
# increase timeout for huge loads, one second every 65536
# bytes, to match a country 512kbps DSL line.
timeout = TIMEOUT
timeout += len(post_data) / 65536
response, status, data, auth_header = http_post_request(
url, post_data, headers=headers, timeout=timeout)
else:
response, status, data, auth_header = http_get_page(
url, headers=headers, timeout=TIMEOUT)
response, status, data = call_webservice(
url=self.url,
qs_data=self.qs_data,
request_signature_key=self.request_signature_key,
method=self.method,
post_data=self.post_data,
post_formdata=self.post,
formdata=formdata)
except ConnectionError as e:
status = 0
self.action_on_error(self.action_on_network_errors, formdata,

View File

@ -1546,7 +1546,8 @@ class WorkflowStatusItem(XmlSerialisable):
value = getattr(self, '%s_parse' % f)(value)
setattr(self, f, value)
def compute(self, var, do_ezt=True, raises=False):
@classmethod
def compute(cls, var, do_ezt=True, raises=False):
if not isinstance(var, basestring):
return var

242
wcs/wscalls.py Normal file
View File

@ -0,0 +1,242 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2016 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import collections
import json
import sys
import urllib
import urlparse
import xml.etree.ElementTree as ET
from quixote import get_publisher
from qommon.misc import (simplify, http_get_page, http_post_request,
get_variadic_url, JSONEncoder, json_loads)
from qommon.xml_storage import XmlStorableObject
from qommon.form import (CompositeWidget, StringWidget, WidgetDict,
ComputedExpressionWidget, RadiobuttonsWidget, CheckboxWidget)
from wcs.api_utils import sign_url
from wcs.workflows import WorkflowStatusItem
TIMEOUT = 30
def call_webservice(url, qs_data=None, request_signature_key=None,
method=None, post_data=None, post_formdata=None, formdata=None,
**kwargs):
if '[' in url:
variables = get_publisher().substitutions.get_context_variables()
url = get_variadic_url(url, variables)
if qs_data: # merge qs_data into url
publisher = get_publisher()
parsed = urlparse.urlparse(url)
qs = list(urlparse.parse_qsl(parsed.query))
for key, value in qs_data.iteritems():
try:
value = WorkflowStatusItem.compute(value, raises=True)
value = str(value)
except:
get_publisher().notify_of_exception(sys.exc_info())
else:
key = publisher.sitecharset2utf8(key)
value = publisher.sitecharset2utf8(value)
qs.append((key, value))
qs = urllib.urlencode(qs)
url = urlparse.urlunparse(parsed[:4] + (qs,) + parsed[5:6])
if request_signature_key:
signature_key = WorkflowStatusItem.compute(request_signature_key)
if signature_key:
url = sign_url(url, signature_key)
headers = {'Content-type': 'application/json', 'Accept': 'application/json'}
payload = None
# if post_data exists, payload is a dict built from it
if method == 'POST' and post_data:
payload = {}
for (key, value) in post_data.items():
try:
payload[key] = WorkflowStatusItem.compute(value, raises=True)
except:
get_publisher().notify_of_exception(sys.exc_info())
# if formdata has to be sent, it's the payload. If post_data exists,
# it's added in formdata['extra']
if method == 'POST' and post_formdata:
if formdata:
formdata_dict = formdata.get_json_export_dict()
if payload is not None:
formdata_dict['extra'] = payload
payload = formdata_dict
if method == 'POST':
if payload:
payload = json.dumps(payload, cls=JSONEncoder,
encoding=get_publisher().site_charset)
# increase timeout for huge loads, one second every 65536
# bytes, to match a country 512kbps DSL line.
timeout = TIMEOUT
timeout += len(payload) / 65536
response, status, data, auth_header = http_post_request(
url, payload, headers=headers, timeout=timeout)
else:
response, status, data, auth_header = http_get_page(
url, headers=headers, timeout=TIMEOUT)
return (response, status, data)
class NamedWsCall(XmlStorableObject):
_names = 'wscalls'
_xml_tagname = 'wscalls'
name = None
slug = None
description = None
request = None
# declarations for serialization
XML_NODES = [('name', 'str'), ('slug', 'str'), ('description', 'str'),
('request', 'request'),]
def __init__(self, name=None):
XmlStorableObject.__init__(self)
self.name = name
@classmethod
def get_substitution_variables(cls):
return {'webservices': WsCallsSubstitutionProxy()}
def export_request_to_xml(self, element, attribute_name, charset):
request = getattr(self, attribute_name)
for attr in ('url', 'request_signature_key', 'method'):
ET.SubElement(element, attr).text = unicode(request.get(attr) or '', charset)
for attr in ('qs_data', 'post_data'):
data_element = ET.SubElement(element, attr)
for k, v in (request.get(attr) or {}).items():
sub = ET.SubElement(data_element, 'param')
sub.attrib['key'] = unicode(k, charset)
sub.text = unicode(v, charset)
if request.get('post_formdata'):
ET.SubElement(element, 'post_formdata')
def import_request_from_xml(self, element, charset):
request = {}
for attr in ('url', 'request_signature_key', 'method'):
request[attr] = ''
if element.find(attr) is not None and element.find(attr).text:
request[attr] = element.find(attr).text.encode(charset)
for attr in ('qs_data', 'post_data'):
request[attr] = {}
data_element = element.find(attr)
if data_element is None:
continue
for param in data_element.findall('param'):
request[attr][param.attrib['key'].encode(charset)] = param.text.encode(charset)
request['post_formdata'] = bool(element.find('post_formdata') is not None)
return request
def store(self):
if self.slug is None:
# set slug if it's not yet there
self.slug = self.get_new_slug()
if self.id and self.slug != self.id:
self.remove_object(self.id)
self.id = self.slug
super(NamedWsCall, self).store()
def get_new_slug(self):
new_slug = simplify(self.name, space='_')
base_new_slug = new_slug
suffix_no = 0
while True:
try:
obj = self.get(new_slug, ignore_migration=True)
except KeyError:
break
if obj.id == self.id:
break
suffix_no += 1
new_slug = '%s_%s' % (base_new_slug, suffix_no)
return new_slug
@classmethod
def get_substitution_variables(cls):
return {'webservice': WsCallsSubstitutionProxy()}
def call(self):
(response, status, data) = call_webservice(**self.request)
return json_loads(data)
class WsCallsSubstitutionProxy(object):
def __getattr__(self, attr):
return NamedWsCall.get(attr).call()
class WsCallRequestWidget(CompositeWidget):
def __init__(self, name, value=None, include_post_formdata=False, **kwargs):
CompositeWidget.__init__(self, name, value, **kwargs)
self.include_post_formdata = include_post_formdata
if not value:
value = {}
self.add(StringWidget, 'url', title=_('URL'), value=value.get('url'), size=80)
self.add(ComputedExpressionWidget, 'request_signature_key',
title=_('Request Signature Key'),
value=value.get('request_signature_key'))
self.add(WidgetDict, 'qs_data',
title=_('Query string data'),
value=value.get('qs_data') or {},
element_value_type=ComputedExpressionWidget)
methods = collections.OrderedDict(
[('GET', _('GET')), ('POST', _('POST (JSON)'))])
self.add(RadiobuttonsWidget, 'method',
title=_('Method'),
options=methods.items(),
value=value.get('method') or 'GET',
attrs={'data-dynamic-display-parent': 'true'})
method_widget = self.get_widget('method')
if self.include_post_formdata:
self.add(CheckboxWidget, 'post_formdata',
title=_('Post formdata'),
value=value.get('post_formdata'),
attrs={
'data-dynamic-display-child-of': method_widget.get_name(),
'data-dynamic-display-value': methods.get('POST'),
})
self.add(WidgetDict, 'post_data',
title=_('Post data'),
value=value.get('post_data') or {},
element_value_type=ComputedExpressionWidget,
attrs={
'data-dynamic-display-child-of': method_widget.get_name(),
'data-dynamic-display-value': methods.get('POST'),
})
def _parse(self, request):
values = {}
for name in ('url', 'request_signature_key', 'qs_data', 'method',
'post_formdata', 'post_data'):
if not self.include_post_formdata and name == 'post_formdata':
continue
value = self.get(name)
if value:
values[name] = value
self.value = values or None