This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
passerelle-maarch/passerelle_maarch/soap.py

66 lines
2.5 KiB
Python

# passerelle-maarch
# Copyright (C) 2015 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# borrowed from https://pypi.python.org/pypi/suds_requests
# and https://docs.oracle.com/cd/E50245_01/E50253/html/vmprg-soap-example-authentication-python.html
import requests
try:
import cStringIO as StringIO
except ImportError:
import StringIO
from suds.transport.http import HttpAuthenticated
from suds.transport import Reply
from suds.client import Client
# FIXME: the WSDL is incomplete, it does not reference the following schema;
# see https://fedorahosted.org/suds/ticket/220 for the fix
from suds.xsd.doctor import ImportDoctor, Import
# Module-level doctor importing the SOAP encoding schema namespace; it is
# handed to the suds Client built in get_client().
doctor = ImportDoctor(Import('http://schemas.xmlsoap.org/soap/encoding/'))
class MaarchSudsTransport(HttpAuthenticated):
    """suds transport that performs its HTTP exchanges with ``requests``.

    The object given as the ``maarch`` keyword argument supplies the
    connection settings read by :meth:`get_requests_kwargs`: ``username`` /
    ``password``, ``keystore`` (an object exposing a ``path`` attribute) and
    ``verify_cert``.
    """

    def __init__(self, **kwargs):
        """Store the ``maarch`` connector and initialise the base transport.

        ``maarch`` is removed from ``kwargs``; every other keyword argument
        is forwarded unchanged to ``HttpAuthenticated``.
        """
        self.maarch = kwargs.pop('maarch', None)
        # Explicit base-class call: HttpAuthenticated is an old-style class,
        # so super() cannot be used here.
        HttpAuthenticated.__init__(self, **kwargs)

    def get_requests_kwargs(self):
        """Return the ``requests`` keyword arguments derived from the connector.

        The returned dict may contain:

        * ``auth``: ``(username, password)`` when a username is set;
        * ``cert``: the keystore path, used both as certificate and as key,
          when a keystore is set;
        * ``verify``: ``False`` when certificate verification is disabled.

        An empty dict is returned when no connector was given to
        ``__init__`` (``maarch`` defaults to ``None`` there).
        """
        kwargs = {}
        maarch = self.maarch
        if maarch is None:
            # Nothing to configure; avoids an AttributeError on None.
            return kwargs
        if maarch.username:
            kwargs['auth'] = (maarch.username, maarch.password)
        if maarch.keystore:
            # The same file is passed as client certificate and private key.
            kwargs['cert'] = (maarch.keystore.path, maarch.keystore.path)
        if not maarch.verify_cert:
            kwargs['verify'] = False
        return kwargs

    def _requests_kwargs_with_timeout(self):
        """Return :meth:`get_requests_kwargs` plus the transport timeout.

        ``requests`` never times out unless told to, so the suds transport
        option ``timeout`` is passed along. A ``timeout`` already present in
        the dict (e.g. from an overriding subclass) is left untouched.
        """
        kwargs = self.get_requests_kwargs()
        kwargs.setdefault('timeout', self.options.timeout)
        return kwargs

    def open(self, request):
        """GET ``request.url`` and return the response body as a file-like object."""
        resp = requests.get(request.url, headers=request.headers,
                            **self._requests_kwargs_with_timeout())
        return StringIO.StringIO(resp.content)

    def send(self, request):
        """POST ``request.message`` to ``request.url`` and return a suds ``Reply``.

        The reply carries the HTTP status code, the response headers and the
        raw response body.
        """
        # Base-class hook, called before the request headers are read.
        self.addcredentials(request)
        resp = requests.post(request.url, data=request.message,
                             headers=request.headers,
                             **self._requests_kwargs_with_timeout())
        return Reply(resp.status_code, resp.headers, resp.content)
def get_client(maarch):
    """Build a suds ``Client`` for the WSDL located at ``maarch.wsdl_url``.

    The client talks through a ``MaarchSudsTransport`` bound to ``maarch``,
    is created with ``cache=None`` and uses the module-level ``doctor``.
    """
    suds_transport = MaarchSudsTransport(maarch=maarch)
    client_options = {
        'transport': suds_transport,
        'cache': None,
        'doctor': doctor,
    }
    return Client(maarch.wsdl_url, **client_options)