260 lines
9.8 KiB
Python
260 lines
9.8 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2016 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import collections
|
|
import json
|
|
import sys
|
|
import urllib
|
|
import urlparse
|
|
import xml.etree.ElementTree as ET
|
|
|
|
from quixote import get_publisher, get_request
|
|
|
|
from qommon import _
|
|
from qommon.misc import simplify, get_variadic_url, JSONEncoder, json_loads
|
|
from qommon.xml_storage import XmlStorableObject
|
|
from qommon.form import (CompositeWidget, StringWidget, WidgetDict,
|
|
ComputedExpressionWidget, RadiobuttonsWidget, CheckboxWidget)
|
|
import qommon.misc
|
|
from qommon.template import Template
|
|
|
|
from wcs.api_utils import sign_url, get_secret_and_orig, MissingSecret
|
|
from wcs.workflows import WorkflowStatusItem
|
|
|
|
def call_webservice(url, qs_data=None, request_signature_key=None,
|
|
method=None, post_data=None, post_formdata=None, formdata=None,
|
|
cache=False, **kwargs):
|
|
|
|
url = url.strip()
|
|
if Template.is_template_string(url):
|
|
variables = get_publisher().substitutions.get_context_variables()
|
|
url = get_variadic_url(url, variables)
|
|
|
|
if not request_signature_key:
|
|
try:
|
|
request_signature_key, orig = get_secret_and_orig(url)
|
|
except MissingSecret:
|
|
pass
|
|
else:
|
|
if not qs_data:
|
|
qs_data = {}
|
|
qs_data['orig'] = orig
|
|
|
|
if qs_data: # merge qs_data into url
|
|
publisher = get_publisher()
|
|
parsed = urlparse.urlparse(url)
|
|
qs = list(urlparse.parse_qsl(parsed.query))
|
|
for key, value in qs_data.iteritems():
|
|
try:
|
|
value = WorkflowStatusItem.compute(value, raises=True)
|
|
value = str(value)
|
|
except:
|
|
get_publisher().notify_of_exception(sys.exc_info())
|
|
else:
|
|
key = publisher.sitecharset2utf8(key)
|
|
value = publisher.sitecharset2utf8(value)
|
|
qs.append((key, value))
|
|
qs = urllib.urlencode(qs)
|
|
url = urlparse.urlunparse(parsed[:4] + (qs,) + parsed[5:6])
|
|
|
|
unsigned_url = url
|
|
|
|
if method == 'GET' and cache is True: # check cache
|
|
request = get_request()
|
|
if hasattr(request, 'wscalls_cache') and unsigned_url in request.wscalls_cache:
|
|
return (None,) + request.wscalls_cache[unsigned_url]
|
|
|
|
if request_signature_key:
|
|
signature_key = str(WorkflowStatusItem.compute(request_signature_key))
|
|
if signature_key:
|
|
url = sign_url(url, signature_key)
|
|
|
|
headers = {'Accept': 'application/json'}
|
|
payload = None
|
|
|
|
# if post_data exists, payload is a dict built from it
|
|
if method == 'POST' and post_data:
|
|
payload = {}
|
|
for (key, value) in post_data.items():
|
|
try:
|
|
payload[key] = WorkflowStatusItem.compute(value, raises=True)
|
|
except:
|
|
get_publisher().notify_of_exception(sys.exc_info())
|
|
|
|
# if formdata has to be sent, it's the payload. If post_data exists,
|
|
# it's added in formdata['extra']
|
|
if method == 'POST' and post_formdata:
|
|
if formdata:
|
|
formdata_dict = formdata.get_json_export_dict()
|
|
if payload is not None:
|
|
formdata_dict['extra'] = payload
|
|
payload = formdata_dict
|
|
|
|
if method == 'POST':
|
|
if payload:
|
|
headers['Content-type'] = 'application/json'
|
|
payload = json.dumps(payload, cls=JSONEncoder,
|
|
encoding=get_publisher().site_charset)
|
|
response, status, data, auth_header = qommon.misc.http_post_request(
|
|
url, payload, headers=headers)
|
|
else:
|
|
response, status, data, auth_header = qommon.misc.http_get_page(
|
|
url, headers=headers)
|
|
request = get_request()
|
|
if cache is True and request and hasattr(request, 'wscalls_cache'):
|
|
request.wscalls_cache[unsigned_url] = (status, data)
|
|
|
|
return (response, status, data)
|
|
|
|
|
|
class NamedWsCall(XmlStorableObject):
|
|
_names = 'wscalls'
|
|
_xml_tagname = 'wscalls'
|
|
|
|
name = None
|
|
slug = None
|
|
description = None
|
|
request = None
|
|
|
|
# declarations for serialization
|
|
XML_NODES = [('name', 'str'), ('slug', 'str'), ('description', 'str'),
|
|
('request', 'request'),]
|
|
|
|
def __init__(self, name=None):
|
|
XmlStorableObject.__init__(self)
|
|
self.name = name
|
|
|
|
def export_request_to_xml(self, element, attribute_name, charset):
|
|
request = getattr(self, attribute_name)
|
|
for attr in ('url', 'request_signature_key', 'method'):
|
|
ET.SubElement(element, attr).text = unicode(request.get(attr) or '', charset)
|
|
for attr in ('qs_data', 'post_data'):
|
|
data_element = ET.SubElement(element, attr)
|
|
for k, v in (request.get(attr) or {}).items():
|
|
sub = ET.SubElement(data_element, 'param')
|
|
sub.attrib['key'] = unicode(k, charset)
|
|
sub.text = unicode(v, charset)
|
|
if request.get('post_formdata'):
|
|
ET.SubElement(element, 'post_formdata')
|
|
|
|
def import_request_from_xml(self, element, charset):
|
|
request = {}
|
|
for attr in ('url', 'request_signature_key', 'method'):
|
|
request[attr] = ''
|
|
if element.find(attr) is not None and element.find(attr).text:
|
|
request[attr] = element.find(attr).text.encode(charset)
|
|
for attr in ('qs_data', 'post_data'):
|
|
request[attr] = {}
|
|
data_element = element.find(attr)
|
|
if data_element is None:
|
|
continue
|
|
for param in data_element.findall('param'):
|
|
request[attr][param.attrib['key'].encode(charset)] = param.text.encode(charset)
|
|
request['post_formdata'] = bool(element.find('post_formdata') is not None)
|
|
return request
|
|
|
|
def store(self):
|
|
if self.slug is None:
|
|
# set slug if it's not yet there
|
|
self.slug = self.get_new_slug()
|
|
if self.id and self.slug != self.id:
|
|
self.remove_object(self.id)
|
|
self.id = self.slug
|
|
super(NamedWsCall, self).store()
|
|
|
|
def get_new_slug(self):
|
|
new_slug = simplify(self.name, space='_')
|
|
base_new_slug = new_slug
|
|
suffix_no = 0
|
|
while True:
|
|
try:
|
|
obj = self.get(new_slug, ignore_migration=True)
|
|
except KeyError:
|
|
break
|
|
if obj.id == self.id:
|
|
break
|
|
suffix_no += 1
|
|
new_slug = '%s_%s' % (base_new_slug, suffix_no)
|
|
return new_slug
|
|
|
|
@classmethod
|
|
def get_substitution_variables(cls):
|
|
return {'webservice': WsCallsSubstitutionProxy()}
|
|
|
|
def call(self):
|
|
(response, status, data) = call_webservice(cache=True, **self.request)
|
|
return json_loads(data)
|
|
|
|
class WsCallsSubstitutionProxy(object):
|
|
def __getattr__(self, attr):
|
|
try:
|
|
return NamedWsCall.get(attr).call()
|
|
except (KeyError, ValueError):
|
|
raise AttributeError(attr)
|
|
|
|
|
|
class WsCallRequestWidget(CompositeWidget):
|
|
def __init__(self, name, value=None, include_post_formdata=False, **kwargs):
|
|
CompositeWidget.__init__(self, name, value, **kwargs)
|
|
self.include_post_formdata = include_post_formdata
|
|
|
|
if not value:
|
|
value = {}
|
|
|
|
self.add(StringWidget, 'url', title=_('URL'), value=value.get('url'), size=80)
|
|
self.add(ComputedExpressionWidget, 'request_signature_key',
|
|
title=_('Request Signature Key'),
|
|
value=value.get('request_signature_key'))
|
|
self.add(WidgetDict, 'qs_data',
|
|
title=_('Query string data'),
|
|
value=value.get('qs_data') or {},
|
|
element_value_type=ComputedExpressionWidget)
|
|
methods = collections.OrderedDict(
|
|
[('GET', _('GET')), ('POST', _('POST (JSON)'))])
|
|
self.add(RadiobuttonsWidget, 'method',
|
|
title=_('Method'),
|
|
options=methods.items(),
|
|
value=value.get('method') or 'GET',
|
|
attrs={'data-dynamic-display-parent': 'true'})
|
|
method_widget = self.get_widget('method')
|
|
if self.include_post_formdata:
|
|
self.add(CheckboxWidget, 'post_formdata',
|
|
title=_('Post formdata'),
|
|
value=value.get('post_formdata'),
|
|
attrs={
|
|
'data-dynamic-display-child-of': method_widget.get_name(),
|
|
'data-dynamic-display-value': methods.get('POST'),
|
|
})
|
|
self.add(WidgetDict, 'post_data',
|
|
title=_('POST data'),
|
|
value=value.get('post_data') or {},
|
|
element_value_type=ComputedExpressionWidget,
|
|
attrs={
|
|
'data-dynamic-display-child-of': method_widget.get_name(),
|
|
'data-dynamic-display-value': methods.get('POST'),
|
|
})
|
|
|
|
def _parse(self, request):
|
|
values = {}
|
|
for name in ('url', 'request_signature_key', 'qs_data', 'method',
|
|
'post_formdata', 'post_data'):
|
|
if not self.include_post_formdata and name == 'post_formdata':
|
|
continue
|
|
value = self.get(name)
|
|
if value:
|
|
values[name] = value
|
|
self.value = values or None
|