passerelle/passerelle/utils/soap.py

152 lines
5.2 KiB
Python

# passerelle - uniform access to multiple data sources and services
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from urllib import parse as urlparse
from requests import RequestException
from zeep import Client
from zeep.cache import InMemoryCache
from zeep.exceptions import Error as ZeepError
from zeep.exceptions import Fault, TransportError
from zeep.proxy import OperationProxy, ServiceProxy
from zeep.transports import Transport
from passerelle.utils.jsonresponse import APIError
class SOAPError(APIError):
pass
class SOAPServiceUnreachable(SOAPError):
def __init__(self, client, exception):
super().__init__(
f'SOAP service at {client.wsdl.location} is unreachable. Please contact its administrator',
data={
'wsdl': client.wsdl.location,
'status_code': exception.status_code,
'error': str(exception),
},
)
class SOAPFault(SOAPError):
def __init__(self, client, fault):
super().__init__(
f'SOAP service at {client.wsdl.location} returned an error "{fault.message or fault.code}"',
data={
'soap_fault': fault.__dict__,
},
)
class OperationProxyWrapper(OperationProxy):
def __call__(self, *args, **kwargs):
client = self._proxy._client
try:
return super().__call__(*args, **kwargs)
except TransportError as transport_error:
raise SOAPServiceUnreachable(client, transport_error)
except Fault as fault:
raise SOAPFault(client, fault)
except ZeepError as zeep_error:
raise SOAPError(str(zeep_error))
class ServiceProxyWrapper(ServiceProxy):
def __getitem__(self, key):
operation = super().__getitem__(key)
operation.__class__ = OperationProxyWrapper
return operation
class SOAPClient(Client):
"""Wrapper around zeep.Client
resource muste have a wsdl_url and a requests attribute
"""
def __init__(self, resource, **kwargs):
wsdl_url = kwargs.pop('wsdl_url', None) or resource.wsdl_url
self.api_error = kwargs.pop('api_error', False)
transport_kwargs = kwargs.pop('transport_kwargs', {})
transport_class = getattr(resource, 'soap_transport_class', SOAPTransport)
transport = transport_class(
resource, wsdl_url, session=resource.requests, cache=InMemoryCache(), **transport_kwargs
)
super().__init__(wsdl_url, transport=transport, **kwargs)
@property
def service(self):
service = super().service
if self.api_error:
service.__class__ = ServiceProxyWrapper
return service
class ResponseFixContentWrapper:
def __init__(self, response):
self.response = response
def __getattr__(self, name):
return getattr(self.response, name)
@property
def content(self):
content = self.response.content
if 'multipart/related' not in self.response.headers.get('Content-Type', ''):
try:
first_less_than_sign = content.index(b'<')
last_greater_than_sign = content.rindex(b'>')
content = content[first_less_than_sign : last_greater_than_sign + 1]
except ValueError:
pass
return content
class SOAPTransport(Transport):
"""Wrapper around zeep.Transport
disable basic_authentication hosts unrelated to wsdl's endpoints
"""
def __init__(self, resource, wsdl_url, remove_first_bytes_for_xml=False, **kwargs):
self.resource = resource
self.wsdl_host = urlparse.urlparse(wsdl_url).netloc
# fix content for servers returning unexpected characters before XML document start
self.remove_first_bytes_for_xml = remove_first_bytes_for_xml
super().__init__(**kwargs)
def _load_remote_data(self, url):
try:
if urlparse.urlparse(url).netloc != self.wsdl_host:
response = self.session.get(url, timeout=self.load_timeout, auth=None, cert=None)
response.raise_for_status()
return response.content
return super()._load_remote_data(url)
except RequestException as e:
raise SOAPError(
'SOAP service is down, location %r cannot be loaded: %s' % (url, e), exception=e, url=url
)
def post_xml(self, *args, **kwargs):
response = super().post_xml(*args, **kwargs)
if self.remove_first_bytes_for_xml:
return ResponseFixContentWrapper(response)
return response