159 lines
5.9 KiB
Python
159 lines
5.9 KiB
Python
import os.path
|
|
|
|
from django.utils.encoding import force_text
|
|
from django.utils.six import StringIO
|
|
from django.utils.six.moves.urllib import parse as urlparse
|
|
import requests
|
|
from suds.client import Client
|
|
from suds.transport.http import HttpAuthenticated
|
|
from suds.transport import Reply
|
|
import suds.sudsobject
|
|
|
|
|
|
class Transport(HttpAuthenticated):
|
|
def __init__(self, username=None, password=None, certificate=None, private_key=None,
|
|
verify=True, domains=None, **kwargs):
|
|
assert not (bool(certificate) ^ bool(private_key)), ('a private key is mandatory with a '
|
|
'certificate')
|
|
assert username is not None or not password, 'a username is mandatory with a password'
|
|
self.username = username
|
|
self.password = password
|
|
self.certificate = certificate
|
|
self.private_key = private_key
|
|
self.verify = verify
|
|
self.domains = domains
|
|
HttpAuthenticated.__init__(self, **kwargs)
|
|
|
|
def get_requests_kwargs(self, request):
|
|
kwargs = {}
|
|
parsed = urlparse.urlparse(request.url)
|
|
domain = '%s://%s' % (parsed.scheme, parsed.netloc)
|
|
if not self.domains or domain in self.domains:
|
|
if self.username is not None:
|
|
kwargs['auth'] = (self.username, self.password)
|
|
if self.certificate is not None:
|
|
kwargs['cert'] = (self.certificate, self.private_key)
|
|
kwargs['verify'] = self.verify
|
|
return kwargs
|
|
|
|
def open(self, request):
|
|
# only use our custom handler to fetch service resources, not schemas
|
|
# from other namespaces
|
|
resp = requests.get(request.url, headers=request.headers,
|
|
**self.get_requests_kwargs(request))
|
|
return StringIO(resp.content)
|
|
|
|
def send(self, request):
|
|
resp = requests.post(request.url, data=request.message, headers=request.headers,
|
|
**self.get_requests_kwargs(request))
|
|
return Reply(resp.status_code, resp.headers, resp.content)
|
|
|
|
|
|
class Soap(object):
|
|
url = None
|
|
path = None
|
|
username = None
|
|
password = None
|
|
certificate = None
|
|
private_key = None
|
|
domains = None
|
|
plugins = None
|
|
cache = None
|
|
|
|
def __init__(self, url=None, path=None, username=None, password=None, certificate=None,
|
|
private_key=None, verify=None, instance=None, plugins=None, cache=None):
|
|
assert not (bool(certificate) ^ bool(private_key)), ('a private key is mandatory with a '
|
|
'certificate')
|
|
assert username is not None or not password, 'a username is mandatory with a password'
|
|
self.domains = set(self.domains) if self.domains else set()
|
|
self.plugins = self.plugins or []
|
|
if url is not None:
|
|
self.url = url
|
|
if path is not None:
|
|
self.path = None
|
|
if username is not None:
|
|
self.username = username
|
|
self.password = password
|
|
if certificate is not None:
|
|
self.certificate = certificate
|
|
self.private_key = private_key
|
|
if verify is not None:
|
|
self.verify = verify
|
|
if instance is not None:
|
|
self.init_from_instance(instance)
|
|
if plugins is not None:
|
|
self.plugins.extend(plugins)
|
|
if cache is not None:
|
|
self.cache = cache
|
|
self.update_domains()
|
|
|
|
def update_domains(self):
|
|
if self.url is not None:
|
|
parsed_url = urlparse.urlparse(self.url)
|
|
self.domains.add('%s://%s' % (parsed_url.scheme, parsed_url.netloc))
|
|
|
|
def init_from_instance(self, instance):
|
|
if hasattr(instance, 'wsdl_url'):
|
|
self.url = instance.wsdl_url
|
|
if hasattr(instance, 'username'):
|
|
self.username = instance.username
|
|
self.password = instance.password
|
|
if hasattr(instance, 'keystore'):
|
|
self.certificate = instance.keystore.path
|
|
self.private_key = self.certificate
|
|
if hasattr(instance, 'verify_cert'):
|
|
self.verify = instance.verify_cert
|
|
|
|
def get_client(self):
|
|
url = self.url
|
|
if self.path:
|
|
url = 'file://' + os.path.abspath(self.path)
|
|
return Client(url,
|
|
transport=Transport(
|
|
username=self.username,
|
|
password=self.password,
|
|
certificate=self.certificate,
|
|
private_key=self.private_key,
|
|
verify=self.verify,
|
|
domains=self.domains),
|
|
plugins=self.plugins,
|
|
cache=self.cache)
|
|
|
|
|
|
def client_to_jsondict(client):
|
|
"""return description of the client, as dict (for json export)"""
|
|
res = {}
|
|
for i, sd in enumerate(client.sd):
|
|
d = {}
|
|
d['tns'] = sd.wsdl.tns[1]
|
|
d['prefixes'] = dict(p for p in sd.prefixes)
|
|
d['ports'] = {}
|
|
for p in sd.ports:
|
|
d['ports'][p[0].name] = {}
|
|
for m in p[1]:
|
|
d['ports'][p[0].name][m[0]] = dict(
|
|
(mp[0], sd.xlate(mp[1])) for mp in m[1])
|
|
d['types'] = {}
|
|
for t in sd.types:
|
|
ft = client.factory.create(sd.xlate(t[0]))
|
|
d['types'][sd.xlate(t[0])] = force_text(ft)
|
|
res[sd.service.name] = d
|
|
return res
|
|
|
|
|
|
def sudsobject_to_dict(sudsobject):
|
|
out = {}
|
|
for key, value in suds.sudsobject.asdict(sudsobject).items():
|
|
if hasattr(value, '__keylist__'):
|
|
out[key] = sudsobject_to_dict(value)
|
|
elif isinstance(value, list):
|
|
out[key] = []
|
|
for item in value:
|
|
if hasattr(item, '__keylist__'):
|
|
out[key].append(sudsobject_to_dict(item))
|
|
else:
|
|
out[key].append(item)
|
|
else:
|
|
out[key] = value
|
|
return out
|