passerelle/passerelle/soap.py

119 lines
4.5 KiB
Python

import os.path
import StringIO
import urlparse
import requests
from suds.client import Client
from suds.transport.http import HttpAuthenticated
from suds.transport import Reply
class Transport(HttpAuthenticated):
def __init__(self, username=None, password=None, certificate=None, private_key=None,
verify=True, domains=None, **kwargs):
assert not (bool(certificate) ^ bool(private_key)), ('a private key is mandatory with a '
'certificate')
assert username is not None or not password, 'a username is mandatory with a password'
self.username = username
self.password = password
self.certificate = certificate
self.private_key = private_key
self.verify = verify
self.domains = domains
HttpAuthenticated.__init__(self, **kwargs)
def get_requests_kwargs(self, request):
kwargs = {}
parsed = urlparse.urlparse(request.url)
domain = '%s://%s' % (parsed.scheme, parsed.netloc)
if not self.domains or domain in self.domains:
if self.username is not None:
kwargs['auth'] = (self.username, self.password)
if self.certificate is not None:
kwargs['cert'] = (self.certificate, self.private_key)
kwargs['verify'] = self.verify
return kwargs
def open(self, request):
# only use our custom handler to fetch service resources, not schemas
# from other namespaces
resp = requests.get(request.url, headers=request.headers,
**self.get_requests_kwargs(request))
return StringIO.StringIO(resp.content)
def send(self, request):
resp = requests.post(request.url, data=request.message, headers=request.headers,
**self.get_requests_kwargs(request))
return Reply(resp.status_code, resp.headers, resp.content)
class Soap(object):
url = None
path = None
username = None
password = None
certificate = None
private_key = None
domains = None
plugins = None
cache = None
def __init__(self, url=None, path=None, username=None, password=None, certificate=None,
private_key=None, verify=None, instance=None, plugins=None, cache=None):
assert not (bool(certificate) ^ bool(private_key)), ('a private key is mandatory with a '
'certificate')
assert username is not None or not password, 'a username is mandatory with a password'
self.domains = set(self.domains) if self.domains else set()
self.plugins = self.plugins or []
if url is not None:
self.url = url
if path is not None:
self.path = None
if username is not None:
self.username = username
self.password = password
if certificate is not None:
self.certificate = certificate
self.private_key = private_key
if verify is not None:
self.verify = verify
if instance is not None:
self.init_from_instance(instance)
if plugins is not None:
self.plugins.extend(plugins)
if cache is not None:
self.cache = cache
self.update_domains()
def update_domains(self):
if self.url is not None:
parsed_url = urlparse.urlparse(self.url)
self.domains.add('%s://%s' % (parsed_url.scheme, parsed_url.netloc))
def init_from_instance(self, instance):
if hasattr(instance, 'wsdl_url'):
self.url = instance.wsdl_url
if hasattr(instance, 'username'):
self.username = instance.username
self.password = instance.password
if hasattr(instance, 'keystore'):
self.certificate = instance.keystore.path
self.private_key = self.certificate
if hasattr(instance, 'verify_cert'):
self.verify = instance.verify_cert
def get_client(self):
url = self.url
if self.path:
url = 'file://' + os.path.abspath(self.path)
return Client(url,
transport=Transport(
username=self.username,
password=self.password,
certificate=self.certificate,
private_key=self.private_key,
verify=self.verify,
domains=self.domains),
plugins=self.plugins,
cache=self.cache)