# wcs/wcs/wscalls.py
#
# 369 lines
# 13 KiB
# Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2016 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import collections
import json
import sys
import urllib.parse
import xml.etree.ElementTree as ET
from django.utils.encoding import force_text
from quixote import get_publisher, get_request
from wcs.api_utils import MissingSecret, get_secret_and_orig, sign_url
from wcs.qommon.errors import ConnectionError
from wcs.workflows import WorkflowStatusItem
from .qommon import _, force_str, misc
from .qommon.form import (
CheckboxWidget,
CompositeWidget,
ComputedExpressionWidget,
RadiobuttonsWidget,
StringWidget,
WidgetDict,
)
from .qommon.misc import JSONEncoder, get_variadic_url, json_loads, simplify
from .qommon.template import Template
from .qommon.xml_storage import XmlStorableObject
def get_app_error_code(response, data, response_type):
    """Return the application-level error code carried by a response.

    A non-empty ``x-error-code`` response header takes precedence: it is
    returned as an int when it parses as one, otherwise as the raw header
    string.  Without that header, and only when ``response_type`` is
    ``'json'``, ``data`` is decoded and a truthy ``err`` member of a
    top-level JSON object is returned.  In every other case the result
    is ``0``.
    """
    header_value = response.headers.get('x-error-code')
    if header_value:
        # result is good only if header value is '0'
        try:
            return int(header_value)
        except ValueError:
            return header_value

    if response_type != 'json':
        return 0

    try:
        decoded = json_loads(data)
    except (ValueError, TypeError):
        # undecodable body: no error code can be extracted from it
        return 0

    if isinstance(decoded, dict):
        err = decoded.get('err')
        if err:
            return err
    return 0
def call_webservice(
    url,
    qs_data=None,
    request_signature_key=None,
    method=None,
    post_data=None,
    post_formdata=None,
    formdata=None,
    cache=False,
    notify_on_errors=False,
    record_on_errors=False,
    handle_connection_errors=True,
    **kwargs,
):
    """Perform an HTTP call to a webservice and return its result.

    Parameters:
      url: target URL; surrounding whitespace is stripped and, when it is a
        template string, it is expanded against the publisher substitution
        variables.
      qs_data: optional mapping of extra query string parameters.  Every
        value goes through ``WorkflowStatusItem.compute`` and ``str``; a
        value whose computation raises is reported with
        ``notify_of_exception`` and left out.  The mapping passed by the
        caller is never modified.
      request_signature_key: key used to sign the URL.  When empty, a secret
        is looked up with ``get_secret_and_orig(url)``; if one is found the
        matching ``orig`` is added to the query string.
      method: ``'PATCH'``, ``'PUT'`` and ``'POST'`` send a payload,
        ``'DELETE'`` sends a DELETE request, any other value (including
        ``None``) ends up in a GET.
      post_data: optional mapping used to build the JSON payload; values are
        computed with ``allow_complex=True``.
      post_formdata: when true (and ``formdata`` is given) the payload is
        ``formdata.get_json_export_dict()``, with the ``post_data`` payload
        stored under its ``'extra'`` key.
      formdata: object providing ``get_json_export_dict()``.
      cache: only the exact value ``True`` enables the per-request cache
        (``wscalls_cache`` attribute of the current request).
      notify_on_errors / record_on_errors: forwarded as ``notify`` and
        ``record`` to ``notify_of_exception`` when the call fails.
      handle_connection_errors: when false, ``ConnectionError`` propagates
        to the caller instead of being turned into a ``(None, None, None)``
        result.
      **kwargs: accepted and ignored.

    Returns a ``(response, status, data)`` tuple; ``(None, status, data)``
    when served from the cache; ``(None, None, None)`` after a handled
    connection error.

    Raises ``ConnectionError`` only when ``handle_connection_errors`` is
    false.
    """
    url = url.strip()
    if Template.is_template_string(url):
        variables = get_publisher().substitutions.get_context_variables(mode='lazy')
        url = get_variadic_url(url, variables)

    if not request_signature_key:
        try:
            request_signature_key, orig = get_secret_and_orig(url)
        except MissingSecret:
            pass
        else:
            # Work on a copy: the caller's mapping (for instance the request
            # definition of a stored named call) must not gain an 'orig'
            # entry as a side effect of performing the call.
            qs_data = dict(qs_data or {})
            qs_data['orig'] = orig

    if qs_data:  # merge qs_data into url
        parsed = urllib.parse.urlparse(url)
        qs = list(urllib.parse.parse_qsl(parsed.query))
        for key, value in qs_data.items():
            try:
                value = WorkflowStatusItem.compute(value, raises=True)
                value = str(value)
            except Exception:
                # report the failing parameter and leave it out of the URL
                get_publisher().notify_of_exception(sys.exc_info())
            else:
                key = force_str(key)
                value = force_str(value)
                qs.append((key, value))
        qs = urllib.parse.urlencode(qs)
        url = urllib.parse.urlunparse(parsed[:4] + (qs,) + parsed[5:6])

    # the cache is keyed on the URL before signature
    unsigned_url = url

    if method == 'GET' and cache is True:  # check cache
        # NOTE(review): the cache is only read for an explicit 'GET' while it
        # is written below for every call reaching the GET fallback branch.
        request = get_request()
        if hasattr(request, 'wscalls_cache') and unsigned_url in request.wscalls_cache:
            return (None,) + request.wscalls_cache[unsigned_url]

    if request_signature_key:
        signature_key = str(WorkflowStatusItem.compute(request_signature_key))
        if signature_key:
            url = sign_url(url, signature_key)

    headers = {'Accept': 'application/json'}
    payload = None

    # if post_data exists, payload is a dict built from it
    if method in ('PATCH', 'PUT', 'POST') and post_data:
        payload = {}
        with get_publisher().complex_data():
            for (key, value) in post_data.items():
                try:
                    payload[key] = WorkflowStatusItem.compute(value, allow_complex=True, raises=True)
                except Exception:
                    # report the failing entry; its key stays out of the payload
                    get_publisher().notify_of_exception(sys.exc_info())
                else:
                    if payload[key]:
                        payload[key] = get_publisher().get_cached_complex_data(payload[key])

    # if formdata has to be sent, it's the payload. If post_data exists,
    # it's added in formdata['extra']
    if method in ('PATCH', 'PUT', 'POST') and post_formdata:
        if formdata:
            formdata_dict = formdata.get_json_export_dict()
            if payload is not None:
                formdata_dict['extra'] = payload
            payload = formdata_dict

    try:
        if method in ('PATCH', 'PUT', 'POST'):
            if payload:
                headers['Content-type'] = 'application/json'
                payload = json.dumps(payload, cls=JSONEncoder)
            response, status, data, dummy = misc._http_request(
                url, method=method, body=payload, headers=headers
            )
        elif method == 'DELETE':
            response, status, data, dummy = misc._http_request(url, method='DELETE', headers=headers)
        else:
            response, status, data, dummy = misc.http_get_page(url, headers=headers)
            request = get_request()
            if cache is True and request and hasattr(request, 'wscalls_cache'):
                request.wscalls_cache[unsigned_url] = (status, data)
    except ConnectionError:
        if not handle_connection_errors:
            # bare raise keeps the original traceback intact
            raise
        if notify_on_errors or record_on_errors:
            exc_info = sys.exc_info()
            get_publisher().notify_of_exception(
                exc_info, context='[WSCALL]', notify=notify_on_errors, record=record_on_errors
            )
        return (None, None, None)

    app_error_code = get_app_error_code(response, data, 'json')
    if (app_error_code != 0 or status >= 400) and (notify_on_errors or record_on_errors):
        summary = '<no response>'
        if response is not None:
            summary = '%s %s' % (status, response.reason)
        try:
            # raised and caught on purpose so that sys.exc_info() describes
            # an actual exception carrying the summary
            raise Exception(summary)
        except Exception:
            exc_info = sys.exc_info()
            get_publisher().notify_of_exception(
                exc_info, context='[WSCALL]', notify=notify_on_errors, record=record_on_errors
            )

    return (response, status, data)
class NamedWsCall(XmlStorableObject):
    """Named webservice call definition, stored as an XML object.

    The ``request`` attribute is a dict of ``call_webservice`` keyword
    arguments (``url``, ``request_signature_key``, ``method``, ``qs_data``,
    ``post_data``, ``post_formdata``); ``call()`` expands it into an actual
    call.
    """

    _names = 'wscalls'
    xml_root_node = 'wscall'

    name = None
    slug = None
    description = None
    # dict of call_webservice() keyword arguments, see class docstring
    request = None
    # forwarded to call_webservice() by call()
    notify_on_errors = False
    record_on_errors = False

    # declarations for serialization
    # NOTE(review): the 'request' type is presumably dispatched by the base
    # class to export_request_to_xml / import_request_from_xml -- confirm.
    XML_NODES = [
        ('name', 'str'),
        ('slug', 'str'),
        ('description', 'str'),
        ('request', 'request'),
        ('notify_on_errors', 'bool'),
        ('record_on_errors', 'bool'),
    ]

    def __init__(self, name=None):
        """Create a call definition named ``name``."""
        XmlStorableObject.__init__(self)
        self.name = name

    def get_admin_url(self):
        """Return the backoffice settings URL of this call, built from its slug."""
        base_url = get_publisher().get_backoffice_url()
        return '%s/settings/wscalls/%s/' % (base_url, self.slug)

    def export_request_to_xml(self, element, attribute_name, charset, **kwargs):
        """Serialize the request dict found in ``attribute_name`` under ``element``.

        Scalar settings become child elements with a text value (empty
        string when unset); ``qs_data`` and ``post_data`` become containers
        of ``<param key="...">value</param>`` elements; ``post_formdata`` is
        an empty marker element that is only written when the flag is set.
        """
        request = getattr(self, attribute_name)
        for attr in ('url', 'request_signature_key', 'method'):
            ET.SubElement(element, attr).text = force_text(request.get(attr) or '', charset)
        for attr in ('qs_data', 'post_data'):
            data_element = ET.SubElement(element, attr)
            for k, v in (request.get(attr) or {}).items():
                sub = ET.SubElement(data_element, 'param')
                sub.attrib['key'] = force_text(k, charset)
                sub.text = force_text(v, charset)
        if request.get('post_formdata'):
            ET.SubElement(element, 'post_formdata')

    def import_request_from_xml(self, element, **kwargs):
        """Build and return a request dict from the children of ``element``.

        Missing or empty scalar settings are returned as ``''``, missing
        parameter containers as ``{}`` and ``post_formdata`` as a bool
        telling whether the marker element is present.
        """
        request = {}
        for attr in ('url', 'request_signature_key', 'method'):
            request[attr] = ''
            node = element.find(attr)
            if node is not None and node.text:
                request[attr] = force_str(node.text)
        for attr in ('qs_data', 'post_data'):
            request[attr] = {}
            data_element = element.find(attr)
            if data_element is None:
                continue
            for param in data_element.findall('param'):
                # ElementTree reports the text of an empty <param/> as None;
                # map it to '' like the scalar settings above, instead of
                # handing None to force_str.
                request[attr][force_str(param.attrib['key'])] = force_str(param.text or '')
        request['post_formdata'] = element.find('post_formdata') is not None
        return request

    def store(self, comment=None):
        """Store the object under an id equal to its slug.

        A slug is generated when none is set yet.  When the object was
        previously stored under a different id, that former object is
        removed first.  A snapshot is taken afterwards if the publisher
        defines a snapshot class; ``comment`` is passed along to it.
        """
        assert not self.is_readonly()
        if self.slug is None:
            # set slug if it's not yet there
            self.slug = self.get_new_slug()
        if self.id and self.slug != self.id:
            # slug changed: drop the object stored under the former id
            self.remove_object(self.id)
        self.id = self.slug
        super().store()
        if get_publisher().snapshot_class:
            get_publisher().snapshot_class.snap(instance=self, comment=comment)

    def get_new_slug(self):
        """Return a slug derived from the name that no other object uses.

        The simplified name is tried first, then ``<base>_1``, ``<base>_2``
        and so on, until the candidate is either unknown or already belongs
        to this very object.
        """
        new_slug = simplify(self.name, space='_')
        base_new_slug = new_slug
        suffix_no = 0
        while True:
            try:
                obj = self.get(new_slug, ignore_migration=True)
            except KeyError:
                # no object with that slug: it is free
                break
            if obj.id == self.id:
                # the slug is already ours
                break
            suffix_no += 1
            new_slug = '%s_%s' % (base_new_slug, suffix_no)
        return new_slug

    @classmethod
    def get_substitution_variables(cls):
        """Return the substitution variables exposing named calls (``webservice``)."""
        return {'webservice': WsCallsSubstitutionProxy()}

    def call(self):
        """Perform the call with caching enabled and return the JSON-decoded body."""
        data = call_webservice(
            cache=True,
            notify_on_errors=self.notify_on_errors,
            record_on_errors=self.record_on_errors,
            **(self.request or {}),
        )[2]  # call_webservice() returns (response, status, data)
        return json_loads(data)
class WsCallsSubstitutionProxy:
    """Expose named webservice calls as attributes of a single object."""

    def __getattr__(self, attr):
        """Return the result of the named call whose identifier is ``attr``.

        A ``KeyError`` or ``ValueError`` raised while fetching or running
        the call is reported as ``AttributeError(attr)``.
        """
        try:
            wscall = NamedWsCall.get(attr)
            result = wscall.call()
        except (KeyError, ValueError):
            raise AttributeError(attr)
        return result
class WsCallRequestWidget(CompositeWidget):
    """Composite widget editing the settings of a webservice request.

    Its value is a dict with the non-empty settings among ``url``,
    ``request_signature_key``, ``qs_data``, ``method``, ``post_formdata``
    (only when ``include_post_formdata`` is set) and ``post_data``, or
    ``None`` when every setting is empty.
    """

    def __init__(self, name, value=None, include_post_formdata=False, **kwargs):
        """Create the sub-widgets, pre-filled from the ``value`` dict."""
        CompositeWidget.__init__(self, name, value, **kwargs)
        self.include_post_formdata = include_post_formdata

        current = value or {}

        self.add(StringWidget, 'url', title=_('URL'), value=current.get('url'), size=80)
        self.add(
            ComputedExpressionWidget,
            'request_signature_key',
            title=_('Request Signature Key'),
            value=current.get('request_signature_key'),
        )
        self.add(
            WidgetDict,
            'qs_data',
            title=_('Query string data'),
            value=current.get('qs_data') or {},
            element_value_type=ComputedExpressionWidget,
        )

        # HTTP method -> label, in display order
        method_labels = collections.OrderedDict()
        method_labels['GET'] = _('GET')
        method_labels['POST'] = _('POST (JSON)')
        method_labels['PUT'] = _('PUT (JSON)')
        method_labels['PATCH'] = _('PATCH (JSON)')
        method_labels['DELETE'] = _('DELETE')

        self.add(
            RadiobuttonsWidget,
            'method',
            title=_('Method'),
            options=list(method_labels.items()),
            value=current.get('method') or 'GET',
            attrs={'data-dynamic-display-parent': 'true'},
            extra_css_class='widget-inline-radio',
        )
        method_widget = self.get_widget('method')

        def post_only_attrs():
            # fresh dict on every call: these attributes tie a widget to the
            # POST choice of the method radio buttons
            return {
                'data-dynamic-display-child-of': method_widget.get_name(),
                'data-dynamic-display-value': method_labels.get('POST'),
            }

        if self.include_post_formdata:
            self.add(
                CheckboxWidget,
                'post_formdata',
                title=_('Post formdata'),
                value=current.get('post_formdata'),
                attrs=post_only_attrs(),
            )
        self.add(
            WidgetDict,
            'post_data',
            title=_('POST data'),
            value=current.get('post_data') or {},
            element_value_type=ComputedExpressionWidget,
            attrs=post_only_attrs(),
        )

    def _parse(self, request):
        """Collect the non-empty sub-widget values into ``self.value``."""
        fields = ['url', 'request_signature_key', 'qs_data', 'method']
        if self.include_post_formdata:
            fields.append('post_formdata')
        fields.append('post_data')

        parsed = {}
        for field in fields:
            field_value = self.get(field)
            if field_value:
                parsed[field] = field_value
        self.value = parsed if parsed else None