add a common soap module based on suds and requests (#9163)
The Soap class contains all initialization behaviours needed to create suds Client objects initialized through a service model or through parameters.
This commit is contained in:
parent
a0d344799b
commit
d8bc2a9fcd
|
@ -0,0 +1,118 @@
|
|||
import os.path
|
||||
import StringIO
|
||||
import urlparse
|
||||
|
||||
import requests
|
||||
from suds.client import Client
|
||||
from suds.transport.http import HttpAuthenticated
|
||||
from suds.transport import Reply
|
||||
|
||||
|
||||
class Transport(HttpAuthenticated):
|
||||
def __init__(self, username=None, password=None, certificate=None, private_key=None,
|
||||
verify=True, domains=None, **kwargs):
|
||||
assert not (bool(certificate) ^ bool(private_key)), ('a private key is mandatory with a '
|
||||
'certificate')
|
||||
assert username is not None or not password, 'a username is mandatory with a password'
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.certificate = certificate
|
||||
self.private_key = private_key
|
||||
self.verify = verify
|
||||
self.domains = domains
|
||||
HttpAuthenticated.__init__(self, **kwargs)
|
||||
|
||||
def get_requests_kwargs(self, request):
|
||||
kwargs = {}
|
||||
parsed = urlparse.urlparse(request.url)
|
||||
domain = '%s://%s' % (parsed.scheme, parsed.netloc)
|
||||
if not self.domains or domain in self.domains:
|
||||
if self.username is not None:
|
||||
kwargs['auth'] = (self.username, self.password)
|
||||
if self.certificate is not None:
|
||||
kwargs['cert'] = (self.certificate, self.private_key)
|
||||
kwargs['verify'] = self.verify
|
||||
return kwargs
|
||||
|
||||
def open(self, request):
|
||||
# only use our custom handler to fetch service resources, not schemas
|
||||
# from other namespaces
|
||||
resp = requests.get(request.url, headers=request.headers,
|
||||
**self.get_requests_kwargs(request))
|
||||
return StringIO.StringIO(resp.content)
|
||||
|
||||
def send(self, request):
|
||||
resp = requests.post(request.url, data=request.message, headers=request.headers,
|
||||
**self.get_requests_kwargs(request))
|
||||
return Reply(resp.status_code, resp.headers, resp.content)
|
||||
|
||||
|
||||
class Soap(object):
|
||||
url = None
|
||||
path = None
|
||||
username = None
|
||||
password = None
|
||||
certificate = None
|
||||
private_key = None
|
||||
domains = None
|
||||
plugins = None
|
||||
cache = None
|
||||
|
||||
def __init__(self, url=None, path=None, username=None, password=None, certificate=None,
|
||||
private_key=None, verify=None, instance=None, plugins=None, cache=None):
|
||||
assert not (bool(certificate) ^ bool(private_key)), ('a private key is mandatory with a '
|
||||
'certificate')
|
||||
assert username is not None or not password, 'a username is mandatory with a password'
|
||||
self.domains = set(self.domains) if self.domains else set()
|
||||
self.plugins = self.plugins or []
|
||||
if url is not None:
|
||||
self.url = url
|
||||
if path is not None:
|
||||
self.path = None
|
||||
if username is not None:
|
||||
self.username = username
|
||||
self.password = password
|
||||
if certificate is not None:
|
||||
self.certificate = certificate
|
||||
self.private_key = private_key
|
||||
if verify is not None:
|
||||
self.verify = verify
|
||||
if instance is not None:
|
||||
self.init_from_instance(instance)
|
||||
if plugins is not None:
|
||||
self.plugins.extend(plugins)
|
||||
if cache is not None:
|
||||
self.cache = cache
|
||||
self.update_domains()
|
||||
|
||||
def update_domains(self):
|
||||
if self.url is not None:
|
||||
parsed_url = urlparse.urlparse(self.url)
|
||||
self.domains.add('%s://%s' % (parsed_url.scheme, parsed_url.netloc))
|
||||
|
||||
def init_from_instance(self, instance):
|
||||
if hasattr(instance, 'wsdl_url'):
|
||||
self.url = instance.wsdl_url
|
||||
if hasattr(instance, 'username'):
|
||||
self.username = instance.username
|
||||
self.password = instance.password
|
||||
if hasattr(instance, 'keystore'):
|
||||
self.certificate = instance.keystore.path
|
||||
self.private_key = self.certificate
|
||||
if hasattr(instance, 'verify_cert'):
|
||||
self.verify = instance.verify_cert
|
||||
|
||||
def get_client(self):
|
||||
url = self.url
|
||||
if self.path:
|
||||
url = 'file://' + os.path.abspath(self.path)
|
||||
return Client(url,
|
||||
transport=Transport(
|
||||
username=self.username,
|
||||
password=self.password,
|
||||
certificate=self.certificate,
|
||||
private_key=self.private_key,
|
||||
verify=self.verify,
|
||||
domains=self.domains),
|
||||
plugins=self.plugins,
|
||||
cache=self.cache)
|
Loading…
Reference in New Issue