Merging upstream version 0.27.0.

This commit is contained in:
Mathias Behrle 2017-01-31 16:59:58 +01:00
parent fd71ad4720
commit c5291c55e4
99 changed files with 3346 additions and 1615 deletions

36
CHANGES
View File

@ -1,3 +1,39 @@
0.27.0 (2017-01-28)
-------------------
- Add support for SOAP attachments (multipart responses). (Dave Wapstra, #302)
- Update xsd:anyType to return the xml elements when no type is given via the
xsi:type attribute (#284)
- Fix parsing Any elements when a restriction is used (soap-enc:array) (#322)
0.26.0 (2017-01-26)
-------------------
This release again introduces some backwords incompatibilties. The next release
will hopefully be 1.0 which will introduce semver.
- **backwards-incompatible**: The Transport class now accepts a
``requests.Session()`` object instead of ``http_auth`` and ``verify``. This
allows for more flexibility.
- **backwards-incompatible**: Zeep no longer sets a default cache backend.
Please see http://docs.python-zeep.org/en/master/transport.html#caching for
information about how to configure a cache.
- Add ``zeep.xsd.SkipValue`` which instructs the serialize to ignore the
element.
- Support duplicate target namespaces in the wsdl definition (#320)
- Fix resolving element/types for xsd schema's with duplicate tns (#319)
0.25.0 (2017-01-23)
-------------------
- **Important:** Add basic validation against the xsd. It currently will only
validate the minOccurs/maxOccurs but this will be extended in the future.
- Add support for duplicate namespace definitions. Previously imports for
namespaces which were already imported were ignored. It will now search
through all matching schemas with the tns to find a specific object (#204)
- Fix xsd:extension for sequence -> choice. (#257)
- Improve serializing attributes when the values were passed as a dict (#125)
0.24.0 (2016-12-16)
-------------------
- Don't fail the parsing of responses if an xsi:type references an non-existing

39
LICENSE
View File

@ -21,6 +21,8 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
--
Parts of the XSD handling are heavily inspired by soapfish, see:
https://github.com/FlightDataServices/soapfish
@ -51,3 +53,40 @@ INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
OF SUCH DAMAGE.
--
The support for BinarySecurityToken is from py-wsse, see:
https://github.com/orcasgit/py-wsse
Copyright (c) 2015 ORCAS, Inc
Some portions from py-soap-wsse (c) Michael van Tellingen
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials provided
with the distribution.
* Neither the name of the author nor the names of other
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: zeep
Version: 0.24.0
Version: 0.27.0
Summary: A modern/fast Python SOAP client based on lxml / requests
Home-page: http://docs.python-zeep.org
Author: Michael van Tellingen
@ -12,23 +12,14 @@ Description: ========================
A fast and modern Python SOAP client
| Website: http://docs.python-zeep.org/
| IRC: #python-zeep on Freenode
Highlights:
* Modern codebase compatible with Python 2.7, 3.3, 3.4, 3.5 and PyPy
* Compatible with Python 2.7, 3.3, 3.4, 3.5, 3.6 and PyPy
* Build on top of lxml and requests
* Supports recursive WSDL and XSD documents.
* Supports the xsd:choice and xsd:any elements.
* Uses the defusedxml module for handling potential XML security issues
* Support for WSSE (UsernameToken only for now)
* Experimental support for HTTP bindings
* Experimental support for WS-Addressing headers
* Support for Soap 1.1, Soap 1.2 and HTTP bindings
* Support for WS-Addressing headers
* Support for WSSE (UserNameToken / x.509 signing)
* Experimental support for asyncio via aiohttp (Python 3.5+)
Features still in development include:
* WSSE x.509 support (BinarySecurityToken)
* WS Policy support
Please see for more information the documentation at
http://docs.python-zeep.org/
@ -56,7 +47,7 @@ Description: ========================
To quickly inspect a WSDL file use::
python -mzeep <url-to-wsdl>
python -m zeep <url-to-wsdl>
Please see the documentation at http://docs.python-zeep.org for more
@ -85,5 +76,6 @@ Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy

View File

@ -4,23 +4,14 @@ Zeep: Python SOAP client
A fast and modern Python SOAP client
| Website: http://docs.python-zeep.org/
| IRC: #python-zeep on Freenode
Highlights:
* Modern codebase compatible with Python 2.7, 3.3, 3.4, 3.5 and PyPy
* Compatible with Python 2.7, 3.3, 3.4, 3.5, 3.6 and PyPy
* Build on top of lxml and requests
* Supports recursive WSDL and XSD documents.
* Supports the xsd:choice and xsd:any elements.
* Uses the defusedxml module for handling potential XML security issues
* Support for WSSE (UsernameToken only for now)
* Experimental support for HTTP bindings
* Experimental support for WS-Addressing headers
* Support for Soap 1.1, Soap 1.2 and HTTP bindings
* Support for WS-Addressing headers
* Support for WSSE (UserNameToken / x.509 signing)
* Experimental support for asyncio via aiohttp (Python 3.5+)
Features still in development include:
* WSSE x.509 support (BinarySecurityToken)
* WS Policy support
Please see for more information the documentation at
http://docs.python-zeep.org/
@ -71,7 +62,7 @@ Usage
To quickly inspect a WSDL file use::
python -mzeep <url-to-wsdl>
python -m zeep <url-to-wsdl>
Please see the documentation at http://docs.python-zeep.org for more

62
examples/async_client.py Normal file
View File

@ -0,0 +1,62 @@
import asyncio
import time
import zeep
from zeep.asyncio import AsyncTransport
def run_async():
print("async example")
print("=============")
result = []
def handle_future(future):
result.extend(future.result())
loop = asyncio.get_event_loop()
transport = AsyncTransport(loop, cache=None)
client = zeep.Client('http://localhost:8000/?wsdl', transport=transport)
tasks = [
client.service.slow_request('request-1'), # takes 1 sec
client.service.slow_request('request-2'), # takes 1 sec
]
future = asyncio.gather(*tasks, return_exceptions=True)
result = []
future.add_done_callback(handle_future)
st = time.time()
loop.run_until_complete(future)
loop.run_until_complete(transport.session.close())
print("time: %.2f" % (time.time() - st))
print("result: %s", result)
print("")
return result
def run_sync():
print("sync example")
print("============")
transport = zeep.Transport(cache=None)
client = zeep.Client('http://localhost:8000/?wsdl', transport=transport)
st = time.time()
result = [
client.service.slow_request('request-1'), # takes 1 sec
client.service.slow_request('request-2'), # takes 1 sec
]
print("Time: %.2f" % (time.time() - st))
print("result: %s", result)
print("\n")
return result
if __name__ == '__main__':
print("")
run_async()
run_sync()

8
examples/code39.py Normal file
View File

@ -0,0 +1,8 @@
from __future__ import print_function
import zeep
client = zeep.Client(
wsdl='http://www.webservicex.net/barcode.asmx?WSDL')
response = client.service.Code39('1234', 20, ShowCodeString=True, Title='ZEEP')
print(repr(response))

View File

@ -0,0 +1,5 @@
from zeep.client import Client
# RPC style soap service
client = Client('http://www.soapclient.com/xml/soapresponder.wsdl')
print(client.service.Method1('zeep', 'soap'))

View File

@ -0,0 +1,7 @@
from __future__ import print_function
import zeep
client = zeep.Client(
wsdl='http://ec.europa.eu/taxation_customs/vies/checkVatService.wsdl')
print(client.service.checkVat('NL', '170944128B01'))

View File

@ -0,0 +1,14 @@
from __future__ import print_function
import zeep
from zeep.transports import Transport
# Example using basic authentication with a webservice
transport_with_basic_auth = Transport(http_auth=('username', 'password'))
client = zeep.Client(
wsdl='http://nonexistent?WSDL',
transport=transport_with_basic_auth
)
client.wsdl.dump()

16
examples/km_to_miles.py Normal file
View File

@ -0,0 +1,16 @@
from __future__ import print_function
import zeep
client = zeep.Client(
wsdl='http://www.webservicex.net/ConvertSpeed.asmx?WSDL')
client.wsdl.dump()
print (client.service.ConvertSpeed(100, 'kilometersPerhour', 'milesPerhour'))
http_get = client.bind('ConvertSpeeds', 'ConvertSpeedsHttpGet')
http_post = client.bind('ConvertSpeeds', 'ConvertSpeedsHttpPost')
print(http_get.ConvertSpeed(100, 'kilometersPerhour', 'milesPerhour'))
print(http_post.ConvertSpeed(100, 'kilometersPerhour', 'milesPerhour'))

46
examples/soap_server.py Normal file
View File

@ -0,0 +1,46 @@
"""
Example soap server using spyne.
Run with
uwsgi --http :8000 \
--wsgi-file soap_server.py \
--virtualenv ~/.pyenv/versions/3.5.2/envs/zeep \
-p 10
"""
import time
from spyne import Application, ServiceBase, Unicode, rpc
from spyne.protocol.soap import Soap11
from spyne.server.wsgi import WsgiApplication
class ExampleService(ServiceBase):
@rpc(Unicode, _returns=Unicode)
def slow_request(ctx, request_id):
time.sleep(1)
return u'Request: %s' % request_id
application = Application(
services=[ExampleService],
tns='http://tests.python-zeep.org/',
in_protocol=Soap11(validator='lxml'),
out_protocol=Soap11())
application = WsgiApplication(application)
if __name__ == '__main__':
import logging
from wsgiref.simple_server import make_server
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('spyne.protocol.xml').setLevel(logging.DEBUG)
logging.info("listening to http://127.0.0.1:8000")
logging.info("wsdl is at: http://localhost:8000/?wsdl")
server = make_server('127.0.0.1', 8000, application)
server.serve_forever()

View File

@ -1,9 +1,40 @@
[bumpversion]
current_version = 0.27.0
commit = true
tag = true
tag_name = {new_version}
[tool:pytest]
minversion = 3.0
strict = true
testpaths = tests
[wheel]
universal = 1
[flake8]
max-line-length = 99
[bumpversion:file:setup.py]
[bumpversion:file:docs/conf.py]
[bumpversion:file:src/zeep/__init__.py]
[coverage:run]
branch = True
source =
zeep
[coverage:paths]
source =
src/zeep
.tox/*/lib/python*/site-packages/zeep
.tox/pypy*/site-packages/zeep
[coverage:report]
show_missing = True
[egg_info]
tag_build =
tag_date = 0

View File

@ -1,4 +1,5 @@
import re
from setuptools import find_packages, setup
install_requires = [
@ -8,6 +9,7 @@ install_requires = [
'isodate>=0.5.4',
'lxml>=3.0.0',
'requests>=2.7.0',
'requests-toolbelt>=0.7.0',
'six>=1.9.0',
'pytz',
]
@ -16,17 +18,25 @@ docs_require = [
'sphinx>=1.4.0',
]
async_require = [
'aiohttp>=1.0',
]
xmlsec_require = [
'xmlsec>=0.6.1',
]
tests_require = [
'freezegun==0.3.7',
'freezegun==0.3.8',
'mock==2.0.0',
'pretend==1.0.8',
'pytest-cov==2.3.1',
'pytest==3.0.2',
'pytest-cov==2.4.0',
'pytest==3.0.6',
'requests_mock>=0.7.0',
# Linting
'isort==4.2.5',
'flake8==3.0.3',
'flake8==3.2.1',
'flake8-blind-except==0.1.1',
'flake8-debugger==1.4.0',
]
@ -37,7 +47,7 @@ with open('README.rst') as fh:
setup(
name='zeep',
version='0.24.0',
version='0.27.0',
description='A modern/fast Python SOAP client based on lxml / requests',
long_description=long_description,
author="Michael van Tellingen",
@ -49,6 +59,8 @@ setup(
extras_require={
'docs': docs_require,
'test': tests_require,
'async': async_require,
'xmlsec': xmlsec_require,
},
entry_points={},
package_dir={'': 'src'},
@ -65,6 +77,7 @@ setup(
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python :: Implementation :: PyPy',
],

View File

@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: zeep
Version: 0.24.0
Version: 0.27.0
Summary: A modern/fast Python SOAP client based on lxml / requests
Home-page: http://docs.python-zeep.org
Author: Michael van Tellingen
@ -12,23 +12,14 @@ Description: ========================
A fast and modern Python SOAP client
| Website: http://docs.python-zeep.org/
| IRC: #python-zeep on Freenode
Highlights:
* Modern codebase compatible with Python 2.7, 3.3, 3.4, 3.5 and PyPy
* Compatible with Python 2.7, 3.3, 3.4, 3.5, 3.6 and PyPy
* Build on top of lxml and requests
* Supports recursive WSDL and XSD documents.
* Supports the xsd:choice and xsd:any elements.
* Uses the defusedxml module for handling potential XML security issues
* Support for WSSE (UsernameToken only for now)
* Experimental support for HTTP bindings
* Experimental support for WS-Addressing headers
* Support for Soap 1.1, Soap 1.2 and HTTP bindings
* Support for WS-Addressing headers
* Support for WSSE (UserNameToken / x.509 signing)
* Experimental support for asyncio via aiohttp (Python 3.5+)
Features still in development include:
* WSSE x.509 support (BinarySecurityToken)
* WS Policy support
Please see for more information the documentation at
http://docs.python-zeep.org/
@ -56,7 +47,7 @@ Description: ========================
To quickly inspect a WSDL file use::
python -mzeep <url-to-wsdl>
python -m zeep <url-to-wsdl>
Please see the documentation at http://docs.python-zeep.org for more
@ -85,5 +76,6 @@ Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy

View File

@ -3,12 +3,20 @@ LICENSE
README.rst
setup.cfg
setup.py
examples/async_client.py
examples/code39.py
examples/echo_services.py
examples/eu_vat_service.py
examples/http_basic_auth.py
examples/km_to_miles.py
examples/soap_server.py
src/zeep/__init__.py
src/zeep/__main__.py
src/zeep/cache.py
src/zeep/client.py
src/zeep/exceptions.py
src/zeep/helpers.py
src/zeep/ns.py
src/zeep/parser.py
src/zeep/plugins.py
src/zeep/transports.py
@ -25,6 +33,7 @@ src/zeep/asyncio/__init__.py
src/zeep/asyncio/bindings.py
src/zeep/asyncio/transport.py
src/zeep/wsdl/__init__.py
src/zeep/wsdl/attachments.py
src/zeep/wsdl/definitions.py
src/zeep/wsdl/parse.py
src/zeep/wsdl/utils.py
@ -38,23 +47,38 @@ src/zeep/wsdl/messages/http.py
src/zeep/wsdl/messages/mime.py
src/zeep/wsdl/messages/soap.py
src/zeep/wsse/__init__.py
src/zeep/wsse/compose.py
src/zeep/wsse/signature.py
src/zeep/wsse/username.py
src/zeep/wsse/utils.py
src/zeep/xsd/__init__.py
src/zeep/xsd/builtins.py
src/zeep/xsd/const.py
src/zeep/xsd/context.py
src/zeep/xsd/elements.py
src/zeep/xsd/indicators.py
src/zeep/xsd/parser.py
src/zeep/xsd/printer.py
src/zeep/xsd/schema.py
src/zeep/xsd/types.py
src/zeep/xsd/utils.py
src/zeep/xsd/valueobjects.py
src/zeep/xsd/visitor.py
src/zeep/xsd/elements/__init__.py
src/zeep/xsd/elements/any.py
src/zeep/xsd/elements/attribute.py
src/zeep/xsd/elements/base.py
src/zeep/xsd/elements/builtins.py
src/zeep/xsd/elements/element.py
src/zeep/xsd/elements/indicators.py
src/zeep/xsd/elements/references.py
src/zeep/xsd/types/__init__.py
src/zeep/xsd/types/any.py
src/zeep/xsd/types/base.py
src/zeep/xsd/types/builtins.py
src/zeep/xsd/types/collection.py
src/zeep/xsd/types/complex.py
src/zeep/xsd/types/simple.py
tests/__init__.py
tests/cert_valid.pem
tests/cert_valid_pw.pem
tests/conftest.py
tests/test_asyncio_transport.py
tests/test_cache.py
tests/test_client.py
tests/test_client_factory.py
@ -70,6 +94,7 @@ tests/test_wsdl_messages_document.py
tests/test_wsdl_messages_http.py
tests/test_wsdl_messages_rpc.py
tests/test_wsdl_soap.py
tests/test_wsse_signature.py
tests/test_wsse_username.py
tests/test_wsse_utils.py
tests/test_xsd.py
@ -86,6 +111,7 @@ tests/test_xsd_signatures.py
tests/test_xsd_simple_types.py
tests/test_xsd_types.py
tests/test_xsd_union.py
tests/test_xsd_validation.py
tests/test_xsd_valueobjects.py
tests/test_xsd_visitor.py
tests/utils.py
@ -99,6 +125,7 @@ tests/integration/test_hello_world_recursive.py
tests/integration/test_http_post.py
tests/integration/test_http_post.wsdl
tests/integration/test_recursive_schema.py
tests/wsdl_files/claim.wsdl
tests/wsdl_files/http.wsdl
tests/wsdl_files/soap-enc.xsd
tests/wsdl_files/soap.wsdl

View File

@ -4,20 +4,27 @@ defusedxml>=0.4.1
isodate>=0.5.4
lxml>=3.0.0
requests>=2.7.0
requests-toolbelt>=0.7.0
six>=1.9.0
pytz
[async]
aiohttp>=1.0
[docs]
sphinx>=1.4.0
[test]
freezegun==0.3.7
freezegun==0.3.8
mock==2.0.0
pretend==1.0.8
pytest-cov==2.3.1
pytest==3.0.2
pytest-cov==2.4.0
pytest==3.0.6
requests_mock>=0.7.0
isort==4.2.5
flake8==3.0.3
flake8==3.2.1
flake8-blind-except==0.1.1
flake8-debugger==1.4.0
[xmlsec]
xmlsec>=0.6.1

View File

@ -1,5 +1,6 @@
from zeep.client import Client # noqa
from zeep.transports import Transport # noqa
from zeep.plugins import Plugin # noqa
from zeep.xsd.valueobjects import AnyObject # noqa
__version__ = '0.24.0'
__version__ = '0.27.0'

View File

@ -5,8 +5,9 @@ import logging
import logging.config
import time
import requests
from six.moves.urllib.parse import urlparse
from zeep.cache import InMemoryCache, SqliteCache
from zeep.cache import SqliteCache
from zeep.client import Client
from zeep.transports import Transport
@ -59,17 +60,17 @@ def main(args):
profile = cProfile.Profile()
profile.enable()
cache = SqliteCache() if args.cache else InMemoryCache()
transport_kwargs = {'cache': cache}
cache = SqliteCache() if args.cache else None
session = requests.Session()
if args.no_verify:
transport_kwargs['verify'] = False
session.verify = False
result = urlparse(args.wsdl_file)
if result.username or result.password:
transport_kwargs['http_auth'] = (result.username, result.password)
session.auth = (result.username, result.password)
transport = Transport(**transport_kwargs)
transport = Transport(cache=cache, session=session)
st = time.time()
client = Client(args.wsdl_file, transport=transport)
logger.debug("Loading WSDL took %sms", (time.time() - st) * 1000)

View File

@ -3,10 +3,11 @@ Adds asyncio support to Zeep. Contains Python 3.5+ only syntax!
"""
import asyncio
import logging
import aiohttp
from zeep.transports import Transport
from zeep.utils import get_version
from zeep.wsdl.utils import etree_to_string
__all__ = ['AsyncTransport']
@ -16,21 +17,22 @@ class AsyncTransport(Transport):
"""Asynchronous Transport class using aiohttp."""
supports_async = True
def __init__(self, loop, *args, **kwargs):
def __init__(self, loop, cache=None, timeout=300, operation_timeout=None,
session=None):
self.loop = loop if loop else asyncio.get_event_loop()
super().__init__(*args, **kwargs)
self.cache = cache
self.load_timeout = timeout
self.operation_timeout = operation_timeout
self.logger = logging.getLogger(__name__)
def create_session(self):
connector = aiohttp.TCPConnector(verify_ssl=self.http_verify)
return aiohttp.ClientSession(
connector=connector,
loop=self.loop,
headers=self.http_headers,
auth=self.http_auth)
self.session = session or aiohttp.ClientSession(loop=self.loop)
self.session._default_headers['User-Agent'] = (
'Zeep/%s (www.python-zeep.org)' % (get_version()))
def _load_remote_data(self, url):
result = None
async def _load_remote_data_async():
nonlocal result
with aiohttp.Timeout(self.load_timeout):

View File

@ -5,12 +5,6 @@ from contextlib import contextmanager
from zeep.transports import Transport
from zeep.wsdl import Document
NSMAP = {
'xsd': 'http://www.w3.org/2001/XMLSchema',
'soap': 'http://schemas.xmlsoap.org/wsdl/soap/',
'soap-env': 'http://schemas.xmlsoap.org/soap/envelope/',
}
logger = logging.getLogger(__name__)
@ -75,6 +69,21 @@ class Factory(object):
class Client(object):
"""The zeep Client.
:param wsdl:
:param wsse:
:param transport: Custom transport class.
:param service_name: The service name for the service binding. Defaults to
the first service in the WSDL document.
:param port_name: The port name for the default binding. Defaults to the
first port defined in the service element in the WSDL
document.
:param plugins: a list of Plugin instances
"""
def __init__(self, wsdl, wsse=None, transport=None,
service_name=None, port_name=None, plugins=None):
@ -110,14 +119,15 @@ class Client(object):
def options(self, timeout):
"""Context manager to temporarily overrule various options.
Example::
:param timeout: Set the timeout for POST/GET operations (not used for
loading external WSDL or XSD documents)
To for example set the timeout to 10 seconds use::
client = zeep.Client('foo.wsdl')
with client.options(timeout=10):
client.service.fast_call()
:param timeout: Set the timeout for POST/GET operations (not used for
loading external WSDL or XSD documents)
"""
with self.transport._options(timeout=timeout):
@ -134,19 +144,8 @@ class Client(object):
if not self.wsdl.services:
return
if service_name:
service = self.wsdl.services.get(service_name)
if not service:
raise ValueError("Service not found")
else:
service = next(iter(self.wsdl.services.values()), None)
if port_name:
port = service.ports.get(port_name)
if not port:
raise ValueError("Port not found")
else:
port = list(service.ports.values())[0]
service = self._get_service(service_name)
port = self._get_port(service, port_name)
return ServiceProxy(self, port.binding, **port.binding_options)
def create_service(self, binding_name, address):
@ -164,16 +163,40 @@ class Client(object):
"are: %s" % (', '.join(self.wsdl.bindings.keys())))
return ServiceProxy(self, binding, address=address)
def create_message(self, operation, service_name=None, port_name=None,
args=None, kwargs=None):
"""Create the payload for the given operation."""
service = self._get_service(service_name)
port = self._get_port(service, port_name)
args = args or tuple()
kwargs = kwargs or {}
envelope, http_headers = port.binding._create(operation, args, kwargs)
return envelope
def type_factory(self, namespace):
"""Return a type factory for the given namespace.
Example::
factory = client.type_factory('ns0')
user = factory.User(name='John')
"""
return Factory(self.wsdl.types, 'type', namespace)
def get_type(self, name):
"""Return the type for the given qualified name."""
return self.wsdl.types.get_type(name)
def get_element(self, name):
"""Return the element for the given qualified name."""
return self.wsdl.types.get_element(name)
def set_ns_prefix(self, prefix, namespace):
"""Set a shortcut for the given namespace.
"""
self.wsdl.types.set_ns_prefix(prefix, namespace)
def set_default_soapheaders(self, headers):
@ -186,3 +209,21 @@ class Client(object):
"""
self._default_soapheaders = headers
def _get_port(self, service, name):
if name:
port = service.ports.get(name)
if not port:
raise ValueError("Port not found")
else:
port = list(service.ports.values())[0]
return port
def _get_service(self, name):
if name:
service = self.wsdl.services.get(name)
if not service:
raise ValueError("Service not found")
else:
service = next(iter(self.wsdl.services.values()), None)
return service

View File

@ -1,5 +1,5 @@
class Error(Exception):
def __init__(self, message):
def __init__(self, message=''):
super(Exception, self).__init__(message)
self.message = message
@ -12,7 +12,18 @@ class XMLSyntaxError(Error):
class XMLParseError(Error):
pass
def __init__(self, *args, **kwargs):
self.filename = kwargs.pop('filename', None)
self.sourceline = kwargs.pop('sourceline', None)
super(XMLParseError, self).__init__(*args, **kwargs)
def __str__(self):
location = None
if self.filename and self.sourceline:
location = '%s:%s' % (self.filename, self.sourceline)
if location:
return '%s (%s)' % (self.message, location)
return self.message
class UnexpectedElementError(Error):
@ -47,3 +58,19 @@ class Fault(Error):
class ZeepWarning(RuntimeWarning):
pass
class ValidationError(Error):
def __init__(self, *args, **kwargs):
self.path = kwargs.pop('path', [])
super(ValidationError, self).__init__(*args, **kwargs)
def __str__(self):
if self.path:
path = '.'.join(str(x) for x in self.path)
return '%s (%s)' % (self.message, path)
return self.message
class SignatureVerificationFailed(Error):
pass

View File

@ -1,25 +1,77 @@
import datetime
from collections import OrderedDict
from lxml import etree
from zeep import xsd
from zeep.xsd.valueobjects import CompoundValue
def serialize_object(obj):
def serialize_object(obj, target_cls=OrderedDict):
"""Serialize zeep objects to native python data structures"""
if obj is None:
return obj
if isinstance(obj, etree._Element):
return obj
if isinstance(obj, list):
return [serialize_object(sub) for sub in obj]
return [serialize_object(sub, target_cls) for sub in obj]
result = OrderedDict()
for key in obj:
value = obj[key]
if isinstance(value, (list, CompoundValue)):
value = serialize_object(value)
result[key] = value
return result
if isinstance(obj, (dict, CompoundValue)):
result = target_cls()
for key in obj:
result[key] = serialize_object(obj[key], target_cls)
return result
return obj
def create_xml_soap_map(values):
"""Create an http://xml.apache.org/xml-soap#Map value."""
Map = xsd.ComplexType(
xsd.Sequence([
xsd.Element(
'item',
xsd.AnyType(),
min_occurs=1,
max_occurs="unbounded"),
]),
qname=etree.QName('{http://xml.apache.org/xml-soap}Map'))
KeyValueData = xsd.Element(
'{http://xml.apache.org/xml-soap}KeyValueData',
xsd.ComplexType(
xsd.Sequence([
xsd.Element(
'key',
xsd.AnyType(),
),
xsd.Element(
'value',
xsd.AnyType(),
),
]),
),
)
return Map(item=[
KeyValueData(
xsd.AnyObject(xsd.String(), key),
xsd.AnyObject(guess_xsd_type(value), value)
) for key, value in values.items()
])
def guess_xsd_type(obj):
"""Return the XSD Type for the given object"""
if isinstance(obj, bool):
return xsd.Boolean()
if isinstance(obj, int):
return xsd.Integer()
if isinstance(obj, float):
return xsd.Float()
if isinstance(obj, datetime.datetime):
return xsd.DateTime()
if isinstance(obj, datetime.date):
return xsd.Date()
return xsd.String()
def Nil():
"""Return an xsi:nil element"""
return xsd.AnyObject(None, None)

18
src/zeep/ns.py Normal file
View File

@ -0,0 +1,18 @@
SOAP_11 = 'http://schemas.xmlsoap.org/wsdl/soap/'
SOAP_12 = 'http://schemas.xmlsoap.org/wsdl/soap12/'
SOAP_ENV_11 = 'http://schemas.xmlsoap.org/soap/envelope/'
SOAP_ENV_12 = 'http://www.w3.org/2003/05/soap-envelope'
XSD = 'http://www.w3.org/2001/XMLSchema'
WSDL = 'http://schemas.xmlsoap.org/wsdl/'
HTTP = 'http://schemas.xmlsoap.org/wsdl/http/'
MIME = 'http://schemas.xmlsoap.org/wsdl/mime/'
WSA = 'http://www.w3.org/2005/08/addressing'
DS = 'http://www.w3.org/2000/09/xmldsig#'
WSSE = 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd'
WSU = 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd'

View File

@ -8,7 +8,8 @@ from zeep.exceptions import XMLSyntaxError
def parse_xml(content, base_url=None, recover=False):
parser = etree.XMLParser(remove_comments=True, recover=recover)
parser = etree.XMLParser(
remove_comments=True, recover=recover, resolve_entities=False)
try:
return fromstring(content, parser=parser, base_url=base_url)
except etree.XMLSyntaxError as exc:
@ -24,7 +25,7 @@ def load_external(url, transport, base_url=None):
def absolute_location(location, base):
if location == base or location.startswith('intschema'):
if location == base:
return location
if urlparse(location).scheme in ('http', 'https'):
@ -36,5 +37,13 @@ def absolute_location(location, base):
if os.path.isabs(location):
return location
if base:
return os.path.join(os.path.dirname(base), location)
return os.path.realpath(
os.path.join(os.path.dirname(base), location))
return location
def is_relative_path(value):
"""Check if the given value is a relative path"""
if urlparse(value).scheme in ('http', 'https', 'file'):
return False
return not os.path.isabs(value)

View File

@ -5,9 +5,25 @@ class Plugin(object):
"""Base plugin"""
def ingress(self, envelope, http_headers, operation):
"""Override to update the envelope or http headers when receiving a
message.
:param envelope: The envelope as XML node
:param http_headers: Dict with the HTTP headers
"""
return envelope, http_headers
def egress(self, envelope, http_headers, operation, binding_options):
"""Override to update the envelope or http headers when sending a
message.
:param envelope: The envelope as XML node
:param http_headers: Dict with the HTTP headers
:param operation: The associated Operation instance
:param binding_options: Binding specific options for the operation
"""
return envelope, http_headers

View File

@ -5,45 +5,32 @@ from contextlib import contextmanager
import requests
from six.moves.urllib.parse import urlparse
from zeep.cache import SqliteCache
from zeep.utils import NotSet, get_version
from zeep.utils import get_version
from zeep.wsdl.utils import etree_to_string
class Transport(object):
"""The transport object handles all communication to the SOAP server.
:param cache: The cache object to be used to cache GET requests
:param timeout: The timeout for loading wsdl and xsd documents.
:param operation_timeout: The timeout for operations (POST/GET). By
default this is None (no timeout).
:param session: A request.Session() object (optional)
"""
supports_async = False
def __init__(self, cache=NotSet, timeout=300, operation_timeout=None,
verify=True, http_auth=None):
"""The transport object handles all communication to the SOAP server.
:param cache: The cache object to be used to cache GET requests
:param timeout: The timeout for loading wsdl and xsd documents.
:param operation_timeout: The timeout for operations (POST/GET). By
default this is None (no timeout).
:param verify: Boolean to indicate if the SSL certificate needs to be
verified.
:param http_auth: HTTP authentication, passed to requests.
"""
self.cache = SqliteCache() if cache is NotSet else cache
def __init__(self, cache=None, timeout=300, operation_timeout=None,
session=None):
self.cache = cache
self.load_timeout = timeout
self.operation_timeout = operation_timeout
self.logger = logging.getLogger(__name__)
self.http_verify = verify
self.http_auth = http_auth
self.http_headers = {
'User-Agent': 'Zeep/%s (www.python-zeep.org)' % (get_version())
}
self.session = self.create_session()
def create_session(self):
session = requests.Session()
session.verify = self.http_verify
session.auth = self.http_auth
session.headers = self.http_headers
return session
self.session = session or requests.Session()
self.session.headers['User-Agent'] = (
'Zeep/%s (www.python-zeep.org)' % (get_version()))
def get(self, address, params, headers):
"""Proxy to requests.get()
@ -81,9 +68,12 @@ class Transport(object):
timeout=self.operation_timeout)
if self.logger.isEnabledFor(logging.DEBUG):
log_message = response.content
if isinstance(log_message, bytes):
log_message = log_message.decode('utf-8')
if 'multipart/related' in response.headers.get('Content-Type'):
log_message = response.content
else:
log_message = response.content
if isinstance(log_message, bytes):
log_message = log_message.decode('utf-8')
self.logger.debug(
"HTTP Response from %s (status: %d):\n%s",

View File

@ -1,17 +1,8 @@
import inspect
from lxml import etree
class _NotSetClass(object):
def __repr__(self):
return 'NotSet'
NotSet = _NotSetClass()
def qname_attr(node, attr_name, target_namespace=None):
value = node.get(attr_name)
if value is not None:
@ -65,3 +56,8 @@ def get_base_class(objects):
break
base_class = bases[0][i]
return base_class
def detect_soap_env(envelope):
root_tag = etree.QName(envelope)
return root_tag.namespace

View File

@ -3,15 +3,16 @@ import uuid
from lxml import etree
from lxml.builder import ElementMaker
from zeep import ns
from zeep.plugins import Plugin
from zeep.wsdl.utils import get_or_create_header
WSA = ElementMaker(namespace='http://www.w3.org/2005/08/addressing')
WSA = ElementMaker(namespace=ns.WSA, nsmap={'wsa': ns.WSA})
class WsAddressingPlugin(Plugin):
nsmap = {
'wsa': 'http://www.w3.org/2005/08/addressing'
'wsa': ns.WSA
}
def egress(self, envelope, http_headers, operation, binding_options):

View File

@ -0,0 +1,63 @@
"""Basic implementation to support SOAP-Attachments
See https://www.w3.org/TR/SOAP-attachments
"""
import base64
from io import BytesIO
from cached_property import cached_property
from requests.structures import CaseInsensitiveDict
class MessagePack(object):
def __init__(self, parts):
self._parts = parts
def __repr__(self):
return '<MessagePack(attachments=[%s])>' % (
', '.join(repr(a) for a in self.attachments))
@property
def root(self):
return self._root
def _set_root(self, root):
self._root = root
@cached_property
def attachments(self):
return [Attachment(part) for part in self._parts]
def get_by_content_id(self, content_id):
for attachment in self.attachments:
if attachment.content_id == content_id:
return attachment
class Attachment(object):
def __init__(self, part):
self.headers = CaseInsensitiveDict({
k.decode(part.encoding): v.decode(part.encoding)
for k, v in part.headers.items()
})
self.content_type = self.headers.get('Content-Type', None)
self.content_id = self.headers.get('Content-ID', None)
self.content_location = self.headers.get('Content-Location', None)
self._part = part
def __repr__(self):
return '<Attachment(%r, %r)>' % (self.content_id, self.content_type)
@cached_property
def content(self):
encoding = self.headers.get('Content-Transfer-Encoding', None)
content = self._part.content
if encoding == 'base64':
return base64.b64decode(content)
elif encoding == 'binary':
return content
else:
return content

View File

@ -3,6 +3,7 @@ import logging
import six
from lxml import etree
from zeep import ns
from zeep.exceptions import Fault
from zeep.utils import qname_attr
from zeep.wsdl import messages
@ -11,9 +12,9 @@ from zeep.wsdl.definitions import Binding, Operation
logger = logging.getLogger(__name__)
NSMAP = {
'http': 'http://schemas.xmlsoap.org/wsdl/http/',
'wsdl': 'http://schemas.xmlsoap.org/wsdl/',
'mime': 'http://schemas.xmlsoap.org/wsdl/mime/',
'http': ns.HTTP,
'wsdl': ns.WSDL,
'mime': ns.MIME,
}
@ -114,7 +115,7 @@ class HttpGetBinding(HttpBinding):
:type node: lxml.etree._Element
"""
http_node = node.find(etree.QName(NSMAP['http'], 'binding'))
http_node = node.find(etree.QName(ns.HTTP, 'binding'))
return http_node is not None and http_node.get('verb') == 'GET'
@ -158,13 +159,13 @@ class HttpOperation(Operation):
message_node = node.getchildren()[0]
message_class = None
if message_node is not None:
if message_node.tag == etree.QName(NSMAP['http'], 'urlEncoded'):
if message_node.tag == etree.QName(ns.HTTP, 'urlEncoded'):
message_class = messages.UrlEncoded
elif message_node.tag == etree.QName(NSMAP['http'], 'urlReplacement'):
elif message_node.tag == etree.QName(ns.HTTP, 'urlReplacement'):
message_class = messages.UrlReplacement
elif message_node.tag == etree.QName(NSMAP['mime'], 'content'):
elif message_node.tag == etree.QName(ns.MIME, 'content'):
message_class = messages.MimeContent
elif message_node.tag == etree.QName(NSMAP['mime'], 'mimeXml'):
elif message_node.tag == etree.QName(ns.MIME, 'mimeXml'):
message_class = messages.MimeXML
if message_class:

View File

@ -1,14 +1,16 @@
import logging
from lxml import etree
from requests_toolbelt.multipart.decoder import MultipartDecoder
from zeep import plugins, wsa
from zeep import ns, plugins, wsa
from zeep.exceptions import Fault, TransportError, XMLSyntaxError
from zeep.parser import parse_xml
from zeep.utils import as_qname, qname_attr
from zeep.wsdl.attachments import MessagePack
from zeep.wsdl.definitions import Binding, Operation
from zeep.wsdl.messages import DocumentMessage, RpcMessage
from zeep.wsdl.utils import etree_to_string
from zeep.wsdl.utils import etree_to_string, url_http_to_https
logger = logging.getLogger(__name__)
@ -83,7 +85,7 @@ class SoapBinding(Binding):
# Apply WSSE
if client.wsse:
envelope, http_headers = client.wsse.sign(envelope, http_headers)
envelope, http_headers = client.wsse.apply(envelope, http_headers)
return envelope, http_headers
def send(self, client, options, operation, args, kwargs):
@ -128,8 +130,18 @@ class SoapBinding(Binding):
u'Server returned HTTP status %d (no content available)'
% response.status_code)
content_type = response.headers.get('Content-Type', 'text/xml')
if 'multipart/related' in content_type:
decoder = MultipartDecoder(response.content, content_type, 'utf-8')
content = decoder.parts[0].content
if len(decoder.parts) > 1:
message_pack = MessagePack(parts=decoder.parts[1:])
else:
content = response.content
message_pack = None
try:
doc = parse_xml(response.content, recover=True)
doc = parse_xml(content)
except XMLSyntaxError:
raise TransportError(
u'Server returned HTTP status %d (%s)'
@ -148,7 +160,12 @@ class SoapBinding(Binding):
if response.status_code != 200 or fault_node is not None:
return self.process_error(doc, operation)
return operation.process_reply(doc)
result = operation.process_reply(doc)
if message_pack:
message_pack._set_root(result)
return message_pack
return result
def process_error(self, doc, operation):
raise NotImplementedError
@ -158,9 +175,10 @@ class SoapBinding(Binding):
# Force the usage of HTTPS when the force_https boolean is true
location = address_node.get('location')
if force_https and location and location.startswith('http://'):
logger.warning("Forcing soap:address location to HTTPS")
location = 'https://' + location[7:]
if force_https and location:
location = url_http_to_https(location)
if location != address_node.get('location'):
logger.warning("Forcing soap:address location to HTTPS")
return {
'address': location
@ -207,10 +225,10 @@ class SoapBinding(Binding):
class Soap11Binding(SoapBinding):
nsmap = {
'soap': 'http://schemas.xmlsoap.org/wsdl/soap/',
'soap-env': 'http://schemas.xmlsoap.org/soap/envelope/',
'wsdl': 'http://schemas.xmlsoap.org/wsdl/',
'xsd': 'http://www.w3.org/2001/XMLSchema',
'soap': ns.SOAP_11,
'soap-env': ns.SOAP_ENV_11,
'wsdl': ns.WSDL,
'xsd': ns.XSD,
}
def process_error(self, doc, operation):
@ -241,10 +259,10 @@ class Soap11Binding(SoapBinding):
class Soap12Binding(SoapBinding):
nsmap = {
'soap': 'http://schemas.xmlsoap.org/wsdl/soap12/',
'soap-env': 'http://www.w3.org/2003/05/soap-envelope',
'wsdl': 'http://schemas.xmlsoap.org/wsdl/',
'xsd': 'http://www.w3.org/2001/XMLSchema',
'soap': ns.SOAP_12,
'soap-env': ns.SOAP_ENV_12,
'wsdl': ns.WSDL,
'xsd': ns.XSD,
}
def process_error(self, doc, operation):

View File

@ -2,7 +2,7 @@ import six
from defusedxml.lxml import fromstring
from lxml import etree
from zeep import xsd
from zeep import ns, xsd
from zeep.helpers import serialize_object
from zeep.wsdl.messages.base import ConcreteMessage, SerializedMessage
from zeep.wsdl.utils import etree_to_string
@ -16,7 +16,7 @@ __all__ = [
class MimeMessage(ConcreteMessage):
_nsmap = {
'mime': 'http://schemas.xmlsoap.org/wsdl/mime/',
'mime': ns.MIME,
}
def __init__(self, wsdl, name, operation, part_name):

View File

@ -272,8 +272,12 @@ class SoapMessage(ConcreteMessage):
"""
all_elements = xsd.Sequence([
xsd.Element('body', self.body.type),
xsd.Element('header', self.header.type),
])
if self.header.type._element:
all_elements.append(
xsd.Element('header', self.header.type))
return xsd.Element('envelope', xsd.ComplexType(all_elements))
def _serialize_header(self, headers_value, nsmap):
@ -299,8 +303,12 @@ class SoapMessage(ConcreteMessage):
raise ValueError(
"_soapheaders only accepts a dictionary if the wsdl "
"defines the headers.")
# Only render headers for which we have a value
headers_value = self.header(**headers_value)
self.header.type.render(header, headers_value)
for name, elm in self.header.type.elements:
if name in headers_value and headers_value[name] is not None:
elm.render(header, headers_value[name], ['header', name])
else:
raise ValueError("Invalid value given to _soapheaders")

View File

@ -1,12 +1,14 @@
from lxml import etree
from six.moves.urllib.parse import urlparse, urlunparse
from zeep.utils import detect_soap_env
def get_or_create_header(envelope):
# find the namespace of the SOAP Envelope (because it's different for SOAP 1.1 and 1.2)
root_tag = etree.QName(envelope)
soap_envelope_namespace = root_tag.namespace
soap_env = detect_soap_env(envelope)
# look for the Header element and create it if not found
header_qname = '{%s}Header' % soap_envelope_namespace
header_qname = '{%s}Header' % soap_env
header = envelope.find(header_qname)
if header is None:
header = etree.Element(header_qname)
@ -17,3 +19,17 @@ def get_or_create_header(envelope):
def etree_to_string(node):
return etree.tostring(
node, pretty_print=True, xml_declaration=True, encoding='utf-8')
def url_http_to_https(value):
parts = urlparse(value)
if parts.scheme != 'http':
return value
# Check if the url contains ':80' and remove it if that is the case
netloc_parts = parts.netloc.rsplit(':', 1)
if len(netloc_parts) == 2 and netloc_parts[1] == '80':
netloc = netloc_parts[0]
else:
netloc = parts.netloc
return urlunparse(('https', netloc) + parts[2:])

View File

@ -2,16 +2,17 @@ from __future__ import print_function
import logging
import operator
import os
from collections import OrderedDict
import six
from lxml import etree
from zeep.parser import absolute_location, load_external, parse_xml
from zeep.parser import (
absolute_location, is_relative_path, load_external, parse_xml)
from zeep.utils import findall_multiple_ns
from zeep.wsdl import parse
from zeep.xsd import Schema
from zeep.xsd.context import ParserContext
NSMAP = {
'wsdl': 'http://schemas.xmlsoap.org/wsdl/',
@ -35,7 +36,7 @@ class Document(object):
"""
def __init__(self, location, transport):
def __init__(self, location, transport, base=None):
"""Initialize a WSDL document.
The root definition properties are exposed as entry points.
@ -46,15 +47,18 @@ class Document(object):
:type transport: zeep.transports.Transport
"""
self.location = location if not hasattr(location, 'read') else None
if isinstance(location, six.string_types):
if is_relative_path(location):
location = os.path.abspath(location)
self.location = location
else:
self.location = base
self.transport = transport
# Dict with all definition objects within this WSDL
self._definitions = {}
self.types = Schema([], transport=self.transport)
# Dict with internal schema objects, used for lxml.ImportResolver
self._parser_context = ParserContext()
self.types = Schema([], transport=self.transport, location=self.location)
document = self._load_content(location)
@ -126,6 +130,10 @@ class Document(object):
return parse_xml(location.read())
return load_external(location, self.transport, self.location)
def _add_definition(self, definition):
key = (definition.target_namespace, definition.location)
self._definitions[key] = definition
class Definition(object):
"""The Definition represents one wsdl:definition within a Document."""
@ -145,7 +153,7 @@ class Definition(object):
self._resolved_imports = False
self.target_namespace = doc.get('targetNamespace')
self.wsdl._definitions[self.target_namespace] = self
self.wsdl._add_definition(self)
self.nsmap = doc.nsmap
# Process the definitions
@ -218,18 +226,20 @@ class Definition(object):
"""
for import_node in doc.findall("wsdl:import", namespaces=NSMAP):
location = import_node.get('location')
namespace = import_node.get('namespace')
if namespace in self.wsdl._definitions:
self.imports[namespace] = self.wsdl._definitions[namespace]
location = import_node.get('location')
location = absolute_location(location, self.location)
key = (namespace, location)
if key in self.wsdl._definitions:
self.imports[key] = self.wsdl._definitions[key]
else:
document = self.wsdl._load_content(location)
location = absolute_location(location, self.location)
if etree.QName(document.tag).localname == 'schema':
self.types.add_documents([document], location)
else:
wsdl = Definition(self.wsdl, document, location)
self.imports[namespace] = wsdl
self.imports[key] = wsdl
def parse_types(self, doc):
"""Return an xsd.Schema() instance for the given wsdl:types element.

View File

@ -0,0 +1,3 @@
from .compose import Compose # noqa
from .signature import Signature # noqa
from .username import UsernameToken # noqa

12
src/zeep/wsse/compose.py Normal file
View File

@ -0,0 +1,12 @@
class Compose(object):
def __init__(self, wsse_objects):
self.wsse_objects = wsse_objects
def apply(self, envelope, headers):
for obj in self.wsse_objects:
envelope, headers = obj.apply(envelope, headers)
return envelope, headers
def verify(self, envelope):
for obj in self.wsse_objects:
obj.verify(envelope)

255
src/zeep/wsse/signature.py Normal file
View File

@ -0,0 +1,255 @@
"""Functions for WS-Security (WSSE) signature creation and verification.
Heavily based on test examples in https://github.com/mehcode/python-xmlsec as
well as the xmlsec documentation at https://www.aleksey.com/xmlsec/.
Reading the xmldsig, xmlenc, and ws-security standards documents, though
admittedly painful, will likely assist in understanding the code in this
module.
"""
from lxml import etree
from lxml.etree import QName
try:
import xmlsec
except ImportError:
xmlsec = None
from zeep import ns
from zeep.utils import detect_soap_env
from zeep.exceptions import SignatureVerificationFailed
from zeep.wsse.utils import ensure_id, get_security_header
# SOAP envelope
SOAP_NS = 'http://schemas.xmlsoap.org/soap/envelope/'
class Signature(object):
"""Sign given SOAP envelope with WSSE sig using given key and cert."""
def __init__(self, key_file, certfile, password=None):
check_xmlsec_import()
self.key_file = key_file
self.certfile = certfile
self.password = password
def apply(self, envelope, headers):
sign_envelope(envelope, self.key_file, self.certfile, self.password)
return envelope, headers
def verify(self, envelope):
verify_envelope(envelope, self.certfile)
return envelope
def check_xmlsec_import():
if xmlsec is None:
raise ImportError(
"The xmlsec module is required for wsse.Signature()\n" +
"You can install xmlsec with: pip install xmlsec\n" +
"or install zeep via: pip install zeep[xmlsec]\n"
)
def sign_envelope(envelope, keyfile, certfile, password=None):
"""Sign given SOAP envelope with WSSE sig using given key and cert.
Sign the wsu:Timestamp node in the wsse:Security header and the soap:Body;
both must be present.
Add a ds:Signature node in the wsse:Security header containing the
signature.
Use EXCL-C14N transforms to normalize the signed XML (so that irrelevant
whitespace or attribute ordering changes don't invalidate the
signature). Use SHA1 signatures.
Expects to sign an incoming document something like this (xmlns attributes
omitted for readability):
<soap:Envelope>
<soap:Header>
<wsse:Security mustUnderstand="true">
<wsu:Timestamp>
<wsu:Created>2015-06-25T21:53:25.246276+00:00</wsu:Created>
<wsu:Expires>2015-06-25T21:58:25.246276+00:00</wsu:Expires>
</wsu:Timestamp>
</wsse:Security>
</soap:Header>
<soap:Body>
...
</soap:Body>
</soap:Envelope>
After signing, the sample document would look something like this (note the
added wsu:Id attr on the soap:Body and wsu:Timestamp nodes, and the added
ds:Signature node in the header, with ds:Reference nodes with URI attribute
referencing the wsu:Id of the signed nodes):
<soap:Envelope>
<soap:Header>
<wsse:Security mustUnderstand="true">
<Signature xmlns="http://www.w3.org/2000/09/xmldsig#">
<SignedInfo>
<CanonicalizationMethod
Algorithm="http://www.w3.org/2001/10/xml-exc-c14n#"/>
<SignatureMethod
Algorithm="http://www.w3.org/2000/09/xmldsig#rsa-sha1"/>
<Reference URI="#id-d0f9fd77-f193-471f-8bab-ba9c5afa3e76">
<Transforms>
<Transform
Algorithm="http://www.w3.org/2001/10/xml-exc-c14n#"/>
</Transforms>
<DigestMethod
Algorithm="http://www.w3.org/2000/09/xmldsig#sha1"/>
<DigestValue>nnjjqTKxwl1hT/2RUsBuszgjTbI=</DigestValue>
</Reference>
<Reference URI="#id-7c425ac1-534a-4478-b5fe-6cae0690f08d">
<Transforms>
<Transform
Algorithm="http://www.w3.org/2001/10/xml-exc-c14n#"/>
</Transforms>
<DigestMethod
Algorithm="http://www.w3.org/2000/09/xmldsig#sha1"/>
<DigestValue>qAATZaSqAr9fta9ApbGrFWDuCCQ=</DigestValue>
</Reference>
</SignedInfo>
<SignatureValue>Hz8jtQb...bOdT6ZdTQ==</SignatureValue>
<KeyInfo>
<wsse:SecurityTokenReference>
<X509Data>
<X509Certificate>MIIDnzC...Ia2qKQ==</X509Certificate>
<X509IssuerSerial>
<X509IssuerName>...</X509IssuerName>
<X509SerialNumber>...</X509SerialNumber>
</X509IssuerSerial>
</X509Data>
</wsse:SecurityTokenReference>
</KeyInfo>
</Signature>
<wsu:Timestamp wsu:Id="id-7c425ac1-534a-4478-b5fe-6cae0690f08d">
<wsu:Created>2015-06-25T22:00:29.821700+00:00</wsu:Created>
<wsu:Expires>2015-06-25T22:05:29.821700+00:00</wsu:Expires>
</wsu:Timestamp>
</wsse:Security>
</soap:Header>
<soap:Body wsu:Id="id-d0f9fd77-f193-471f-8bab-ba9c5afa3e76">
...
</soap:Body>
</soap:Envelope>
"""
# Create the Signature node.
signature = xmlsec.template.create(
envelope,
xmlsec.Transform.EXCL_C14N,
xmlsec.Transform.RSA_SHA1,
)
# Add a KeyInfo node with X509Data child to the Signature. XMLSec will fill
# in this template with the actual certificate details when it signs.
key_info = xmlsec.template.ensure_key_info(signature)
x509_data = xmlsec.template.add_x509_data(key_info)
xmlsec.template.x509_data_add_issuer_serial(x509_data)
xmlsec.template.x509_data_add_certificate(x509_data)
# Load the signing key and certificate.
key = xmlsec.Key.from_file(keyfile, xmlsec.KeyFormat.PEM, password=password)
key.load_cert_from_file(certfile, xmlsec.KeyFormat.PEM)
# Insert the Signature node in the wsse:Security header.
security = get_security_header(envelope)
security.insert(0, signature)
# Perform the actual signing.
ctx = xmlsec.SignatureContext()
ctx.key = key
security.append(etree.Element(QName(ns.WSU, 'Timestamp')))
soap_env = detect_soap_env(envelope)
_sign_node(ctx, signature, envelope.find(QName(soap_env, 'Body')))
_sign_node(ctx, signature, security.find(QName(ns.WSU, 'Timestamp')))
ctx.sign(signature)
# Place the X509 data inside a WSSE SecurityTokenReference within
# KeyInfo. The recipient expects this structure, but we can't rearrange
# like this until after signing, because otherwise xmlsec won't populate
# the X509 data (because it doesn't understand WSSE).
sec_token_ref = etree.SubElement(
key_info, QName(ns.WSSE, 'SecurityTokenReference'))
sec_token_ref.append(x509_data)
def verify_envelope(envelope, certfile):
"""Verify WS-Security signature on given SOAP envelope with given cert.
Expects a document like that found in the sample XML in the ``sign()``
docstring.
Raise SignatureValidationFailed on failure, silent on success.
"""
soap_env = detect_soap_env(envelope)
header = envelope.find(QName(soap_env, 'Header'))
security = header.find(QName(ns.WSSE, 'Security'))
signature = security.find(QName(ns.DS, 'Signature'))
ctx = xmlsec.SignatureContext()
# Find each signed element and register its ID with the signing context.
refs = signature.xpath(
'ds:SignedInfo/ds:Reference', namespaces={'ds': ns.DS})
for ref in refs:
# Get the reference URI and cut off the initial '#'
referenced_id = ref.get('URI')[1:]
referenced = envelope.xpath(
"//*[@wsu:Id='%s']" % referenced_id,
namespaces={'wsu': ns.WSU},
)[0]
ctx.register_id(referenced, 'Id', ns.WSU)
key = xmlsec.Key.from_file(certfile, xmlsec.KeyFormat.CERT_PEM, None)
ctx.key = key
try:
ctx.verify(signature)
except xmlsec.Error:
# Sadly xmlsec gives us no details about the reason for the failure, so
# we have nothing to pass on except that verification failed.
raise SignatureVerificationFailed()
def _sign_node(ctx, signature, target):
"""Add sig for ``target`` in ``signature`` node, using ``ctx`` context.
Doesn't actually perform the signing; ``ctx.sign(signature)`` should be
called later to do that.
Adds a Reference node to the signature with URI attribute pointing to the
target node, and registers the target node's ID so XMLSec will be able to
find the target node by ID when it signs.
"""
# Ensure the target node has a wsu:Id attribute and get its value.
node_id = ensure_id(target)
# Unlike HTML, XML doesn't have a single standardized Id. WSSE suggests the
# use of the wsu:Id attribute for this purpose, but XMLSec doesn't
# understand that natively. So for XMLSec to be able to find the referenced
# node by id, we have to tell xmlsec about it using the register_id method.
ctx.register_id(target, 'Id', ns.WSU)
# Add reference to signature with URI attribute pointing to that ID.
ref = xmlsec.template.add_reference(
signature, xmlsec.Transform.SHA1, uri='#' + node_id)
# This is an XML normalization transform which will be performed on the
# target node contents before signing. This ensures that changes to
# irrelevant whitespace, attribute ordering, etc won't invalidate the
# signature.
xmlsec.template.add_transform(ref, xmlsec.Transform.EXCL_C14N)

View File

@ -2,17 +2,9 @@ import base64
import hashlib
import os
from lxml.builder import ElementMaker
from zeep import ns
from zeep.wsse import utils
NSMAP = {
'wsse': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd',
'wsu': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd',
}
WSSE = ElementMaker(namespace=NSMAP['wsse'])
WSU = ElementMaker(namespace=NSMAP['wsu'])
class UsernameToken(object):
"""UsernameToken Profile 1.1
@ -54,19 +46,19 @@ class UsernameToken(object):
self.created = created
self.use_digest = use_digest
def sign(self, envelope, headers):
def apply(self, envelope, headers):
security = utils.get_security_header(envelope)
# The token placeholder might already exists since it is specified in
# the WSDL.
token = security.find('{%s}UsernameToken' % NSMAP['wsse'])
token = security.find('{%s}UsernameToken' % ns.WSSE)
if token is None:
token = WSSE.UsernameToken()
token = utils.WSSE.UsernameToken()
security.append(token)
# Create the sub elements of the UsernameToken element
elements = [
WSSE.Username(self.username)
utils.WSSE.Username(self.username)
]
if self.password is not None or self.password_digest is not None:
if self.use_digest:
@ -82,7 +74,7 @@ class UsernameToken(object):
def _create_password_text(self):
return [
WSSE.Password(
utils.WSSE.Password(
self.password,
Type='%s#PasswordText' % self.username_token_profile_ns)
]
@ -106,13 +98,13 @@ class UsernameToken(object):
digest = self.password_digest
return [
WSSE.Password(
utils.WSSE.Password(
digest,
Type='%s#PasswordDigest' % self.username_token_profile_ns
),
WSSE.Nonce(
utils.WSSE.Nonce(
base64.b64encode(nonce).decode('utf-8'),
EncodingType='%s#Base64Binary' % self.soap_message_secutity_ns
),
WSU.Created(timestamp)
utils.WSU.Created(timestamp)
]

View File

@ -1,14 +1,20 @@
from uuid import uuid4
from lxml import etree
import datetime
import pytz
from lxml.builder import ElementMaker
from zeep import ns
from zeep.wsdl.utils import get_or_create_header
NSMAP = {
'wsse': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd',
'wsse': ns.WSSE,
'wsu': ns.WSU,
}
WSSE = ElementMaker(namespace=NSMAP['wsse'])
WSSE = ElementMaker(namespace=NSMAP['wsse'], nsmap={'wsse': ns.WSSE})
WSU = ElementMaker(namespace=NSMAP['wsu'], nsmap={'wsu': ns.WSU})
ID_ATTR = etree.QName(NSMAP['wsu'], 'Id')
def get_security_header(doc):
@ -28,3 +34,21 @@ def get_timestamp(timestamp=None):
timestamp = timestamp or datetime.datetime.utcnow()
timestamp = timestamp.replace(tzinfo=pytz.utc, microsecond=0)
return timestamp.isoformat()
def get_unique_id():
return 'id-{0}'.format(uuid4())
def ensure_id(node):
"""Ensure given node has a wsu:Id attribute; add unique one if not.
Return found/created attribute value.
"""
assert node is not None
id_val = node.get(ID_ATTR)
if not id_val:
id_val = get_unique_id()
node.set(ID_ATTR, id_val)
return id_val

View File

@ -1,6 +1,6 @@
from zeep.xsd.builtins import * # noqa
from zeep.xsd.const import SkipValue # noqa
from zeep.xsd.elements import * # noqa
from zeep.xsd.types import * # noqa
from zeep.xsd.valueobjects import * # noqa
from zeep.xsd.schema import Schema # noqa
from zeep.xsd.indicators import * # noqa
from zeep.xsd.types import * # noqa
from zeep.xsd.types.builtins import * # noqa
from zeep.xsd.valueobjects import * # noqa

View File

@ -10,3 +10,15 @@ def xsi_ns(localname):
def xsd_ns(localname):
return etree.QName(NS_XSD, localname)
class _StaticIdentity(object):
def __init__(self, val):
self.__value__ = val
def __repr__(self):
return self.__value__
NotSet = _StaticIdentity('NotSet')
SkipValue = _StaticIdentity('SkipValue')

View File

@ -1,47 +1,3 @@
class SchemaRepository(object):
"""Mapping between schema target namespace and schema object"""
def __init__(self):
self._schemas = {}
def add(self, schema):
self._schemas[schema._target_namespace] = schema
def get(self, namespace):
if namespace in self._schemas:
return self._schemas[namespace]
def __contains__(self, namespace):
return namespace in self._schemas
def __len__(self):
return len(self._schemas)
class SchemaNodeRepository(object):
"""Mapping between schema target namespace and lxml node"""
def __init__(self):
self._nodes = {}
def add(self, key, value):
self._nodes[key] = value
def get(self, key):
return self._nodes[key]
def __len__(self):
return len(self._nodes)
class ParserContext(object):
"""Parser context when parsing wsdl/xsd files"""
def __init__(self):
self.schema_nodes = SchemaNodeRepository()
self.schema_objects = SchemaRepository()
# Mapping between internal nodes and original location
self.schema_locations = {}
class XmlParserContext(object):
"""Parser context when parsing XML elements"""

View File

@ -1,500 +0,0 @@
import copy
import logging
from lxml import etree
from zeep import exceptions
from zeep.exceptions import UnexpectedElementError
from zeep.utils import qname_attr
from zeep.xsd.const import xsi_ns
from zeep.xsd.context import XmlParserContext
from zeep.xsd.utils import max_occurs_iter
from zeep.xsd.valueobjects import AnyObject # cyclic import / FIXME
logger = logging.getLogger(__name__)
class Base(object):
@property
def accepts_multiple(self):
return self.max_occurs != 1
@property
def default_value(self):
return None
@property
def is_optional(self):
return self.min_occurs == 0
def parse_args(self, args, index=0):
result = {}
if not args:
return result, args, index
value = args[index]
index += 1
return {self.attr_name: value}, args, index
def parse_kwargs(self, kwargs, name, available_kwargs):
raise NotImplementedError()
def parse_xmlelements(self, xmlelements, schema, name=None, context=None):
"""Consume matching xmlelements and call parse() on each of them"""
raise NotImplementedError()
def signature(self, depth=()):
return ''
class Any(Base):
name = None
def __init__(self, max_occurs=1, min_occurs=1, process_contents='strict',
restrict=None):
"""
:param process_contents: Specifies how the XML processor should handle
validation against the elements specified by
this any element
:type process_contents: str (strict, lax, skip)
"""
super(Any, self).__init__()
self.max_occurs = max_occurs
self.min_occurs = min_occurs
self.restrict = restrict
self.process_contents = process_contents
# cyclic import
from zeep.xsd.builtins import AnyType
self.type = AnyType()
def __call__(self, any_object):
return any_object
def __repr__(self):
return '<%s(name=%r)>' % (self.__class__.__name__, self.name)
def accept(self, value):
return True
def parse(self, xmlelement, schema, context=None):
if self.process_contents == 'skip':
return xmlelement
qname = etree.QName(xmlelement.tag)
for context_schema in context.schemas:
if qname.namespace in context_schema._schemas:
schema = context_schema
break
xsd_type = qname_attr(xmlelement, xsi_ns('type'))
if xsd_type is not None:
xsd_type = schema.get_type(xsd_type)
return xsd_type.parse_xmlelement(xmlelement, schema, context=context)
try:
element = schema.get_element(xmlelement.tag)
return element.parse(xmlelement, schema, context=context)
except (exceptions.NamespaceError, exceptions.LookupError):
return xmlelement
def parse_kwargs(self, kwargs, name, available_kwargs):
if name in available_kwargs:
available_kwargs.remove(name)
value = kwargs[name]
return {name: value}
return {}
def parse_xmlelements(self, xmlelements, schema, name=None, context=None):
"""Consume matching xmlelements and call parse() on each of them"""
result = []
for i in max_occurs_iter(self.max_occurs):
if xmlelements:
xmlelement = xmlelements.popleft()
item = self.parse(xmlelement, schema, context=context)
if item is not None:
result.append(item)
else:
break
if not self.accepts_multiple:
result = result[0] if result else None
return result
def render(self, parent, value):
assert parent is not None
if self.accepts_multiple and isinstance(value, list):
from zeep.xsd import SimpleType
if isinstance(self.restrict, SimpleType):
for val in value:
node = etree.SubElement(parent, 'item')
node.set(xsi_ns('type'), self.restrict.qname)
self._render_value_item(node, val)
elif self.restrict:
for val in value:
node = etree.SubElement(parent, self.restrict.name)
# node.set(xsi_ns('type'), self.restrict.qname)
self._render_value_item(node, val)
else:
for val in value:
self._render_value_item(parent, val)
else:
self._render_value_item(parent, value)
def _render_value_item(self, parent, value):
if value is None: # can be an lxml element
return
# Check if we received a proper value object. If we receive the wrong
# type then return a nice error message
if self.restrict:
expected_types = (etree._Element,) + self.restrict.accepted_types
else:
expected_types = (etree._Element, AnyObject)
if not isinstance(value, expected_types):
type_names = [
'%s.%s' % (t.__module__, t.__name__) for t in expected_types
]
err_message = "Any element received object of type %r, expected %s" % (
type(value).__name__, ' or '.join(type_names))
raise TypeError('\n'.join((
err_message,
"See http://docs.python-zeep.org/en/master/datastructures.html"
"#any-objects for more information"
)))
if isinstance(value, etree._Element):
parent.append(value)
elif self.restrict:
if isinstance(value, list):
for val in value:
self.restrict.render(parent, val)
else:
self.restrict.render(parent, value)
else:
if isinstance(value.value, list):
for val in value.value:
value.xsd_elm.render(parent, val)
else:
value.xsd_elm.render(parent, value.value)
def resolve(self):
return self
def signature(self, depth=()):
if self.restrict:
base = self.restrict.name
else:
base = 'ANY'
if self.accepts_multiple:
return '%s[]' % base
return base
class Element(Base):
def __init__(self, name, type_=None, min_occurs=1, max_occurs=1,
nillable=False, default=None, is_global=False, attr_name=None):
if name is None:
raise ValueError("name cannot be None", self.__class__)
if not isinstance(name, etree.QName):
name = etree.QName(name)
self.name = name.localname if name else None
self.qname = name
self.type = type_
self.min_occurs = min_occurs
self.max_occurs = max_occurs
self.nillable = nillable
self.is_global = is_global
self.default = default
self.attr_name = attr_name or self.name
# assert type_
def __str__(self):
if self.type:
return '%s(%s)' % (self.name, self.type.signature())
return '%s()' % self.name
def __call__(self, *args, **kwargs):
instance = self.type(*args, **kwargs)
if hasattr(instance, '_xsd_type'):
instance._xsd_elm = self
return instance
def __repr__(self):
return '<%s(name=%r, type=%r)>' % (
self.__class__.__name__, self.name, self.type)
def __eq__(self, other):
return (
other is not None and
self.__class__ == other.__class__ and
self.__dict__ == other.__dict__)
@property
def default_value(self):
value = [] if self.accepts_multiple else self.default
return value
def clone(self, name=None, min_occurs=1, max_occurs=1):
new = copy.copy(self)
if name:
if not isinstance(name, etree.QName):
name = etree.QName(name)
new.name = name.localname
new.qname = name
new.attr_name = new.name
new.min_occurs = min_occurs
new.max_occurs = max_occurs
return new
def parse(self, xmlelement, schema, allow_none=False, context=None):
"""Process the given xmlelement. If it has an xsi:type attribute then
use that for further processing. This should only be done for subtypes
of the defined type but for now we just accept everything.
"""
context = context or XmlParserContext()
instance_type = qname_attr(xmlelement, xsi_ns('type'))
xsd_type = None
if instance_type:
xsd_type = schema.get_type(instance_type, fail_silently=True)
xsd_type = xsd_type or self.type
return xsd_type.parse_xmlelement(
xmlelement, schema, allow_none=allow_none, context=context)
def parse_kwargs(self, kwargs, name, available_kwargs):
return self.type.parse_kwargs(
kwargs, name or self.attr_name, available_kwargs)
def parse_xmlelements(self, xmlelements, schema, name=None, context=None):
"""Consume matching xmlelements and call parse() on each of them"""
result = []
num_matches = 0
for i in max_occurs_iter(self.max_occurs):
if not xmlelements:
break
# Workaround for SOAP servers which incorrectly use unqualified
# or qualified elements in the responses (#170, #176). To make the
# best of it we compare the full uri's if both elements have a
# namespace. If only one has a namespace then only compare the
# localname.
# If both elements have a namespace and they don't match then skip
element_tag = etree.QName(xmlelements[0].tag)
if (
element_tag.namespace and self.qname.namespace and
element_tag.namespace != self.qname.namespace
):
break
# Only compare the localname
if element_tag.localname == self.qname.localname:
xmlelement = xmlelements.popleft()
num_matches += 1
item = self.parse(
xmlelement, schema, allow_none=True, context=context)
if item is not None:
result.append(item)
else:
# If the element passed doesn't match and the current one is
# not optional then throw an error
if num_matches == 0 and not self.is_optional:
raise UnexpectedElementError(
"Unexpected element %r, expected %r" % (
element_tag.text, self.qname.text))
break
if not self.accepts_multiple:
result = result[0] if result else None
return result
def render(self, parent, value):
"""Render the value(s) on the parent lxml.Element.
This actually just calls _render_value_item for each value.
"""
assert parent is not None
if self.accepts_multiple and isinstance(value, list):
for val in value:
self._render_value_item(parent, val)
else:
self._render_value_item(parent, value)
def _render_value_item(self, parent, value):
"""Render the value on the parent lxml.Element"""
if value is None:
if self.is_optional:
return
elm = etree.SubElement(parent, self.qname)
if self.nillable:
elm.set(xsi_ns('nil'), 'true')
return
node = etree.SubElement(parent, self.qname)
xsd_type = getattr(value, '_xsd_type', self.type)
if xsd_type != self.type:
return value._xsd_type.render(node, value, xsd_type)
return self.type.render(node, value)
def resolve_type(self):
self.type = self.type.resolve()
def resolve(self):
self.resolve_type()
return self
def signature(self, depth=()):
if len(depth) > 0 and self.is_global:
return self.name + '()'
value = self.type.signature(depth)
if self.accepts_multiple:
return '%s[]' % value
return value
class Attribute(Element):
def __init__(self, name, type_=None, required=False, default=None):
super(Attribute, self).__init__(name=name, type_=type_, default=default)
self.required = required
self.array_type = None
def parse(self, value):
try:
return self.type.pythonvalue(value)
except (TypeError, ValueError):
logger.exception("Error during xml -> python translation")
return None
def render(self, parent, value):
if value is None and not self.required:
return
value = self.type.xmlvalue(value)
parent.set(self.qname, value)
def clone(self, *args, **kwargs):
array_type = kwargs.pop('array_type', None)
new = super(Attribute, self).clone(*args, **kwargs)
new.array_type = array_type
return new
def resolve(self):
retval = super(Attribute, self).resolve()
self.type = self.type.resolve()
if self.array_type:
retval.array_type = self.array_type.resolve()
return retval
class AttributeGroup(Element):
def __init__(self, name, attributes):
self.name = name
self.type = None
self._attributes = attributes
super(AttributeGroup, self).__init__(name, is_global=True)
@property
def attributes(self):
result = []
for attr in self._attributes:
if isinstance(attr, AttributeGroup):
result.extend(attr.attributes)
else:
result.append(attr)
return result
def resolve(self):
resolved = []
for attribute in self._attributes:
value = attribute.resolve()
assert value is not None
if isinstance(value, list):
resolved.extend(value)
else:
resolved.append(value)
self._attributes = resolved
return self
def signature(self, depth=()):
return ', '.join(attr.signature() for attr in self._attributes)
class AnyAttribute(Base):
name = None
def __init__(self, process_contents='strict'):
self.qname = None
self.process_contents = process_contents
def parse(self, attributes, context=None):
return attributes
def resolve(self):
return self
def render(self, parent, value):
if value is None:
return
for name, val in value.items():
parent.set(name, val)
def signature(self, depth=()):
return '{}'
class RefElement(object):
def __init__(self, tag, ref, schema, is_qualified=False,
min_occurs=1, max_occurs=1):
self._ref = ref
self._is_qualified = is_qualified
self._schema = schema
self.min_occurs = min_occurs
self.max_occurs = max_occurs
def resolve(self):
elm = self._schema.get_element(self._ref)
elm = elm.clone(
elm.qname, min_occurs=self.min_occurs, max_occurs=self.max_occurs)
return elm.resolve()
class RefAttribute(RefElement):
def __init__(self, *args, **kwargs):
self._array_type = kwargs.pop('array_type', None)
super(RefAttribute, self).__init__(*args, **kwargs)
def resolve(self):
attrib = self._schema.get_attribute(self._ref)
attrib = attrib.clone(attrib.qname, array_type=self._array_type)
return attrib.resolve()
class RefAttributeGroup(RefElement):
def resolve(self):
value = self._schema.get_attribute_group(self._ref)
return value.resolve()
class RefGroup(RefElement):
def resolve(self):
return self._schema.get_group(self._ref)

View File

@ -0,0 +1,5 @@
from .any import * # noqa
from .attribute import * # noqa
from .element import * # noqa
from .indicators import * # noqa
from .references import * # noqa

View File

@ -0,0 +1,223 @@
import logging
from lxml import etree
from zeep import exceptions
from zeep.utils import qname_attr
from zeep.xsd.const import xsi_ns, NotSet
from zeep.xsd.elements.base import Base
from zeep.xsd.utils import max_occurs_iter
from zeep.xsd.valueobjects import AnyObject
logger = logging.getLogger(__name__)
__all__ = ['Any', 'AnyAttribute']
class Any(Base):
name = None
def __init__(self, max_occurs=1, min_occurs=1, process_contents='strict',
restrict=None):
"""
:param process_contents: Specifies how the XML processor should handle
validation against the elements specified by
this any element
:type process_contents: str (strict, lax, skip)
"""
super(Any, self).__init__()
self.max_occurs = max_occurs
self.min_occurs = min_occurs
self.restrict = restrict
self.process_contents = process_contents
# cyclic import
from zeep.xsd import AnyType
self.type = AnyType()
def __call__(self, any_object):
return any_object
def __repr__(self):
return '<%s(name=%r)>' % (self.__class__.__name__, self.name)
def accept(self, value):
return True
def parse(self, xmlelement, schema, context=None):
if self.process_contents == 'skip':
return xmlelement
# If a schema was passed inline then check for a matching one
qname = etree.QName(xmlelement.tag)
if context and context.schemas:
for context_schema in context.schemas:
if context_schema._has_schema_document(qname.namespace):
schema = context_schema
break
# Lookup type via xsi:type attribute
xsd_type = qname_attr(xmlelement, xsi_ns('type'))
if xsd_type is not None:
xsd_type = schema.get_type(xsd_type)
return xsd_type.parse_xmlelement(xmlelement, schema, context=context)
# Check if a restrict is used
if self.restrict:
return self.restrict.parse_xmlelement(
xmlelement, schema, context=context)
try:
element = schema.get_element(xmlelement.tag)
return element.parse(xmlelement, schema, context=context)
except (exceptions.NamespaceError, exceptions.LookupError):
return xmlelement
def parse_kwargs(self, kwargs, name, available_kwargs):
if name in available_kwargs:
available_kwargs.remove(name)
value = kwargs[name]
return {name: value}
return {}
def parse_xmlelements(self, xmlelements, schema, name=None, context=None):
"""Consume matching xmlelements and call parse() on each of them"""
result = []
for _unused in max_occurs_iter(self.max_occurs):
if xmlelements:
xmlelement = xmlelements.popleft()
item = self.parse(xmlelement, schema, context=context)
if item is not None:
result.append(item)
else:
break
if not self.accepts_multiple:
result = result[0] if result else None
return result
def render(self, parent, value, render_path=None):
assert parent is not None
self.validate(value, render_path)
if self.accepts_multiple and isinstance(value, list):
from zeep.xsd import AnySimpleType
if isinstance(self.restrict, AnySimpleType):
for val in value:
node = etree.SubElement(parent, 'item')
node.set(xsi_ns('type'), self.restrict.qname)
self._render_value_item(node, val, render_path)
elif self.restrict:
for val in value:
node = etree.SubElement(parent, self.restrict.name)
# node.set(xsi_ns('type'), self.restrict.qname)
self._render_value_item(node, val, render_path)
else:
for val in value:
self._render_value_item(parent, val, render_path)
else:
self._render_value_item(parent, value, render_path)
def _render_value_item(self, parent, value, render_path):
if value is None: # can be an lxml element
return
elif isinstance(value, etree._Element):
parent.append(value)
elif self.restrict:
if isinstance(value, list):
for val in value:
self.restrict.render(parent, val, None, render_path)
else:
self.restrict.render(parent, value, None, render_path)
else:
if isinstance(value.value, list):
for val in value.value:
value.xsd_elm.render(parent, val, render_path)
else:
value.xsd_elm.render(parent, value.value, render_path)
def validate(self, value, render_path):
if self.accepts_multiple and isinstance(value, list):
# Validate bounds
if len(value) < self.min_occurs:
raise exceptions.ValidationError(
"Expected at least %d items (minOccurs check)" % self.min_occurs)
if self.max_occurs != 'unbounded' and len(value) > self.max_occurs:
raise exceptions.ValidationError(
"Expected at most %d items (maxOccurs check)" % self.min_occurs)
for val in value:
self._validate_item(val, render_path)
else:
if not self.is_optional and value in (None, NotSet):
raise exceptions.ValidationError("Missing element for Any")
self._validate_item(value, render_path)
def _validate_item(self, value, render_path):
if value is None: # can be an lxml element
return
# Check if we received a proper value object. If we receive the wrong
# type then return a nice error message
if self.restrict:
expected_types = (etree._Element,) + self.restrict.accepted_types
else:
expected_types = (etree._Element, AnyObject)
if not isinstance(value, expected_types):
type_names = [
'%s.%s' % (t.__module__, t.__name__) for t in expected_types
]
err_message = "Any element received object of type %r, expected %s" % (
type(value).__name__, ' or '.join(type_names))
raise TypeError('\n'.join((
err_message,
"See http://docs.python-zeep.org/en/master/datastructures.html"
"#any-objects for more information"
)))
def resolve(self):
return self
def signature(self, depth=()):
if self.restrict:
base = self.restrict.name
else:
base = 'ANY'
if self.accepts_multiple:
return '%s[]' % base
return base
class AnyAttribute(Base):
name = None
def __init__(self, process_contents='strict'):
self.qname = None
self.process_contents = process_contents
def parse(self, attributes, context=None):
return attributes
def resolve(self):
return self
def render(self, parent, value, render_path=None):
if value is None:
return
for name, val in value.items():
parent.set(name, val)
def signature(self, depth=()):
return '{}'

View File

@ -0,0 +1,92 @@
import logging
from lxml import etree
from zeep import exceptions
from zeep.xsd.const import NotSet
from zeep.xsd.elements.element import Element
logger = logging.getLogger(__name__)
__all__ = ['Attribute', 'AttributeGroup']
class Attribute(Element):
def __init__(self, name, type_=None, required=False, default=None):
super(Attribute, self).__init__(name=name, type_=type_, default=default)
self.required = required
self.array_type = None
def parse(self, value):
try:
return self.type.pythonvalue(value)
except (TypeError, ValueError):
logger.exception("Error during xml -> python translation")
return None
def render(self, parent, value, render_path=None):
if value in (None, NotSet) and not self.required:
return
self.validate(value, render_path)
value = self.type.xmlvalue(value)
parent.set(self.qname, value)
def validate(self, value, render_path):
try:
self.type.validate(value, required=self.required)
except exceptions.ValidationError as exc:
raise exceptions.ValidationError(
"The attribute %s is not valid: %s" % (self.qname, exc.message),
path=render_path)
def clone(self, *args, **kwargs):
array_type = kwargs.pop('array_type', None)
new = super(Attribute, self).clone(*args, **kwargs)
new.array_type = array_type
return new
def resolve(self):
retval = super(Attribute, self).resolve()
self.type = self.type.resolve()
if self.array_type:
retval.array_type = self.array_type.resolve()
return retval
class AttributeGroup(object):
def __init__(self, name, attributes):
if not isinstance(name, etree.QName):
name = etree.QName(name)
self.name = name.localname
self.qname = name
self.type = None
self._attributes = attributes
self.is_global = True
@property
def attributes(self):
result = []
for attr in self._attributes:
if isinstance(attr, AttributeGroup):
result.extend(attr.attributes)
else:
result.append(attr)
return result
def resolve(self):
resolved = []
for attribute in self._attributes:
value = attribute.resolve()
assert value is not None
if isinstance(value, list):
resolved.extend(value)
else:
resolved.append(value)
self._attributes = resolved
return self
def signature(self, depth=()):
return ', '.join(attr.signature() for attr in self._attributes)

View File

@ -0,0 +1,32 @@
class Base(object):
@property
def accepts_multiple(self):
return self.max_occurs != 1
@property
def default_value(self):
return None
@property
def is_optional(self):
return self.min_occurs == 0
def parse_args(self, args, index=0):
result = {}
if not args:
return result, args, index
value = args[index]
index += 1
return {self.attr_name: value}, args, index
def parse_kwargs(self, kwargs, name, available_kwargs):
raise NotImplementedError()
def parse_xmlelements(self, xmlelements, schema, name=None, context=None):
"""Consume matching xmlelements and call parse() on each of them"""
raise NotImplementedError()
def signature(self, depth=()):
return ''

View File

@ -0,0 +1,40 @@
from __future__ import division
from zeep.xsd.const import xsd_ns
from zeep.xsd.elements.base import Base
class Schema(Base):
name = 'schema'
attr_name = 'schema'
qname = xsd_ns('schema')
def clone(self, qname, min_occurs=1, max_occurs=1):
return self.__class__()
def parse_kwargs(self, kwargs, name, available_kwargs):
if name in available_kwargs:
value = kwargs[name]
available_kwargs.remove(name)
return {name: value}
return {}
def parse(self, xmlelement, schema, context=None):
from zeep.xsd.schema import Schema
schema = Schema(xmlelement, schema._transport)
context.schemas.append(schema)
return schema
def parse_xmlelements(self, xmlelements, schema, name=None, context=None):
if xmlelements[0].tag == self.qname:
xmlelement = xmlelements.popleft()
result = self.parse(xmlelement, schema, context=context)
return result
def resolve(self):
return self
default_elements = {
xsd_ns('schema'): Schema(),
}

View File

@ -0,0 +1,225 @@
import copy
import logging
from lxml import etree
from zeep import exceptions
from zeep.exceptions import UnexpectedElementError
from zeep.utils import qname_attr
from zeep.xsd.const import NotSet, xsi_ns
from zeep.xsd.context import XmlParserContext
from zeep.xsd.elements.base import Base
from zeep.xsd.utils import max_occurs_iter
logger = logging.getLogger(__name__)
__all__ = ['Element']
class Element(Base):
def __init__(self, name, type_=None, min_occurs=1, max_occurs=1,
nillable=False, default=None, is_global=False, attr_name=None):
if name is None:
raise ValueError("name cannot be None", self.__class__)
if not isinstance(name, etree.QName):
name = etree.QName(name)
self.name = name.localname if name else None
self.qname = name
self.type = type_
self.min_occurs = min_occurs
self.max_occurs = max_occurs
self.nillable = nillable
self.is_global = is_global
self.default = default
self.attr_name = attr_name or self.name
# assert type_
def __str__(self):
if self.type:
return '%s(%s)' % (self.name, self.type.signature())
return '%s()' % self.name
def __call__(self, *args, **kwargs):
instance = self.type(*args, **kwargs)
if hasattr(instance, '_xsd_type'):
instance._xsd_elm = self
return instance
def __repr__(self):
return '<%s(name=%r, type=%r)>' % (
self.__class__.__name__, self.name, self.type)
def __eq__(self, other):
return (
other is not None and
self.__class__ == other.__class__ and
self.__dict__ == other.__dict__)
@property
def default_value(self):
value = [] if self.accepts_multiple else self.default
return value
def clone(self, name=None, min_occurs=1, max_occurs=1):
new = copy.copy(self)
if name:
if not isinstance(name, etree.QName):
name = etree.QName(name)
new.name = name.localname
new.qname = name
new.attr_name = new.name
new.min_occurs = min_occurs
new.max_occurs = max_occurs
return new
def parse(self, xmlelement, schema, allow_none=False, context=None):
"""Process the given xmlelement. If it has an xsi:type attribute then
use that for further processing. This should only be done for subtypes
of the defined type but for now we just accept everything.
"""
context = context or XmlParserContext()
instance_type = qname_attr(xmlelement, xsi_ns('type'))
xsd_type = None
if instance_type:
xsd_type = schema.get_type(instance_type, fail_silently=True)
xsd_type = xsd_type or self.type
return xsd_type.parse_xmlelement(
xmlelement, schema, allow_none=allow_none, context=context)
def parse_kwargs(self, kwargs, name, available_kwargs):
return self.type.parse_kwargs(
kwargs, name or self.attr_name, available_kwargs)
def parse_xmlelements(self, xmlelements, schema, name=None, context=None):
"""Consume matching xmlelements and call parse() on each of them"""
result = []
num_matches = 0
for _unused in max_occurs_iter(self.max_occurs):
if not xmlelements:
break
# Workaround for SOAP servers which incorrectly use unqualified
# or qualified elements in the responses (#170, #176). To make the
# best of it we compare the full uri's if both elements have a
# namespace. If only one has a namespace then only compare the
# localname.
# If both elements have a namespace and they don't match then skip
element_tag = etree.QName(xmlelements[0].tag)
if (
element_tag.namespace and self.qname.namespace and
element_tag.namespace != self.qname.namespace
):
break
# Only compare the localname
if element_tag.localname == self.qname.localname:
xmlelement = xmlelements.popleft()
num_matches += 1
item = self.parse(
xmlelement, schema, allow_none=True, context=context)
if item is not None:
result.append(item)
else:
# If the element passed doesn't match and the current one is
# not optional then throw an error
if num_matches == 0 and not self.is_optional:
raise UnexpectedElementError(
"Unexpected element %r, expected %r" % (
element_tag.text, self.qname.text))
break
if not self.accepts_multiple:
result = result[0] if result else None
return result
def render(self, parent, value, render_path=None):
"""Render the value(s) on the parent lxml.Element.
This actually just calls _render_value_item for each value.
"""
if not render_path:
render_path = [self.qname.localname]
assert parent is not None
self.validate(value, render_path)
if self.accepts_multiple and isinstance(value, list):
for val in value:
self._render_value_item(parent, val, render_path)
else:
self._render_value_item(parent, value, render_path)
def _render_value_item(self, parent, value, render_path):
"""Render the value on the parent lxml.Element"""
if value is None or value is NotSet:
if self.is_optional:
return
elm = etree.SubElement(parent, self.qname)
if self.nillable:
elm.set(xsi_ns('nil'), 'true')
return
node = etree.SubElement(parent, self.qname)
xsd_type = getattr(value, '_xsd_type', self.type)
if xsd_type != self.type:
return value._xsd_type.render(node, value, xsd_type, render_path)
return self.type.render(node, value, None, render_path)
def validate(self, value, render_path=None):
"""Validate that the value is valid"""
if self.accepts_multiple and isinstance(value, list):
# Validate bounds
if len(value) < self.min_occurs:
raise exceptions.ValidationError(
"Expected at least %d items (minOccurs check)" % self.min_occurs,
path=render_path)
elif self.max_occurs != 'unbounded' and len(value) > self.max_occurs:
raise exceptions.ValidationError(
"Expected at most %d items (maxOccurs check)" % self.min_occurs,
path=render_path)
for val in value:
self._validate_item(val, render_path)
else:
if not self.is_optional and not self.nillable and value in (None, NotSet):
raise exceptions.ValidationError(
"Missing element %s" % (self.name), path=render_path)
self._validate_item(value, render_path)
def _validate_item(self, value, render_path):
if self.nillable and value in (None, NotSet):
return
try:
self.type.validate(value, required=True)
except exceptions.ValidationError as exc:
raise exceptions.ValidationError(
"The element %s is not valid: %s" % (self.qname, exc.message),
path=render_path)
def resolve_type(self):
self.type = self.type.resolve()
def resolve(self):
self.resolve_type()
return self
def signature(self, depth=()):
if len(depth) > 0 and self.is_global:
return self.name + '()'
value = self.type.signature(depth)
if self.accepts_multiple:
return '%s[]' % value
return value

View File

@ -1,5 +1,3 @@
from __future__ import print_function
import copy
import operator
from collections import OrderedDict, defaultdict, deque
@ -7,7 +5,9 @@ from collections import OrderedDict, defaultdict, deque
from cached_property import threaded_cached_property
from zeep.exceptions import UnexpectedElementError
from zeep.xsd.elements import Any, Base, Element
from zeep.xsd.const import NotSet, SkipValue
from zeep.xsd.elements import Any, Element
from zeep.xsd.elements.base import Base
from zeep.xsd.utils import (
NamePrefixGenerator, UniqueNameGenerator, max_occurs_iter)
@ -36,11 +36,8 @@ class OrderIndicator(Indicator, list):
def __init__(self, elements=None, min_occurs=1, max_occurs=1):
self.min_occurs = min_occurs
self.max_occurs = max_occurs
if elements is None:
super(OrderIndicator, self).__init__()
else:
super(OrderIndicator, self).__init__()
super(OrderIndicator, self).__init__()
if elements is not None:
self.extend(elements)
def clone(self, name, min_occurs=1, max_occurs=1):
@ -125,7 +122,7 @@ class OrderIndicator(Indicator, list):
item_kwargs = [item_kwargs]
result = []
for i, item_value in zip(max_occurs_iter(self.max_occurs), item_kwargs):
for item_value in max_occurs_iter(self.max_occurs, item_kwargs):
item_kwargs = set(item_value.keys())
subresult = OrderedDict()
for item_name, element in self.elements:
@ -163,24 +160,31 @@ class OrderIndicator(Indicator, list):
self[i] = elm.resolve()
return self
def render(self, parent, value):
def render(self, parent, value, render_path):
"""Create subelements in the given parent object."""
if not isinstance(value, list):
values = [value]
else:
values = value
for i, value in zip(max_occurs_iter(self.max_occurs), values):
for value in max_occurs_iter(self.max_occurs, values):
for name, element in self.elements_nested:
if name:
if name in value:
element_value = value[name]
child_path = render_path + [name]
else:
element_value = None
element_value = NotSet
child_path = render_path
else:
element_value = value
child_path = render_path
if element_value is SkipValue:
continue
if element_value is not None or not element.is_optional:
element.render(parent, element_value)
element.render(parent, element_value, child_path)
def signature(self, depth=()):
"""
@ -249,7 +253,7 @@ class Choice(OrderIndicator):
"""Return a dictionary"""
result = []
for i in max_occurs_iter(self.max_occurs):
for _unused in max_occurs_iter(self.max_occurs):
if not xmlelements:
break
@ -374,7 +378,7 @@ class Choice(OrderIndicator):
result = {name: result}
return result
def render(self, parent, value):
def render(self, parent, value, render_path):
"""Render the value to the parent element tree node.
This is a bit more complex then the order render methods since we need
@ -388,7 +392,7 @@ class Choice(OrderIndicator):
result = self._find_element_to_render(item)
if result:
element, choice_value = result
element.render(parent, choice_value)
element.render(parent, choice_value, render_path)
def accept(self, values):
"""Return the number of values which are accepted by this choice.
@ -454,7 +458,7 @@ class Sequence(OrderIndicator):
def parse_xmlelements(self, xmlelements, schema, name=None, context=None):
result = []
for item in max_occurs_iter(self.max_occurs):
for _unused in max_occurs_iter(self.max_occurs):
if not xmlelements:
break
@ -527,7 +531,7 @@ class Group(Indicator):
result = []
sub_name = '_value_1' if self.child.accepts_multiple else None
for i, sub_kwargs in zip(max_occurs_iter(self.max_occurs), item_kwargs):
for sub_kwargs in max_occurs_iter(self.max_occurs, item_kwargs):
available_sub_kwargs = set(sub_kwargs.keys())
subresult = self.child.parse_kwargs(
sub_kwargs, sub_name, available_sub_kwargs)
@ -543,7 +547,7 @@ class Group(Indicator):
def parse_xmlelements(self, xmlelements, schema, name=None, context=None):
result = []
for i in max_occurs_iter(self.max_occurs):
for _unused in max_occurs_iter(self.max_occurs):
result.append(
self.child.parse_xmlelements(
xmlelements, schema, name, context=context)

View File

@ -0,0 +1,40 @@
__all__ = ['RefElement', 'RefAttribute', 'RefAttributeGroup', 'RefGroup']
class RefElement(object):
def __init__(self, tag, ref, schema, is_qualified=False,
min_occurs=1, max_occurs=1):
self._ref = ref
self._is_qualified = is_qualified
self._schema = schema
self.min_occurs = min_occurs
self.max_occurs = max_occurs
def resolve(self):
elm = self._schema.get_element(self._ref)
elm = elm.clone(
elm.qname, min_occurs=self.min_occurs, max_occurs=self.max_occurs)
return elm.resolve()
class RefAttribute(RefElement):
def __init__(self, *args, **kwargs):
self._array_type = kwargs.pop('array_type', None)
super(RefAttribute, self).__init__(*args, **kwargs)
def resolve(self):
attrib = self._schema.get_attribute(self._ref)
attrib = attrib.clone(attrib.qname, array_type=self._array_type)
return attrib.resolve()
class RefAttributeGroup(RefElement):
def resolve(self):
value = self._schema.get_attribute_group(self._ref)
return value.resolve()
class RefGroup(RefElement):
def resolve(self):
return self._schema.get_group(self._ref)

View File

@ -1,42 +0,0 @@
from defusedxml.lxml import fromstring
from lxml import etree
from six.moves.urllib.parse import urlparse
from zeep.exceptions import XMLSyntaxError
from zeep.parser import absolute_location
class ImportResolver(etree.Resolver):
def __init__(self, transport, parser_context):
self.parser_context = parser_context
self.transport = transport
def resolve(self, url, pubid, context):
if url.startswith('intschema'):
text = etree.tostring(self.parser_context.schema_nodes.get(url))
return self.resolve_string(text, context)
if urlparse(url).scheme in ('http', 'https'):
content = self.transport.load(url)
return self.resolve_string(content, context)
def parse_xml(content, transport, parser_context=None, base_url=None):
parser = etree.XMLParser(remove_comments=True)
parser.resolvers.add(ImportResolver(transport, parser_context))
try:
return fromstring(content, parser=parser, base_url=base_url)
except etree.XMLSyntaxError as exc:
raise XMLSyntaxError("Invalid XML content received (%s)" % exc.message)
def load_external(url, transport, parser_context=None, base_url=None):
if url.startswith('intschema'):
assert parser_context
return parser_context.schema_nodes.get(url)
if base_url:
url = absolute_location(url, base_url)
response = transport.load(url)
return parse_xml(response, transport, parser_context, base_url)

View File

@ -4,9 +4,9 @@ from collections import OrderedDict
from lxml import etree
from zeep import exceptions
from zeep.xsd import builtins as xsd_builtins
from zeep.xsd import const
from zeep.xsd.context import ParserContext
from zeep.xsd.elements import builtins as xsd_builtins_elements
from zeep.xsd.types import builtins as xsd_builtins_types
from zeep.xsd.visitor import SchemaVisitor
logger = logging.getLogger(__name__)
@ -15,12 +15,10 @@ logger = logging.getLogger(__name__)
class Schema(object):
"""A schema is a collection of schema documents."""
def __init__(self, node=None, transport=None, location=None,
parser_context=None):
self._parser_context = parser_context or ParserContext()
def __init__(self, node=None, transport=None, location=None):
self._transport = transport
self._schemas = OrderedDict()
self._documents = OrderedDict()
self._prefix_map_auto = {}
self._prefix_map_custom = {}
@ -30,26 +28,11 @@ class Schema(object):
nodes = node
self.add_documents(nodes, location)
def add_documents(self, schema_nodes, location):
documents = []
for node in schema_nodes:
document = SchemaDocument(
node, self._transport, self, location,
self._parser_context, location)
documents.append(document)
for document in documents:
document.resolve()
self._prefix_map_auto = self._create_prefix_map()
def __repr__(self):
if self._schemas:
main_doc = next(iter(self._schemas.values()))
location = main_doc._location
else:
location = '<none>'
return '<Schema(location=%r)>' % location
@property
def documents(self):
for documents in self._documents.values():
for document in documents:
yield document
@property
def prefix_map(self):
@ -64,47 +47,59 @@ class Schema(object):
@property
def is_empty(self):
"""Boolean to indicate if this schema contains any types or elements"""
return all(schema.is_empty for schema in self._schemas.values())
return all(document.is_empty for document in self.documents)
@property
def namespaces(self):
return set(self._schemas.keys())
return set(self._documents.keys())
@property
def elements(self):
"""Yield all globla xsd.Type objects"""
for schema in self._schemas.values():
for element in schema._elements.values():
for document in self.documents:
for element in document._elements.values():
yield element
@property
def types(self):
"""Yield all globla xsd.Type objects"""
for schema in self._schemas.values():
for type_ in schema._types.values():
for document in self.documents:
for type_ in document._types.values():
yield type_
def __repr__(self):
if self._documents:
main_doc = next(self.documents)
location = main_doc._location
else:
location = '<none>'
return '<Schema(location=%r)>' % location
def add_documents(self, schema_nodes, location):
documents = []
for node in schema_nodes:
document = self.create_new_document(node, location)
documents.append(document)
for document in documents:
document.resolve()
self._prefix_map_auto = self._create_prefix_map()
def get_element(self, qname):
"""Return a global xsd.Element object with the given qname"""
qname = self._create_qname(qname)
if qname.text in xsd_builtins.default_elements:
return xsd_builtins.default_elements[qname]
if qname.text in xsd_builtins_elements.default_elements:
return xsd_builtins_elements.default_elements[qname]
# Handle XSD namespace items
if qname.namespace == const.NS_XSD:
try:
return xsd_builtins.default_elements[qname]
return xsd_builtins_elements.default_elements[qname]
except KeyError:
raise exceptions.LookupError("No such type %r" % qname.text)
try:
schema = self._get_schema_document(qname.namespace)
return schema.get_element(qname)
except exceptions.NamespaceError:
raise exceptions.NamespaceError((
"Unable to resolve element %s. " +
"No schema available for the namespace %r."
) % (qname.text, qname.namespace))
return self._get_instance(qname, 'get_element', 'element')
def get_type(self, qname, fail_silently=False):
"""Return a global xsd.Type object with the given qname"""
@ -113,66 +108,78 @@ class Schema(object):
# Handle XSD namespace items
if qname.namespace == const.NS_XSD:
try:
return xsd_builtins.default_types[qname]
return xsd_builtins_types.default_types[qname]
except KeyError:
raise exceptions.LookupError("No such type %r" % qname.text)
try:
schema = self._get_schema_document(qname.namespace)
return schema.get_type(qname)
except exceptions.NamespaceError:
message = (
"Unable to resolve type %s. " +
"No schema available for the namespace %r."
) % (qname.text, qname.namespace)
return self._get_instance(qname, 'get_type', 'type')
except exceptions.NamespaceError as exc:
if fail_silently:
logger.info(message)
logger.info(str(exc))
else:
raise exceptions.NamespaceError(message)
raise
def get_group(self, qname):
"""Return a global xsd.Group object with the given qname"""
qname = self._create_qname(qname)
try:
schema = self._get_schema_document(qname.namespace)
return schema.get_group(qname)
except exceptions.NamespaceError:
raise exceptions.NamespaceError((
"Unable to resolve group %s. " +
"No schema available for the namespace %r."
) % (qname.text, qname.namespace))
return self._get_instance(qname, 'get_group', 'group')
def get_attribute(self, qname):
"""Return a global xsd.attributeGroup object with the given qname"""
qname = self._create_qname(qname)
try:
schema = self._get_schema_document(qname.namespace)
return schema.get_attribute(qname)
except exceptions.NamespaceError:
raise exceptions.NamespaceError((
"Unable to resolve attribute %s. " +
"No schema available for the namespace %r."
) % (qname.text, qname.namespace))
return self._get_instance(qname, 'get_attribute', 'attribute')
def get_attribute_group(self, qname):
"""Return a global xsd.attributeGroup object with the given qname"""
qname = self._create_qname(qname)
return self._get_instance(qname, 'get_attribute_group', 'attributeGroup')
def set_ns_prefix(self, prefix, namespace):
self._prefix_map_custom[prefix] = namespace
def get_ns_prefix(self, prefix):
try:
schema = self._get_schema_document(qname.namespace)
return schema.get_attribute_group(qname)
except exceptions.NamespaceError:
raise exceptions.NamespaceError((
"Unable to resolve attributeGroup %s. " +
"No schema available for the namespace %r."
) % (qname.text, qname.namespace))
try:
return self._prefix_map_custom[prefix]
except KeyError:
return self._prefix_map_auto[prefix]
except KeyError:
raise ValueError("No such prefix %r" % prefix)
def create_new_document(self, node, url, base_url=None):
namespace = node.get('targetNamespace') if node is not None else None
if base_url is None:
base_url = url
schema = SchemaDocument(namespace, url, base_url)
self._add_schema_document(schema)
schema.load(self, node)
return schema
def merge(self, schema):
"""Merge an other XSD schema in this one"""
for namespace, _schema in schema._schemas.items():
self._schemas[namespace] = _schema
for document in schema.documents:
self._add_schema_document(document)
self._prefix_map_auto = self._create_prefix_map()
def _get_instance(self, qname, method_name, name):
"""Return an object from one of the SchemaDocument's"""
qname = self._create_qname(qname)
try:
last_exception = None
for schema in self._get_schema_documents(qname.namespace):
method = getattr(schema, method_name)
try:
return method(qname)
except exceptions.LookupError as exc:
last_exception = exc
continue
raise last_exception
except exceptions.NamespaceError:
raise exceptions.NamespaceError((
"Unable to resolve %s %s. " +
"No schema available for the namespace %r."
) % (name, qname.text, qname.namespace))
def _create_qname(self, name):
"""Create an `lxml.etree.QName()` object for the given qname string.
@ -198,48 +205,42 @@ class Schema(object):
prefix_map = {
'xsd': 'http://www.w3.org/2001/XMLSchema',
}
for i, namespace in enumerate(self._schemas.keys()):
for i, namespace in enumerate(self._documents.keys()):
if namespace is None:
continue
prefix_map['ns%d' % i] = namespace
return prefix_map
def set_ns_prefix(self, prefix, namespace):
self._prefix_map_custom[prefix] = namespace
def get_ns_prefix(self, prefix):
try:
try:
return self._prefix_map_custom[prefix]
except KeyError:
return self._prefix_map_auto[prefix]
except KeyError:
raise ValueError("No such prefix %r" % prefix)
def _has_schema_document(self, namespace):
return namespace in self._documents
def _add_schema_document(self, document):
logger.info("Add document with tns %s to schema %s", document._target_namespace, id(self))
self._schemas[document._target_namespace] = document
logger.info("Add document with tns %s to schema %s", document.namespace, id(self))
documents = self._documents.setdefault(document.namespace, [])
documents.append(document)
def _get_schema_document(self, namespace):
if namespace not in self._schemas:
def _get_schema_document(self, namespace, location):
for document in self._documents.get(namespace, []):
if document._location == location:
return document
def _get_schema_documents(self, namespace, fail_silently=False):
if namespace not in self._documents:
if fail_silently:
return []
raise exceptions.NamespaceError(
"No schema available for the namespace %r" % namespace)
return self._schemas[namespace]
return self._documents[namespace]
class SchemaDocument(object):
def __init__(self, node, transport, schema, location, parser_context, base_url):
def __init__(self, namespace, location, base_url):
logger.debug("Init schema document for %r", location)
assert node is not None
assert parser_context
# Internal
self._schema = schema
self._base_url = base_url or location
self._location = location
self._transport = transport
self._target_namespace = (
node.get('targetNamespace') if node is not None else None)
self._target_namespace = namespace
self._elm_instances = []
self._attribute_groups = {}
@ -254,21 +255,33 @@ class SchemaDocument(object):
self._resolved = False
# self._xml_schema = None
self._schema._add_schema_document(self)
parser_context.schema_objects.add(self)
if node is not None:
# Disable XML schema validation for now
# if len(node) > 0:
# self.xml_schema = etree.XMLSchema(node)
visitor = SchemaVisitor(self, parser_context)
visitor.visit_schema(node)
def __repr__(self):
return '<SchemaDocument(location=%r, tns=%r, is_empty=%r)>' % (
self._location, self._target_namespace, self.is_empty)
@property
def namespace(self):
return self._target_namespace
@property
def is_empty(self):
return not bool(self._imports or self._types or self._elements)
def load(self, schema, node):
if node is None:
return
if not schema._has_schema_document(self._target_namespace):
raise RuntimeError(
"The document needs to be registered in the schema before " +
"it can be loaded")
# Disable XML schema validation for now
# if len(node) > 0:
# self.xml_schema = etree.XMLSchema(node)
visitor = SchemaVisitor(schema, self)
visitor.visit_schema(node)
def resolve(self):
logger.info("Resolving in schema %s", self)
@ -276,8 +289,9 @@ class SchemaDocument(object):
return
self._resolved = True
for schema in self._imports.values():
schema.resolve()
for schemas in self._imports.values():
for schema in schemas:
schema.resolve()
def _resolve_dict(val):
for key, obj in val.items():
@ -295,6 +309,13 @@ class SchemaDocument(object):
element.resolve()
self._elm_instances = []
def register_import(self, namespace, schema):
schemas = self._imports.setdefault(namespace, [])
schemas.append(schema)
def is_imported(self, namespace):
return namespace in self._imports
def register_type(self, name, value):
assert not isinstance(value, type)
assert value is not None
@ -330,59 +351,36 @@ class SchemaDocument(object):
def get_type(self, qname):
"""Return a xsd.Type object from this schema"""
try:
return self._types[qname]
except KeyError:
known_items = ', '.join(self._types.keys())
raise exceptions.LookupError((
"No type '%s' in namespace %s. " +
"Available types are: %s"
) % (qname.localname, qname.namespace, known_items or ' - '))
return self._get_instance(qname, self._types, 'type')
def get_element(self, qname):
"""Return a xsd.Element object from this schema"""
try:
return self._elements[qname]
except KeyError:
known_items = ', '.join(self._elements.keys())
raise exceptions.LookupError((
"No element '%s' in namespace %s. " +
"Available elements are: %s"
) % (qname.localname, qname.namespace, known_items or ' - '))
return self._get_instance(qname, self._elements, 'element')
def get_group(self, qname):
"""Return a xsd.Group object from this schema"""
try:
return self._groups[qname]
except KeyError:
known_items = ', '.join(self._groups.keys())
raise exceptions.LookupError((
"No group '%s' in namespace %s. " +
"Available attributes are: %s"
) % (qname.localname, qname.namespace, known_items or ' - '))
return self._get_instance(qname, self._groups, 'group')
def get_attribute(self, qname):
"""Return a xsd.Attribute object from this schema"""
try:
return self._attributes[qname]
except KeyError:
known_items = ', '.join(self._attributes.keys())
raise exceptions.LookupError((
"No attribute '%s' in namespace %s. " +
"Available attributes are: %s"
) % (qname.localname, qname.namespace, known_items or ' - '))
return self._get_instance(qname, self._attributes, 'attribute')
def get_attribute_group(self, qname):
"""Return a xsd.AttributeGroup object from this schema"""
try:
return self._attribute_groups[qname]
except KeyError:
known_items = ', '.join(self._attribute_groups.keys())
raise exceptions.LookupError((
"No attributeGroup '%s' in namespace %s. " +
"Available attributeGroups are: %s"
) % (qname.localname, qname.namespace, known_items or ' - '))
return self._get_instance(qname, self._attribute_groups, 'attributeGroup')
@property
def is_empty(self):
return not bool(self._imports or self._types or self._elements)
def _get_instance(self, qname, items, item_name):
try:
return items[qname]
except KeyError:
known_items = ', '.join(items.keys())
raise exceptions.LookupError((
"No %(item_name)s '%(localname)s' in namespace %(namespace)s. " +
"Available %(item_name_plural)s are: %(known_items)s"
) % {
'item_name': item_name,
'item_name_plural': item_name + 's',
'localname': qname.localname,
'namespace': qname.namespace,
'known_items': known_items or ' - '
})

View File

@ -0,0 +1,5 @@
from .any import * # noqa
from .base import * # noqa
from .collection import * # noqa
from .complex import * # noqa
from .simple import * # noqa

74
src/zeep/xsd/types/any.py Normal file
View File

@ -0,0 +1,74 @@
import logging
from zeep.utils import qname_attr
from zeep.xsd.const import xsd_ns, xsi_ns
from zeep.xsd.types.base import Type
from zeep.xsd.valueobjects import AnyObject
logger = logging.getLogger(__name__)
__all__ = ['AnyType']
class AnyType(Type):
_default_qname = xsd_ns('anyType')
def render(self, parent, value, xsd_type=None, render_path=None):
if isinstance(value, AnyObject):
if value.xsd_type is None:
parent.set(xsi_ns('nil'), 'true')
else:
value.xsd_type.render(parent, value.value, None, render_path)
parent.set(xsi_ns('type'), value.xsd_type.qname)
elif hasattr(value, '_xsd_elm'):
value._xsd_elm.render(parent, value, render_path)
parent.set(xsi_ns('type'), value._xsd_elm.qname)
else:
parent.text = self.xmlvalue(value)
def parse_xmlelement(self, xmlelement, schema=None, allow_none=True,
context=None):
xsi_type = qname_attr(xmlelement, xsi_ns('type'))
xsi_nil = xmlelement.get(xsi_ns('nil'))
children = list(xmlelement.getchildren())
# Handle xsi:nil attribute
if xsi_nil == 'true':
return None
# Check if a xsi:type is defined and try to parse the xml according
# to that type.
if xsi_type and schema:
xsd_type = schema.get_type(xsi_type, fail_silently=True)
# If we were unable to resolve a type for the xsi:type (due to
# buggy soap servers) then we just return the lxml element.
if not xsd_type:
return children
# If the xsd_type is xsd:anyType then we will recurs so ignore
# that.
if isinstance(xsd_type, self.__class__):
return xmlelement.text or None
return xsd_type.parse_xmlelement(
xmlelement, schema, context=context)
# If no xsi:type is set and the element has children then there is
# not much we can do. Just return the children
elif children:
return children
elif xmlelement.text is not None:
return self.pythonvalue(xmlelement.text)
return None
def resolve(self):
return self
def xmlvalue(self, value):
return value
def pythonvalue(self, value, schema=None):
return value

111
src/zeep/xsd/types/base.py Normal file
View File

@ -0,0 +1,111 @@
class Type(object):
def __init__(self, qname=None, is_global=False):
self.qname = qname
self.name = qname.localname if qname else None
self._resolved = False
self.is_global = is_global
def accept(self, value):
raise NotImplementedError
def validate(self, value, required=False):
return
def parse_kwargs(self, kwargs, name, available_kwargs):
value = None
name = name or self.name
if name in available_kwargs:
value = kwargs[name]
available_kwargs.remove(name)
return {name: value}
return {}
def parse_xmlelement(self, xmlelement, schema=None, allow_none=True,
context=None):
raise NotImplementedError(
'%s.parse_xmlelement() is not implemented' % self.__class__.__name__)
def parsexml(self, xml, schema=None):
raise NotImplementedError
def render(self, parent, value, xsd_type=None, render_path=None):
raise NotImplementedError(
'%s.render() is not implemented' % self.__class__.__name__)
def resolve(self):
raise NotImplementedError(
'%s.resolve() is not implemented' % self.__class__.__name__)
def extend(self, child):
raise NotImplementedError(
'%s.extend() is not implemented' % self.__class__.__name__)
def restrict(self, child):
raise NotImplementedError(
'%s.restrict() is not implemented' % self.__class__.__name__)
@property
def attributes(self):
return []
@classmethod
def signature(cls, depth=()):
return ''
class UnresolvedType(Type):
def __init__(self, qname, schema):
self.qname = qname
assert self.qname.text != 'None'
self.schema = schema
def __repr__(self):
return '<%s(qname=%r)>' % (self.__class__.__name__, self.qname)
def render(self, parent, value, xsd_type=None, render_path=None):
raise RuntimeError(
"Unable to render unresolved type %s. This is probably a bug." % (
self.qname))
def resolve(self):
retval = self.schema.get_type(self.qname)
return retval.resolve()
class UnresolvedCustomType(Type):
def __init__(self, qname, base_type, schema):
assert qname is not None
self.qname = qname
self.name = str(qname.localname)
self.schema = schema
self.base_type = base_type
def __repr__(self):
return '<%s(qname=%r, base_type=%r)>' % (
self.__class__.__name__, self.qname.text, self.base_type)
def resolve(self):
base = self.base_type
base = base.resolve()
cls_attributes = {
'__module__': 'zeep.xsd.dynamic_types',
}
from zeep.xsd.types.collection import UnionType # FIXME
from zeep.xsd.types.simple import AnySimpleType # FIXME
if issubclass(base.__class__, UnionType):
xsd_type = type(self.name, (base.__class__,), cls_attributes)
return xsd_type(base.item_types)
elif issubclass(base.__class__, AnySimpleType):
xsd_type = type(self.name, (base.__class__,), cls_attributes)
return xsd_type(self.qname)
else:
xsd_type = type(self.name, (base.base_class,), cls_attributes)
return xsd_type(self.qname)

View File

@ -1,56 +1,3 @@
"""
Primitive datatypes
- string
- boolean
- decimal
- float
- double
- duration
- dateTime
- time
- date
- gYearMonth
- gYear
- gMonthDay
- gDay
- gMonth
- hexBinary
- base64Binary
- anyURI
- QName
- NOTATION
Derived datatypes
- normalizedString
- token
- language
- NMTOKEN
- NMTOKENS
- Name
- NCName
- ID
- IDREF
- IDREFS
- ENTITY
- ENTITIES
- integer
- nonPositiveInteger
- negativeInteger
- long
- int
- short
- byte
- nonNegativeInteger
- unsignedLong
- unsignedInt
- unsignedShort
- unsignedByte
- positiveInteger
"""
from __future__ import division
import base64
import datetime
import math
@ -60,13 +7,10 @@ from decimal import Decimal as _Decimal
import isodate
import pytz
import six
from lxml import etree
from zeep.utils import qname_attr
from zeep.xsd.const import xsd_ns, xsi_ns, NS_XSD
from zeep.xsd.elements import Base
from zeep.xsd.types import SimpleType
from zeep.xsd.valueobjects import AnyObject
from zeep.xsd.const import xsd_ns
from zeep.xsd.types.any import AnyType
from zeep.xsd.types.simple import AnySimpleType
class ParseError(ValueError):
@ -84,33 +28,23 @@ def check_no_collection(func):
return _wrapper
class _BuiltinType(SimpleType):
def __init__(self, qname=None, is_global=False):
super(_BuiltinType, self).__init__(
qname or etree.QName(self._default_qname), is_global)
def signature(self, depth=()):
if self.qname.namespace == NS_XSD:
return 'xsd:%s' % self.name
return self.name
##
# Primitive types
class String(_BuiltinType):
class String(AnySimpleType):
_default_qname = xsd_ns('string')
accepted_types = six.string_types
@check_no_collection
def xmlvalue(self, value):
if isinstance(value, bytes):
return value.decode('utf-8')
return six.text_type(value if value is not None else '')
def pythonvalue(self, value):
return value
class Boolean(_BuiltinType):
class Boolean(AnySimpleType):
_default_qname = xsd_ns('boolean')
accepted_types = (bool,)
@ -126,7 +60,7 @@ class Boolean(_BuiltinType):
return value in ('true', '1')
class Decimal(_BuiltinType):
class Decimal(AnySimpleType):
_default_qname = xsd_ns('decimal')
accepted_types = (_Decimal, float) + six.string_types
@ -138,7 +72,7 @@ class Decimal(_BuiltinType):
return _Decimal(value)
class Float(_BuiltinType):
class Float(AnySimpleType):
_default_qname = xsd_ns('float')
accepted_types = (float, _Decimal) + six.string_types
@ -149,7 +83,7 @@ class Float(_BuiltinType):
return float(value)
class Double(_BuiltinType):
class Double(AnySimpleType):
_default_qname = xsd_ns('double')
accepted_types = (_Decimal, float) + six.string_types
@ -161,7 +95,7 @@ class Double(_BuiltinType):
return float(value)
class Duration(_BuiltinType):
class Duration(AnySimpleType):
_default_qname = xsd_ns('duration')
accepted_types = (isodate.duration.Duration,) + six.string_types
@ -173,13 +107,23 @@ class Duration(_BuiltinType):
return isodate.parse_duration(value)
class DateTime(_BuiltinType):
class DateTime(AnySimpleType):
_default_qname = xsd_ns('dateTime')
accepted_types = (datetime.datetime,) + six.string_types
@check_no_collection
def xmlvalue(self, value):
if value.microsecond:
# Bit of a hack, since datetime is a subclass of date we can't just
# test it with an isinstance(). And actually, we should not really
# care about the type, as long as it has the required attributes
if not all(hasattr(value, attr) for attr in ('hour', 'minute', 'second')):
value = datetime.datetime.combine(value, datetime.time(
getattr(value, 'hour', 0),
getattr(value, 'minute', 0),
getattr(value, 'second', 0)))
if getattr(value, 'microsecond', 0):
return isodate.isostrf.strftime(value, '%Y-%m-%dT%H:%M:%S.%f%Z')
return isodate.isostrf.strftime(value, '%Y-%m-%dT%H:%M:%S%Z')
@ -187,7 +131,7 @@ class DateTime(_BuiltinType):
return isodate.parse_datetime(value)
class Time(_BuiltinType):
class Time(AnySimpleType):
_default_qname = xsd_ns('time')
accepted_types = (datetime.time,) + six.string_types
@ -201,7 +145,7 @@ class Time(_BuiltinType):
return isodate.parse_time(value)
class Date(_BuiltinType):
class Date(AnySimpleType):
_default_qname = xsd_ns('date')
accepted_types = (datetime.date,) + six.string_types
@ -215,7 +159,7 @@ class Date(_BuiltinType):
return isodate.parse_date(value)
class gYearMonth(_BuiltinType):
class gYearMonth(AnySimpleType):
"""gYearMonth represents a specific gregorian month in a specific gregorian
year.
@ -242,7 +186,7 @@ class gYearMonth(_BuiltinType):
_parse_timezone(group['timezone']))
class gYear(_BuiltinType):
class gYear(AnySimpleType):
"""gYear represents a gregorian calendar year.
Lexical representation: CCYY
@ -265,7 +209,7 @@ class gYear(_BuiltinType):
return (int(group['year']), _parse_timezone(group['timezone']))
class gMonthDay(_BuiltinType):
class gMonthDay(AnySimpleType):
"""gMonthDay is a gregorian date that recurs, specifically a day of the
year such as the third of May.
@ -293,7 +237,7 @@ class gMonthDay(_BuiltinType):
_parse_timezone(group['timezone']))
class gDay(_BuiltinType):
class gDay(AnySimpleType):
"""gDay is a gregorian day that recurs, specifically a day of the month
such as the 5th of the month
@ -317,7 +261,7 @@ class gDay(_BuiltinType):
return (int(group['day']), _parse_timezone(group['timezone']))
class gMonth(_BuiltinType):
class gMonth(AnySimpleType):
"""gMonth is a gregorian month that recurs every year.
Lexical representation: --MM
@ -340,7 +284,7 @@ class gMonth(_BuiltinType):
return (int(group['month']), _parse_timezone(group['timezone']))
class HexBinary(_BuiltinType):
class HexBinary(AnySimpleType):
accepted_types = six.string_types
_default_qname = xsd_ns('hexBinary')
@ -352,7 +296,7 @@ class HexBinary(_BuiltinType):
return value
class Base64Binary(_BuiltinType):
class Base64Binary(AnySimpleType):
accepted_types = six.string_types
_default_qname = xsd_ns('base64Binary')
@ -364,7 +308,7 @@ class Base64Binary(_BuiltinType):
return base64.b64decode(value)
class AnyURI(_BuiltinType):
class AnyURI(AnySimpleType):
accepted_types = six.string_types
_default_qname = xsd_ns('anyURI')
@ -376,7 +320,7 @@ class AnyURI(_BuiltinType):
return value
class QName(_BuiltinType):
class QName(AnySimpleType):
accepted_types = six.string_types
_default_qname = xsd_ns('QName')
@ -388,7 +332,7 @@ class QName(_BuiltinType):
return value
class Notation(_BuiltinType):
class Notation(AnySimpleType):
accepted_types = six.string_types
_default_qname = xsd_ns('NOTATION')
@ -508,61 +452,6 @@ class PositiveInteger(NonNegativeInteger):
##
# Other
class AnyType(_BuiltinType):
_default_qname = xsd_ns('anyType')
def render(self, parent, value):
if isinstance(value, AnyObject):
value.xsd_type.render(parent, value.value)
parent.set(xsi_ns('type'), value.xsd_type.qname)
elif hasattr(value, '_xsd_elm'):
value._xsd_elm.render(parent, value)
parent.set(xsi_ns('type'), value._xsd_elm.qname)
else:
parent.text = self.xmlvalue(value)
def parse_xmlelement(self, xmlelement, schema=None, allow_none=True,
context=None):
xsi_type = qname_attr(xmlelement, xsi_ns('type'))
xsi_nil = xmlelement.get(xsi_ns('nil'))
# Handle xsi:nil attribute
if xsi_nil == "true":
return None
if xsi_type and schema:
xsd_type = schema.get_type(xsi_type, fail_silently=True)
# If we were unable to resolve a type for the xsi:type (due to
# buggy soap servers) then we just return the lxml element.
if not xsd_type:
return xmlelement.getchildren()
# If the xsd_type is xsd:anyType then we will recurs so ignore
# that.
if isinstance(xsd_type, self.__class__):
return xmlelement.text or None
return xsd_type.parse_xmlelement(
xmlelement, schema, context=context)
if xmlelement.text is None:
return
return self.pythonvalue(xmlelement.text)
def xmlvalue(self, value):
return value
def pythonvalue(self, value, schema=None):
return value
class AnySimpleType(AnyType):
_default_qname = xsd_ns('anySimpleType')
def _parse_timezone(val):
"""Return a pytz.tzinfo object"""
if not val:
@ -650,39 +539,3 @@ default_types = {
AnySimpleType,
]
}
class Schema(Base):
name = 'schema'
attr_name = 'schema'
qname = xsd_ns('schema')
def clone(self, qname, min_occurs=1, max_occurs=1):
return self.__class__()
def parse_kwargs(self, kwargs, name, available_kwargs):
if name in available_kwargs:
value = kwargs[name]
available_kwargs.remove(name)
return {name: value}
return {}
def parse(self, xmlelement, schema, context=None):
from zeep.xsd.schema import Schema
schema = Schema(xmlelement, schema._transport)
context.schemas.append(schema)
return schema
def parse_xmlelements(self, xmlelements, schema, name=None, context=None):
if xmlelements[0].tag == self.qname:
xmlelement = xmlelements.popleft()
result = self.parse(xmlelement, schema, context=context)
return result
def resolve(self):
return self
default_elements = {
xsd_ns('schema'): Schema(),
}

View File

@ -0,0 +1,73 @@
from zeep.utils import get_base_class
from zeep.xsd.types.simple import AnySimpleType
__all__ = ['ListType', 'UnionType']
class ListType(AnySimpleType):
"""Space separated list of simpleType values"""
def __init__(self, item_type):
self.item_type = item_type
super(ListType, self).__init__()
def __call__(self, value):
return value
def render(self, parent, value, xsd_type=None, render_path=None):
parent.text = self.xmlvalue(value)
def resolve(self):
self.item_type = self.item_type.resolve()
self.base_class = self.item_type.__class__
return self
def xmlvalue(self, value):
item_type = self.item_type
return ' '.join(item_type.xmlvalue(v) for v in value)
def pythonvalue(self, value):
if not value:
return []
item_type = self.item_type
return [item_type.pythonvalue(v) for v in value.split()]
def signature(self, depth=()):
return self.item_type.signature(depth) + '[]'
class UnionType(AnySimpleType):
"""Simple type existing out of multiple other types"""
def __init__(self, item_types):
self.item_types = item_types
self.item_class = None
assert item_types
super(UnionType, self).__init__(None)
def resolve(self):
self.item_types = [item.resolve() for item in self.item_types]
base_class = get_base_class(self.item_types)
if issubclass(base_class, AnySimpleType) and base_class != AnySimpleType:
self.item_class = base_class
return self
def signature(self, depth=()):
return ''
def parse_xmlelement(self, xmlelement, schema=None, allow_none=True,
context=None):
if self.item_class:
return self.item_class().parse_xmlelement(
xmlelement, schema, allow_none, context)
return xmlelement.text
def pythonvalue(self, value):
if self.item_class:
return self.item_class().pythonvalue(value)
return value
def xmlvalue(self, value):
if self.item_class:
return self.item_class().xmlvalue(value)
return value

View File

@ -3,194 +3,24 @@ import logging
from collections import OrderedDict, deque
from itertools import chain
import six
from cached_property import threaded_cached_property
from zeep.exceptions import XMLParseError, UnexpectedElementError
from zeep.xsd.const import xsi_ns
from zeep.xsd.elements import Any, AnyAttribute, AttributeGroup, Element
from zeep.xsd.indicators import Group, OrderIndicator, Sequence
from zeep.exceptions import UnexpectedElementError, XMLParseError
from zeep.xsd.const import xsi_ns, SkipValue, NotSet
from zeep.xsd.elements import (
Any, AnyAttribute, AttributeGroup, Choice, Element, Group, Sequence)
from zeep.xsd.elements.indicators import OrderIndicator
from zeep.xsd.types.any import AnyType
from zeep.xsd.types.simple import AnySimpleType
from zeep.xsd.utils import NamePrefixGenerator
from zeep.utils import get_base_class
from zeep.xsd.valueobjects import CompoundValue
logger = logging.getLogger(__name__)
class Type(object):
def __init__(self, qname=None, is_global=False):
self.qname = qname
self.name = qname.localname if qname else None
self._resolved = False
self.is_global = is_global
def accept(self, value):
raise NotImplementedError
def parse_kwargs(self, kwargs, name, available_kwargs):
value = None
name = name or self.name
if name in available_kwargs:
value = kwargs[name]
available_kwargs.remove(name)
return {name: value}
return {}
def parse_xmlelement(self, xmlelement, schema=None, allow_none=True,
context=None):
raise NotImplementedError(
'%s.parse_xmlelement() is not implemented' % self.__class__.__name__)
def parsexml(self, xml, schema=None):
raise NotImplementedError
def render(self, parent, value):
raise NotImplementedError(
'%s.render() is not implemented' % self.__class__.__name__)
def resolve(self):
raise NotImplementedError(
'%s.resolve() is not implemented' % self.__class__.__name__)
def extend(self, child):
raise NotImplementedError(
'%s.extend() is not implemented' % self.__class__.__name__)
def restrict(self, child):
raise NotImplementedError(
'%s.restrict() is not implemented' % self.__class__.__name__)
@property
def attributes(self):
return []
@classmethod
def signature(cls, depth=()):
return ''
__all__ = ['ComplexType']
class UnresolvedType(Type):
def __init__(self, qname, schema):
self.qname = qname
assert self.qname.text != 'None'
self.schema = schema
def __repr__(self):
return '<%s(qname=%r)>' % (self.__class__.__name__, self.qname)
def render(self, parent, value):
raise RuntimeError(
"Unable to render unresolved type %s. This is probably a bug." % (
self.qname))
def resolve(self):
retval = self.schema.get_type(self.qname)
return retval.resolve()
class UnresolvedCustomType(Type):
def __init__(self, qname, base_type, schema):
assert qname is not None
self.qname = qname
self.name = str(qname.localname)
self.schema = schema
self.base_type = base_type
def __repr__(self):
return '<%s(qname=%r, base_type=%r)>' % (
self.__class__.__name__, self.qname.text, self.base_type)
def resolve(self):
base = self.base_type
base = base.resolve()
cls_attributes = {
'__module__': 'zeep.xsd.dynamic_types',
}
if issubclass(base.__class__, UnionType):
xsd_type = type(self.name, (base.__class__,), cls_attributes)
return xsd_type(base.item_types)
elif issubclass(base.__class__, SimpleType):
xsd_type = type(self.name, (base.__class__,), cls_attributes)
return xsd_type(self.qname)
else:
xsd_type = type(self.name, (base.base_class,), cls_attributes)
return xsd_type(self.qname)
@six.python_2_unicode_compatible
class SimpleType(Type):
accepted_types = six.string_types
def __call__(self, *args, **kwargs):
"""Return the xmlvalue for the given value.
Expects only one argument 'value'. The args, kwargs handling is done
here manually so that we can return readable error messages instead of
only '__call__ takes x arguments'
"""
num_args = len(args) + len(kwargs)
if num_args != 1:
raise TypeError((
'%s() takes exactly 1 argument (%d given). ' +
'Simple types expect only a single value argument'
) % (self.__class__.__name__, num_args))
if kwargs and 'value' not in kwargs:
raise TypeError((
'%s() got an unexpected keyword argument %r. ' +
'Simple types expect only a single value argument'
) % (self.__class__.__name__, next(six.iterkeys(kwargs))))
value = args[0] if args else kwargs['value']
return self.xmlvalue(value)
def __eq__(self, other):
return (
other is not None and
self.__class__ == other.__class__ and
self.__dict__ == other.__dict__)
def __str__(self):
return '%s(value)' % (self.__class__.__name__)
def parse_xmlelement(self, xmlelement, schema=None, allow_none=True,
context=None):
if xmlelement.text is None:
return
try:
return self.pythonvalue(xmlelement.text)
except (TypeError, ValueError):
logger.exception("Error during xml -> python translation")
return None
def pythonvalue(self, xmlvalue):
raise NotImplementedError(
'%s.pytonvalue() not implemented' % self.__class__.__name__)
def render(self, parent, value):
parent.text = self.xmlvalue(value)
def resolve(self):
return self
def signature(self, depth=()):
return self.name
def xmlvalue(self, value):
raise NotImplementedError(
'%s.xmlvalue() not implemented' % self.__class__.__name__)
class ComplexType(Type):
class ComplexType(AnyType):
_xsd_name = None
def __init__(self, element=None, attributes=None,
@ -293,7 +123,7 @@ class ComplexType(Type):
# If this complexType extends a simpleType then we have no nested
# elements. Parse it directly via the type object. This is the case
# for xsd:simpleContent
if isinstance(self._element, Element) and isinstance(self._element.type, SimpleType):
if isinstance(self._element, Element) and isinstance(self._element.type, AnySimpleType):
name, element = self.elements_nested[0]
init_kwargs[name] = element.type.parse_xmlelement(
xmlelement, schema, name, context=context)
@ -330,30 +160,39 @@ class ComplexType(Type):
return self(**init_kwargs)
def render(self, parent, value, xsd_type=None):
def render(self, parent, value, xsd_type=None, render_path=None):
"""Serialize the given value lxml.Element subelements on the parent
element.
"""
if not render_path:
render_path = [self.name]
if not self.elements_nested and not self.attributes:
return
# Render attributes
for name, attribute in self.attributes:
attr_value = getattr(value, name, None)
attribute.render(parent, attr_value)
attr_value = value[name] if name in value else NotSet
child_path = render_path + [name]
attribute.render(parent, attr_value, child_path)
# Render sub elements
for name, element in self.elements_nested:
if isinstance(element, Element) or element.accepts_multiple:
element_value = getattr(value, name, None)
element_value = value[name] if name in value else NotSet
child_path = render_path + [name]
else:
element_value = value
child_path = list(render_path)
if element_value is SkipValue:
continue
if isinstance(element, Element):
element.type.render(parent, element_value)
element.type.render(parent, element_value, None, child_path)
else:
element.render(parent, element_value)
element.render(parent, element_value, child_path)
if xsd_type:
if xsd_type._xsd_name:
@ -435,6 +274,9 @@ class ComplexType(Type):
Used for handling xsd:extension tags
TODO: Needs a rewrite where the child containers are responsible for
the extend functionality.
"""
if isinstance(base, ComplexType):
base_attributes = base._attributes_unwrapped
@ -460,9 +302,13 @@ class ComplexType(Type):
element = []
if self._element and base_element:
element = self._element.clone(self._element.name)
if isinstance(element, OrderIndicator) and isinstance(base_element, OrderIndicator):
for item in reversed(base_element):
element.insert(0, item)
if isinstance(base_element, OrderIndicator):
if isinstance(self._element, Choice):
element = base_element.clone(self._element.name)
element.append(self._element)
elif isinstance(element, OrderIndicator):
for item in reversed(base_element):
element.insert(0, item)
elif isinstance(self._element, Group):
raise NotImplementedError('TODO')
@ -528,73 +374,3 @@ class ComplexType(Type):
if len(depth) > 1:
value = '{%s}' % value
return value
class ListType(SimpleType):
"""Space separated list of simpleType values"""
def __init__(self, item_type):
self.item_type = item_type
super(ListType, self).__init__()
def __call__(self, value):
return value
def render(self, parent, value):
parent.text = self.xmlvalue(value)
def resolve(self):
self.item_type = self.item_type.resolve()
self.base_class = self.item_type.__class__
return self
def xmlvalue(self, value):
item_type = self.item_type
return ' '.join(item_type.xmlvalue(v) for v in value)
def pythonvalue(self, value):
if not value:
return []
item_type = self.item_type
return [item_type.pythonvalue(v) for v in value.split()]
def signature(self, depth=()):
return self.item_type.signature(depth) + '[]'
class UnionType(SimpleType):
def __init__(self, item_types):
self.item_types = item_types
self.item_class = None
assert item_types
super(UnionType, self).__init__(None)
def resolve(self):
from zeep.xsd.builtins import _BuiltinType
self.item_types = [item.resolve() for item in self.item_types]
base_class = get_base_class(self.item_types)
if issubclass(base_class, _BuiltinType) and base_class != _BuiltinType:
self.item_class = base_class
return self
def signature(self, depth=()):
return ''
def parse_xmlelement(self, xmlelement, schema=None, allow_none=True,
context=None):
if self.item_class:
return self.item_class().parse_xmlelement(
xmlelement, schema, allow_none, context)
return xmlelement.text
def pythonvalue(self, value):
if self.item_class:
return self.item_class().pythonvalue(value)
return value
def xmlvalue(self, value):
if self.item_class:
return self.item_class().xmlvalue(value)
return value

View File

@ -0,0 +1,84 @@
import logging
import six
from lxml import etree
from zeep.exceptions import ValidationError
from zeep.xsd.const import NS_XSD, xsd_ns
from zeep.xsd.types.any import AnyType
logger = logging.getLogger(__name__)
__all__ = ['AnySimpleType']
@six.python_2_unicode_compatible
class AnySimpleType(AnyType):
_default_qname = xsd_ns('anySimpleType')
def __init__(self, qname=None, is_global=False):
super(AnySimpleType, self).__init__(
qname or etree.QName(self._default_qname), is_global)
def __call__(self, *args, **kwargs):
"""Return the xmlvalue for the given value.
Expects only one argument 'value'. The args, kwargs handling is done
here manually so that we can return readable error messages instead of
only '__call__ takes x arguments'
"""
num_args = len(args) + len(kwargs)
if num_args != 1:
raise TypeError((
'%s() takes exactly 1 argument (%d given). ' +
'Simple types expect only a single value argument'
) % (self.__class__.__name__, num_args))
if kwargs and 'value' not in kwargs:
raise TypeError((
'%s() got an unexpected keyword argument %r. ' +
'Simple types expect only a single value argument'
) % (self.__class__.__name__, next(six.iterkeys(kwargs))))
value = args[0] if args else kwargs['value']
return self.xmlvalue(value)
def __eq__(self, other):
return (
other is not None and
self.__class__ == other.__class__ and
self.__dict__ == other.__dict__)
def __str__(self):
return '%s(value)' % (self.__class__.__name__)
def parse_xmlelement(self, xmlelement, schema=None, allow_none=True,
context=None):
if xmlelement.text is None:
return
try:
return self.pythonvalue(xmlelement.text)
except (TypeError, ValueError):
logger.exception("Error during xml -> python translation")
return None
def pythonvalue(self, xmlvalue):
raise NotImplementedError(
'%s.pytonvalue() not implemented' % self.__class__.__name__)
def render(self, parent, value, xsd_type=None, render_path=None):
parent.text = self.xmlvalue(value)
def signature(self, depth=()):
if self.qname.namespace == NS_XSD:
return 'xsd:%s' % self.name
return self.name
def validate(self, value, required=False):
if required and value is None:
raise ValidationError("Value is required")
def xmlvalue(self, value):
raise NotImplementedError(
'%s.xmlvalue() not implemented' % self.__class__.__name__)

View File

@ -1,4 +1,10 @@
from defusedxml.lxml import fromstring
from lxml import etree
from six.moves import range
from six.moves.urllib.parse import urlparse
from zeep.exceptions import XMLSyntaxError
from zeep.parser import absolute_location
class NamePrefixGenerator(object):
@ -25,9 +31,41 @@ class UniqueNameGenerator(object):
return name
def max_occurs_iter(max_occurs):
class ImportResolver(etree.Resolver):
"""Custom lxml resolve to use the transport object"""
def __init__(self, transport):
self.transport = transport
def resolve(self, url, pubid, context):
if urlparse(url).scheme in ('http', 'https'):
content = self.transport.load(url)
return self.resolve_string(content, context)
def parse_xml(content, transport, base_url=None):
parser = etree.XMLParser(remove_comments=True, resolve_entities=False)
parser.resolvers.add(ImportResolver(transport))
try:
return fromstring(content, parser=parser, base_url=base_url)
except etree.XMLSyntaxError as exc:
raise XMLSyntaxError("Invalid XML content received (%s)" % exc.message)
def load_external(url, transport, base_url=None):
if base_url:
url = absolute_location(url, base_url)
response = transport.load(url)
return parse_xml(response, transport, base_url)
def max_occurs_iter(max_occurs, items=None):
assert max_occurs is not None
if max_occurs == 'unbounded':
return range(0, 2**31-1)
generator = range(0, max_occurs if max_occurs != 'unbounded' else 2**31-1)
if items is not None:
for i, sub_kwargs in zip(generator, items):
yield sub_kwargs
else:
return range(max_occurs)
for i in generator:
yield i

View File

@ -1,14 +1,18 @@
import copy
from collections import OrderedDict
import six
from zeep.xsd.printer import PrettyPrinter
__all__ = ['AnyObject', 'CompoundValue']
class AnyObject(object):
"""Create an any object
:param xsd_object: the xsd type
:param value: The value
"""
def __init__(self, xsd_object, value):
self.xsd_obj = xsd_object
self.value = value

View File

@ -1,20 +1,17 @@
import keyword
import logging
import re
import warnings
from lxml import etree
from zeep import exceptions
from zeep.exceptions import XMLParseError, ZeepWarning
from zeep.exceptions import XMLParseError
from zeep.parser import absolute_location
from zeep.utils import as_qname, qname_attr
from zeep.xsd import builtins as xsd_builtins
from zeep.xsd import elements as xsd_elements
from zeep.xsd import indicators as xsd_indicators
from zeep.xsd import types as xsd_types
from zeep.xsd.const import xsd_ns
from zeep.xsd.parser import load_external
from zeep.xsd.utils import load_external
logger = logging.getLogger(__name__)
@ -41,10 +38,9 @@ class SchemaVisitor(object):
types in the given schema.
"""
def __init__(self, document, parser_context=None):
def __init__(self, schema, document):
self.document = document
self.schema = document._schema
self.parser_context = parser_context
self.schema = schema
self._includes = set()
def process(self, node, parent):
@ -131,25 +127,17 @@ class SchemaVisitor(object):
if not namespace and not self.document._target_namespace:
raise XMLParseError(
"The attribute 'namespace' must be existent if the "
"importing schema has no target namespace.")
"importing schema has no target namespace.",
filename=self._document.location,
sourceline=node.sourceline)
# Check if the schema is already imported before based on the
# namespace. Schema's without namespace are registered as 'None'
schema = self.parser_context.schema_objects.get(namespace)
if schema:
if location and schema._location != location:
# Use same warning message as libxml2
message = (
"Skipping import of schema located at %r " +
"for the namespace %r, since the namespace was " +
"already imported with the schema located at %r"
) % (location, namespace or '(null)', schema._location)
warnings.warn(message, ZeepWarning, stacklevel=6)
return
document = self.schema._get_schema_document(namespace, location)
if document:
logger.debug("Returning existing schema: %r", location)
self.document._imports[namespace] = schema
return schema
self.document.register_import(namespace, document)
return document
# Hardcode the mapping between the xml namespace and the xsd for now.
# This seems to fix issues with exchange wsdl's, see #220
@ -165,8 +153,7 @@ class SchemaVisitor(object):
return
# Load the XML
schema_node = load_external(
location, self.document._transport, self.parser_context)
schema_node = load_external(location, self.schema._transport)
# Check if the xsd:import namespace matches the targetNamespace. If
# the xsd:import statement didn't specify a namespace then make sure
@ -176,28 +163,12 @@ class SchemaVisitor(object):
raise XMLParseError((
"The namespace defined on the xsd:import doesn't match the "
"imported targetNamespace located at %r "
) % (location))
elif schema_tns in self.parser_context.schema_objects:
schema = self.parser_context.schema_objects.get(schema_tns)
message = (
"Skipping import of schema located at %r " +
"for the namespace %r, since the namespace was " +
"already imported with the schema located at %r"
) % (location, namespace or '(null)', schema._location)
warnings.warn(message, ZeepWarning, stacklevel=6)
) % (location),
filename=self.document._location,
sourceline=node.sourceline)
# If this schema location is 'internal' then retrieve the original
# location since that is used as base url for sub include/imports
if location in self.parser_context.schema_locations:
base_url = self.parser_context.schema_locations[location]
else:
base_url = location
schema = self.document.__class__(
schema_node, self.document._transport, self.schema, location,
self.parser_context, base_url)
self.document._imports[namespace] = schema
schema = self.schema.create_new_document(schema_node, location)
self.document.register_import(namespace, schema)
return schema
def visit_include(self, node, parent):
@ -217,8 +188,7 @@ class SchemaVisitor(object):
return
schema_node = load_external(
location, self.document._transport, self.parser_context,
base_url=self.document._base_url)
location, self.schema._transport, base_url=self.document._base_url)
self._includes.add(location)
return self.visit_schema(schema_node)
@ -288,7 +258,7 @@ class SchemaVisitor(object):
if node_type:
xsd_type = self._get_type(node_type.text)
else:
xsd_type = xsd_builtins.AnyType()
xsd_type = xsd_types.AnyType()
# Naive workaround to mark fields which are part of a choice element
# as optional
@ -360,7 +330,7 @@ class SchemaVisitor(object):
if node_type:
xsd_type = self._get_type(node_type)
else:
xsd_type = xsd_builtins.AnyType()
xsd_type = xsd_types.AnyType()
# TODO: We ignore 'prohobited' for now
required = node.get('use') == 'required'
@ -690,7 +660,7 @@ class SchemaVisitor(object):
tags.group, tags.sequence
]
min_occurs, max_occurs = _process_occurs_attrs(node)
result = xsd_indicators.Sequence(
result = xsd_elements.Sequence(
min_occurs=min_occurs, max_occurs=max_occurs)
annotation, items = self._pop_annotation(node.getchildren())
@ -719,7 +689,7 @@ class SchemaVisitor(object):
sub_types = [
tags.annotation, tags.element
]
result = xsd_indicators.All()
result = xsd_elements.All()
for child in node.iterchildren():
assert child.tag in sub_types, child
@ -757,7 +727,7 @@ class SchemaVisitor(object):
child = children[0]
item = self.process(child, parent)
elm = xsd_indicators.Group(name=qname, child=item)
elm = xsd_elements.Group(name=qname, child=item)
if parent.tag == tags.schema:
self.document.register_group(qname, elm)
@ -804,7 +774,7 @@ class SchemaVisitor(object):
for child in children:
elm = self.process(child, node)
choices.append(elm)
return xsd_indicators.Choice(
return xsd_elements.Choice(
choices, min_occurs=min_occurs, max_occurs=max_occurs)
def visit_union(self, node, parent):
@ -922,7 +892,7 @@ class SchemaVisitor(object):
# referenced.
if (
name.namespace == 'http://schemas.xmlsoap.org/soap/encoding/' and
name.namespace not in self.document._imports
not self.document.is_imported(name.namespace)
):
import_node = etree.Element(
tags.import_,
@ -943,11 +913,14 @@ class SchemaVisitor(object):
def _process_attributes(self, node, items):
attributes = []
for child in items:
attribute = self.process(child, node)
if child.tag in (tags.attribute, tags.attributeGroup, tags.anyAttribute):
attribute = self.process(child, node)
attributes.append(attribute)
else:
raise XMLParseError("Unexpected tag: %s" % child.tag)
raise XMLParseError(
"Unexpected tag `%s`" % (child.tag),
filename=self.document._location,
sourceline=node.sourceline)
return attributes
visitors = {

54
tests/cert_valid.pem Normal file
View File

@ -0,0 +1,54 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAyrWxlRtnAT7q2IgWEaQUCwG+0gNuqTNPUi3fLZ/7XI7lOBem
TT3YeCZc7fbTLrABH8l4NEdXG58CFa/Mkdsm05Xt4G3avrYT+CNZA3tOf42TsGIB
5/Vvqd9LS4uhBUo2qqcTUz1VA8jP7CKY25dq8RLEd1gw3lLjPGqYa69sUkEYzABC
uJfP9xUEySLeclxWx+ky2Pp8AAXPpDtlGdb9rszxYhSJtce/KEsbZqaCQ9a2ZAJe
FauJiUvODPNigs8vUQLGNYHcmEm4SvVkSSgvpKekAk1gC51OqLYypp8c916n1DG2
PC5b9kTj6mUXkBoa9jjWJQYkE8zG5wbaM7xAGwIDAQABAoIBAQCPlGqYRrSK+Vm2
zY10KVfZA/95Gd1EE4fXmY4+7tZIFR7ewEGW7HtrfyDLnMywgRIKxdVSkkVs1w/O
9JpdpXC25bd8A9OwyZ8TX1YpVSmgx1MY2BKpjfrtw6+9bsU6zfoynezeRM72w0Ii
686Bm5qv7q8iKWFT2DoEDSyw+awsBZQokVTCwHFWdbXZ50mAXoXxovn19DTRNqzD
yqO8dae9gjk16vap7gRpB60Y/YZ4Rf46X47SlRqTcqgEB/C/1jyGtl3jQlaLq4KL
POe1jFZYGUZTctmRvsol4VdSzfITqr/kd3DhJw0LxvXnT6c02wxzKLCSo2HnN6HT
A7l6eEWhAoGBAPZ46R8qPln9uGhbdYjtEjR+vxDhQcuCNkjn40psEOyXu62cFhFO
FSj3lVCyRCHIhrUWUzJIQTIPDnczH7iwrWZlqUujjYKs3DJcpu7J5B4ODatklXO+
2NZa45XEto6ygOPUp7OYZhLlGpjWnC2yp0XLqAEC0URkc1zOTTfJ0VFNAoGBANKL
tXPJLOZ2F1e3IPkX6y1hfbfbRlyuA2vai/2cAhbld4oZIpm7Yy6Jw4BFuDaUs02P
nDGBBh6EVgbZNZphZEUhgvglSdJaa2/3cS+1pGcnjmYMj4xywHpOxiomgZ8Xa1LW
ZuJdD2SajS0yPYcrEDg+xBQBvDpE0NEIka6Zu6MHAoGBAMVbKegPjl/GvuupGGMs
2Z/5QYsFpAaN3GPicmh8Qc0A7oHkcvMmX+Eu5nv4Un/urpbAKpwfqTypO78My8C6
kA5nJvlvG/ff7G3TLMQWGzhJrn5oCxfkYIK7wnKBUmDO5FAKTsKLLGjC1No/Nk2N
OU209nDgzaqC+LD+bGxYiOgdAoGAWFtXD7s6Q5EFZMMubDqUcFv8dV7pHVXNi8KQ
gyKoYdF0pBi+Q4O3ML2RtNANaaJnyMHey4uY9M+WhpM7AomimbxhiR+k5kkZ00gl
UN9Kmhuoj7zvtQInMmzCjsfQF+KtIHtne9GP9ylA29m8pm/1A5WblcXQpydf9olB
EePkMZsCgYABr07cGT31CXxrbQwDTgiQJm2JHq/wIR+q0eys1aiMvKRN+0arfqvz
j8zPK6B9SRcCXY4XAda3rilsF/7eHf2zkg/0kHV6NqaSWFEA8yqAoIqpc03cE/ef
lUgGakZ6Wb0sucIRB40loAZIu0lN0kF45K1P8JDHg74jk6uM2xnZvg==
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIIEqjCCA5KgAwIBAgIJALOCBen0S+W5MA0GCSqGSIb3DQEBBQUAMIGUMQswCQYD
VQQGEwJOTDEQMA4GA1UECBMHVXRyZWNodDEQMA4GA1UEBxMHVXRyZWNodDEeMBwG
A1UEChMVTWljaGFlbCB2YW4gVGVsbGluZ2VuMRwwGgYDVQQDExN3d3cucHl0aG9u
LXplZXAub3JnMSMwIQYJKoZIhvcNAQkBFhRpbmZvQHB5dGhvbi16ZWVwLm9yZzAe
Fw0xNzAxMjUxOTI3NTJaFw0yNzAxMjMxOTI3NTJaMIGUMQswCQYDVQQGEwJOTDEQ
MA4GA1UECBMHVXRyZWNodDEQMA4GA1UEBxMHVXRyZWNodDEeMBwGA1UEChMVTWlj
aGFlbCB2YW4gVGVsbGluZ2VuMRwwGgYDVQQDExN3d3cucHl0aG9uLXplZXAub3Jn
MSMwIQYJKoZIhvcNAQkBFhRpbmZvQHB5dGhvbi16ZWVwLm9yZzCCASIwDQYJKoZI
hvcNAQEBBQADggEPADCCAQoCggEBAMq1sZUbZwE+6tiIFhGkFAsBvtIDbqkzT1It
3y2f+1yO5TgXpk092HgmXO320y6wAR/JeDRHVxufAhWvzJHbJtOV7eBt2r62E/gj
WQN7Tn+Nk7BiAef1b6nfS0uLoQVKNqqnE1M9VQPIz+wimNuXavESxHdYMN5S4zxq
mGuvbFJBGMwAQriXz/cVBMki3nJcVsfpMtj6fAAFz6Q7ZRnW/a7M8WIUibXHvyhL
G2amgkPWtmQCXhWriYlLzgzzYoLPL1ECxjWB3JhJuEr1ZEkoL6SnpAJNYAudTqi2
MqafHPdep9QxtjwuW/ZE4+plF5AaGvY41iUGJBPMxucG2jO8QBsCAwEAAaOB/DCB
+TAdBgNVHQ4EFgQUxd12m9nIS0QO4uIPRy7oerPyVygwgckGA1UdIwSBwTCBvoAU
xd12m9nIS0QO4uIPRy7oerPyVyihgZqkgZcwgZQxCzAJBgNVBAYTAk5MMRAwDgYD
VQQIEwdVdHJlY2h0MRAwDgYDVQQHEwdVdHJlY2h0MR4wHAYDVQQKExVNaWNoYWVs
IHZhbiBUZWxsaW5nZW4xHDAaBgNVBAMTE3d3dy5weXRob24temVlcC5vcmcxIzAh
BgkqhkiG9w0BCQEWFGluZm9AcHl0aG9uLXplZXAub3JnggkAs4IF6fRL5bkwDAYD
VR0TBAUwAwEB/zANBgkqhkiG9w0BAQUFAAOCAQEAHTUp/i9FYbvl86By7EvMlZeK
v6I38IYcrIGzDdbrk8KkilYv7p2Ll8gUJYRFj96iX6Uvn0ACTutFJW9xE2ytBMOu
UurTBpcpk8k368gfO/fGVi6HzjyFqTnhLkmd3CADIzPN/yg5j2q+mgA3ys6wISBR
aDJR2jGt9sTAkAwkVJdDCFkCwyRfB28mBRnI5SLeR5vQyLT97THPma39xR3FaqYv
h2q3coXBnaOOcuigiKyIynhJtXH42XlN3TM23b9NK2Oep2e51pxst3uohlDGmB/W
uzx/hG+kNxy9D+Ms7qNL9+i4nHFOoR034RB/NGTChzTxq2JcXIKPWIo2tslNsg==
-----END CERTIFICATE-----

58
tests/cert_valid_pw.pem Normal file
View File

@ -0,0 +1,58 @@
-----BEGIN RSA PRIVATE KEY-----
Proc-Type: 4,ENCRYPTED
DEK-Info: DES-EDE3-CBC,415D98F4452C4F85
5QDgVsTX78dXETR9RfVoAPXA3M0WKB6YibDZCavgMXBorWxOAaQz+chSehaJHHsi
ZYWgxSUYsiZk5NVZLi3Pidg3zx8G1HWThxRmWakiEqseVTX9B/H2h1AEwd7AQ+5K
kbLXbZ/CGTSkrVA+IjEWRtSjigaNfrDiyNO6YfLWjZrYk8mC2EVRnMD7LGDPL4E0
rD9sFT4zAovZEe+WBGCWjzRiBMgIvY+4frsX3u9MkTYFxpNMkdDozUJu5sxr0asB
68fSfLo2lDYuH9azWQe6aeMLkW/1QalRsYwGurZ/PW6DEnfkhN0ikdz0vpGARohL
if+7wsSxKU6MRd5GntMqQ3uE+C5GxoiJ0T51+b/MDpo7hwhnhAqLvjPZNAcOtOip
krLI3BPiyraBkY+km3EheDxX3XjGIl381577WO4HxKUtcT1KOYBLgN1VVdgrrlt3
mLO9kexl6h57bi3OZxxanuX76zE7gvwVLgagMY/8Ek2TJJXhSne5YIrC5WPT1bQv
nFenU8VzcF0xf02vTCos9Os6obeSJr3Lzz6StIj+WVytMD/QmfvG/0TCtlmq+av4
VR7OWgnNcXWXPsoy6+8JwZLZGz3+Q2tav2JjGOCHneTIlgRyBKQX7sT6M8DGcLYf
mZgysrA048Hh3bcPmk4W7p4/dHZBU1HR6KT34wKjMuLIEXuMVITfBK+Gv6Is9Q3h
sVWJ2ChGcTkAIyqIaRglU0QBKUgOs4q2HsAxrefOXStaKlmawDjttFHhPveyKXCr
NSqNMwK16g+9S1LcxCNoX8y1qTDObhGO/A3uTm81RcDN4gzQiZNd6ZM5ynuPyAAn
8vACDiAXaf1SnAJjBuuqKm3dTXQgu+Ise7KmQXoherzHoXUZa0T49hCowb+WOJsB
ljmEoehxVAPrgs8GmzvpSiJfCw2NgowaG75wMt8zTKAGgIht+R5xRNqp9g6Qgyn8
0Z4E6JTZ8JRN9FtutUc0NKLuwiai2F4qwUzmOTPLiFTT6sAQrDX+JSmezPAVbtw0
rchz/p1jCS+V0k78D/UwV/mGkXeqPUn370MVK0boPfi1ZrjASi6/ztaB7nza7QKP
LCoZC7P6cbWqJG3g4MUxR/zDIdLF6XGBuSp2i6GmHlIi464AIv+O0Oz1ftIC/Xt2
HZQoL7KW9NeU2WdIkczxQnPc/DucK/dkyKzzxLDr7ztU9E4maYTNd4Ml6B2azysY
gyvAKq9ui8zYAZt096mACUSLj1/yNZKwlGUuyeq3XLaeqsHAGecEJsg3smnJ39Rd
B4RSx3BDEQ4GiS1zNUaxs5KCCqtV4iylYWdUu6Y9H/koLuj04/c9qOjYCzIDDgTS
M8asTkZIjXeknBb+RSTYj48Q1Vom2zjFhunPKvPmSgfdDGzX24ipagwZvmzLxlg1
6edI75EBMMetVVwMWWe3ZU/AlERe/QVNBZUO2TwiVM2VvIVkaaa1pFYDv/eWeGLD
HdJ1Hoz7O/gX6eKVY6OJu3nIwAXDTutxYC1VpZ6a48OrOWHuz352wkmw7aGcn0GS
ZYjbizCZckvuDKk8PFpGbF7gPmaxtVKx/hijoqRJlRIJq9pSXTxbq7HEXPKq9dtI
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIIEqjCCA5KgAwIBAgIJALOCBen0S+W5MA0GCSqGSIb3DQEBBQUAMIGUMQswCQYD
VQQGEwJOTDEQMA4GA1UECBMHVXRyZWNodDEQMA4GA1UEBxMHVXRyZWNodDEeMBwG
A1UEChMVTWljaGFlbCB2YW4gVGVsbGluZ2VuMRwwGgYDVQQDExN3d3cucHl0aG9u
LXplZXAub3JnMSMwIQYJKoZIhvcNAQkBFhRpbmZvQHB5dGhvbi16ZWVwLm9yZzAe
Fw0xNzAxMjUxOTI3NTJaFw0yNzAxMjMxOTI3NTJaMIGUMQswCQYDVQQGEwJOTDEQ
MA4GA1UECBMHVXRyZWNodDEQMA4GA1UEBxMHVXRyZWNodDEeMBwGA1UEChMVTWlj
aGFlbCB2YW4gVGVsbGluZ2VuMRwwGgYDVQQDExN3d3cucHl0aG9uLXplZXAub3Jn
MSMwIQYJKoZIhvcNAQkBFhRpbmZvQHB5dGhvbi16ZWVwLm9yZzCCASIwDQYJKoZI
hvcNAQEBBQADggEPADCCAQoCggEBAMq1sZUbZwE+6tiIFhGkFAsBvtIDbqkzT1It
3y2f+1yO5TgXpk092HgmXO320y6wAR/JeDRHVxufAhWvzJHbJtOV7eBt2r62E/gj
WQN7Tn+Nk7BiAef1b6nfS0uLoQVKNqqnE1M9VQPIz+wimNuXavESxHdYMN5S4zxq
mGuvbFJBGMwAQriXz/cVBMki3nJcVsfpMtj6fAAFz6Q7ZRnW/a7M8WIUibXHvyhL
G2amgkPWtmQCXhWriYlLzgzzYoLPL1ECxjWB3JhJuEr1ZEkoL6SnpAJNYAudTqi2
MqafHPdep9QxtjwuW/ZE4+plF5AaGvY41iUGJBPMxucG2jO8QBsCAwEAAaOB/DCB
+TAdBgNVHQ4EFgQUxd12m9nIS0QO4uIPRy7oerPyVygwgckGA1UdIwSBwTCBvoAU
xd12m9nIS0QO4uIPRy7oerPyVyihgZqkgZcwgZQxCzAJBgNVBAYTAk5MMRAwDgYD
VQQIEwdVdHJlY2h0MRAwDgYDVQQHEwdVdHJlY2h0MR4wHAYDVQQKExVNaWNoYWVs
IHZhbiBUZWxsaW5nZW4xHDAaBgNVBAMTE3d3dy5weXRob24temVlcC5vcmcxIzAh
BgkqhkiG9w0BCQEWFGluZm9AcHl0aG9uLXplZXAub3JnggkAs4IF6fRL5bkwDAYD
VR0TBAUwAwEB/zANBgkqhkiG9w0BAQUFAAOCAQEAHTUp/i9FYbvl86By7EvMlZeK
v6I38IYcrIGzDdbrk8KkilYv7p2Ll8gUJYRFj96iX6Uvn0ACTutFJW9xE2ytBMOu
UurTBpcpk8k368gfO/fGVi6HzjyFqTnhLkmd3CADIzPN/yg5j2q+mgA3ys6wISBR
aDJR2jGt9sTAkAwkVJdDCFkCwyRfB28mBRnI5SLeR5vQyLT97THPma39xR3FaqYv
h2q3coXBnaOOcuigiKyIynhJtXH42XlN3TM23b9NK2Oep2e51pxst3uohlDGmB/W
uzx/hG+kNxy9D+Ms7qNL9+i4nHFOoR034RB/NGTChzTxq2JcXIKPWIo2tslNsg==
-----END CERTIFICATE-----

View File

@ -1,5 +1,13 @@
import sys
import pytest
# Don't try to test asyncio since it is py3 only syntax
if sys.version_info < (3, 5):
collect_ignore = [
'test_asyncio_transport.py'
]
pytest.register_assert_rewrite('tests.utils')
@ -12,3 +20,14 @@ def no_requests(request, monkeypatch):
pytest.fail("External connections not allowed during tests.")
monkeypatch.setattr("socket.socket", func)
@pytest.yield_fixture()
def event_loop():
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
yield loop
loop.close()

View File

@ -0,0 +1,41 @@
import pytest
from pretend import stub
from lxml import etree
from aioresponses import aioresponses
from zeep import cache, asyncio
@pytest.mark.requests
def test_no_cache(event_loop):
transport = asyncio.AsyncTransport(loop=event_loop)
assert transport.cache is None
@pytest.mark.requests
def test_load(event_loop):
cache = stub(get=lambda url: None, add=lambda url, content: None)
transport = asyncio.AsyncTransport(loop=event_loop, cache=cache)
with aioresponses() as m:
m.get('http://tests.python-zeep.org/test.xml', body='x')
result = transport.load('http://tests.python-zeep.org/test.xml')
assert result == b'x'
@pytest.mark.requests
@pytest.mark.asyncio
async def test_post(event_loop):
cache = stub(get=lambda url: None, add=lambda url, content: None)
transport = asyncio.AsyncTransport(loop=event_loop, cache=cache)
envelope = etree.Element('Envelope')
with aioresponses() as m:
m.post('http://tests.python-zeep.org/test.xml', body='x')
result = await transport.post_xml(
'http://tests.python-zeep.org/test.xml',
envelope=envelope,
headers={})
assert result.content == b'x'

View File

@ -2,12 +2,10 @@ import os
import pytest
import requests_mock
from lxml import etree
from zeep import client
from zeep import xsd
from zeep.exceptions import Error
from tests.utils import load_xml
from zeep import client, xsd
from zeep.exceptions import Error
def test_bind():
@ -45,6 +43,13 @@ def test_service_proxy_non_existing():
assert client_obj.service.NonExisting
def test_open_from_file_object():
with open('tests/wsdl_files/soap_transport_err.wsdl', 'rb') as fh:
client_obj = client.Client(fh)
service = client_obj.bind()
assert service
def test_client_no_wsdl():
with pytest.raises(ValueError):
client.Client(None)

View File

@ -1,7 +1,12 @@
import datetime
from collections import OrderedDict
from lxml import etree
from tests.utils import assert_nodes_equal, load_xml
from tests.utils import assert_nodes_equal, load_xml, render_node
from zeep import xsd
from six import binary_type
from zeep import helpers
from zeep.helpers import serialize_object
@ -147,3 +152,51 @@ def test_serialize_any_array():
assert result == {
'_value_1': [any_obj],
}
def test_create_xml_soap_map():
data = OrderedDict([
('text', u'String'),
('bytes', b'Bytes'),
('boolean', True),
('integer', 100),
('float', 100.1234),
('datetime', datetime.datetime(2017, 10, 28, 12, 30, 10)),
('date', datetime.date(2016, 1, 14)),
])
value = helpers.create_xml_soap_map(data)
expected = """
<document>
<item>
<key xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="xs:string">text</key>
<value xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="xs:string">String</value>
</item>
<item>
<key xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="xs:string">bytes</key>
<value xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="xs:string">Bytes</value>
</item>
<item>
<key xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="xs:string">boolean</key>
<value xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="xs:boolean">true</value>
</item>
<item>
<key xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="xs:string">integer</key>
<value xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="xs:integer">100</value>
</item>
<item>
<key xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="xs:string">float</key>
<value xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="xs:float">100.1234</value>
</item>
<item>
<key xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="xs:string">datetime</key>
<value xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="xs:dateTime">2017-10-28T12:30:10</value>
</item>
<item>
<key xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="xs:string">date</key>
<value xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="xs:date">2016-01-14</value>
</item>
</document>
""" # noqa
node = render_node(value._xsd_type, value)
assert_nodes_equal(expected, node)

View File

@ -28,4 +28,4 @@ def test_main_extract_auth(monkeypatch):
assert mock_transport.call_count == 1
args, kwargs = mock_transport.call_args
assert kwargs['http_auth'] == ('user', 'secret')
assert kwargs['session'].auth == ('user', 'secret')

View File

@ -8,10 +8,10 @@ def test_dict():
'foo_2': 'bar',
'foo_3': 'bar',
'foo_4': {
'bar': '1',
'foo': '1',
'bar': {
'bala': 'qwe',
},
},
'x': [1, 2, 3, 4],
'y': [],
}

View File

@ -5,11 +5,6 @@ from pretend import stub
from zeep import cache, transports
@pytest.mark.requests
def test_default_cache():
transport = transports.Transport()
assert isinstance(transport.cache, cache.SqliteCache)
@pytest.mark.requests
def test_no_cache():
@ -17,6 +12,12 @@ def test_no_cache():
assert transport.cache is None
@pytest.mark.requests
def test_custom_cache():
transport = transports.Transport(cache=cache.SqliteCache())
assert isinstance(transport.cache, cache.SqliteCache)
@pytest.mark.requests
def test_load():
cache = stub(get=lambda url: None, add=lambda url, content: None)

View File

@ -4,7 +4,7 @@ from pretend import stub
from six import StringIO
from tests.utils import DummyTransport, assert_nodes_equal
from zeep import wsa, wsdl, Client
from zeep import Client, wsa, wsdl
def test_require_wsa(recwarn, monkeypatch):

View File

@ -7,8 +7,7 @@ from pretend import stub
from six import StringIO
from tests.utils import DummyTransport, assert_nodes_equal
from zeep import wsdl
from zeep import Client
from zeep import Client, wsdl
from zeep.transports import Transport
@ -36,6 +35,7 @@ def test_parse_soap_wsdl():
m.post('http://example.com/stockquote', text=response)
account_type = client.get_type('stoc:account')
account = account_type(id=100)
account.user = 'mvantellingen'
country = client.get_element('stoc:country').type()
country.name = 'The Netherlands'
country.code = 'NL'
@ -58,7 +58,7 @@ def test_parse_soap_wsdl():
<tickerSymbol>foobar</tickerSymbol>
<account>
<id>100</id>
<user/>
<user>mvantellingen</user>
</account>
<stoc:country>
<name>The Netherlands</name>
@ -324,10 +324,12 @@ def test_create_import_schema(recwarn):
""".strip())
transport = DummyTransport()
transport.bind('a.xsd', schema_node_a)
transport.bind('b.xsd', schema_node_b)
transport.bind('http://tests.python-zeep.org/a.xsd', schema_node_a)
transport.bind('http://tests.python-zeep.org/b.xsd', schema_node_b)
document = wsdl.Document(
content, transport, 'http://tests.python-zeep.org/content.wsdl')
document = wsdl.Document(content, transport)
assert len(recwarn) == 0
assert document.types.get_element('{http://tests.python-zeep.org/b}global')
@ -366,10 +368,11 @@ def test_wsdl_imports_xsd(recwarn):
""".strip())
transport = DummyTransport()
transport.bind('a.xsd', schema_node_a)
transport.bind('b.xsd', schema_node_b)
transport.bind('http://tests.python-zeep.org/a.xsd', schema_node_a)
transport.bind('http://tests.python-zeep.org/b.xsd', schema_node_b)
wsdl.Document(content, transport)
wsdl.Document(
content, transport, 'http://tests.python-zeep.org/content.wsdl')
def test_import_schema_without_location(recwarn):
@ -445,10 +448,11 @@ def test_import_schema_without_location(recwarn):
""".strip())
transport = DummyTransport()
transport.bind('a.xsd', schema_node_a)
transport.bind('b.xsd', schema_node_b)
transport.bind('http://tests.python-zeep.org/a.xsd', schema_node_a)
transport.bind('http://tests.python-zeep.org/b.xsd', schema_node_b)
document = wsdl.Document(content, transport)
document = wsdl.Document(
content, transport, 'http://tests.python-zeep.org/content.wsdl')
assert len(recwarn) == 0
assert document.types.get_type('{http://tests.python-zeep.org/b}foo')
@ -826,3 +830,73 @@ def test_parse_operation_empty_nodes():
""".strip())
assert wsdl.Document(content, None)
def test_wsdl_duplicate_tns(recwarn):
wsdl_main = StringIO("""
<?xml version="1.0"?>
<wsdl:definitions
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:tns="http://tests.python-zeep.org/xsd-main"
xmlns:sec="http://tests.python-zeep.org/wsdl-secondary"
xmlns:xsd-sec="http://tests.python-zeep.org/xsd-secondary"
xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/"
xmlns:wsdlsoap="http://schemas.xmlsoap.org/wsdl/soap/"
targetNamespace="http://tests.python-zeep.org/xsd-main">
<wsdl:import namespace="http://tests.python-zeep.org/xsd-main"
location="http://tests.python-zeep.org/schema-2.wsdl"/>
<wsdl:binding name="TestBinding" type="tns:TestPortType">
<soap:binding style="document" transport="http://schemas.xmlsoap.org/soap/http"/>
<wsdl:operation name="TestOperation1">
<soap:operation soapAction=""/>
<wsdl:input>
<soap:body use="literal"/>
</wsdl:input>
</wsdl:operation>
</wsdl:binding>
<wsdl:service name="TestService">
<wsdl:documentation>Test service</wsdl:documentation>
<wsdl:port name="TestPortType" binding="tns:TestBinding">
<soap:address location="http://tests.python-zeep.org/test"/>
</wsdl:port>
</wsdl:service>
</wsdl:definitions>
""".strip())
wsdl_2 = ("""
<?xml version="1.0"?>
<wsdl:definitions
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:tns="http://tests.python-zeep.org/xsd-main"
xmlns:mine="http://tests.python-zeep.org/xsd-secondary"
xmlns:wsdlsoap="http://schemas.xmlsoap.org/wsdl/soap/"
targetNamespace="http://tests.python-zeep.org/xsd-main">
<wsdl:types>
<xsd:schema
targetNamespace="http://tests.python-zeep.org/xsd-main"
xmlns:tns="http://tests.python-zeep.org/xsd-main">
<xsd:element name="input" type="xsd:string"/>
</xsd:schema>
</wsdl:types>
<wsdl:message name="message-1">
<wsdl:part name="response" element="tns:input"/>
</wsdl:message>
<wsdl:portType name="TestPortType">
<wsdl:operation name="TestOperation1">
<wsdl:input message="message-1"/>
</wsdl:operation>
</wsdl:portType>
</wsdl:definitions>
""".strip())
transport = DummyTransport()
transport.bind('http://tests.python-zeep.org/schema-2.wsdl', wsdl_2)
document = wsdl.Document(wsdl_main, transport)
document.dump()

View File

@ -168,6 +168,15 @@ def test_complex_type_without_name():
</document>
"""
assert_nodes_equal(expected, node)
data = ArrayOfObject.parse_xmlelement(node, schema)
assert len(data._value_1) == 3
assert data._value_1[0]['attr_1'] == 'attr-1'
assert data._value_1[0]['attr_2'] == 'attr-2'
assert data._value_1[1]['attr_1'] == 'attr-3'
assert data._value_1[1]['attr_2'] == 'attr-4'
assert data._value_1[2]['attr_1'] == 'attr-5'
assert data._value_1[2]['attr_2'] == 'attr-6'
def test_soap_array_parse_remote_ns():

View File

@ -56,13 +56,13 @@ def test_parse():
assert operation.input.body.signature() == 'xsd:string'
assert operation.input.header.signature() == ''
assert operation.input.envelope.signature() == 'body: xsd:string, header: {}'
assert operation.input.envelope.signature() == 'body: xsd:string'
assert operation.input.signature(as_output=False) == 'xsd:string'
assert operation.output.body.signature() == 'xsd:string'
assert operation.output.header.signature() == ''
assert operation.output.envelope.signature() == 'body: xsd:string, header: {}'
assert operation.output.signature(as_output=True) == 'body: xsd:string, header: {}'
assert operation.output.envelope.signature() == 'body: xsd:string'
assert operation.output.signature(as_output=True) == 'xsd:string'
def test_empty_input_parse():
@ -113,7 +113,7 @@ def test_empty_input_parse():
assert operation.input.body.signature() == ''
assert operation.input.header.signature() == ''
assert operation.input.envelope.signature() == 'body: {}, header: {}'
assert operation.input.envelope.signature() == 'body: {}'
assert operation.input.signature(as_output=False) == ''
@ -251,9 +251,6 @@ def test_parse_with_header_type():
assert operation.output.signature(as_output=True) == 'body: xsd:string, header: {auth: ResponseHeaderType}' # noqa
def test_parse_with_header_other_message():
wsdl_content = StringIO("""
<definitions xmlns="http://schemas.xmlsoap.org/wsdl/"

View File

@ -153,7 +153,7 @@ def test_deserialize():
</soap-env:Body>
</soap-env:Envelope>
""")
assert operation.output.signature(True) == 'body: {result: xsd:string}, header: {}'
assert operation.output.signature(True) == 'result: xsd:string'
result = operation.output.deserialize(document)
assert result == 'ah1'
@ -381,7 +381,7 @@ def test_deserialize_rpc_literal():
assert deserialized == 'foobar'
def test_deserialize():
def test_deserialize_x():
wsdl_content = StringIO("""
<definitions xmlns="http://schemas.xmlsoap.org/wsdl/"
xmlns:tns="http://tests.python-zeep.org/tns"
@ -398,8 +398,10 @@ def test_deserialize():
<portType name="TestPortType">
<operation name="clearFoo">
<input wsam:Action="http://foo.services.example.com/Util/clearFooRequest" message="tns:clearFoo"/>
<output wsam:Action="http://foo.services.example.com/Util/clearFooResponse" message="tns:clearFooResponse"/>
<input wsam:Action="http://foo.services.example.com/Util/clearFooRequest"
message="tns:clearFoo"/>
<output wsam:Action="http://foo.services.example.com/Util/clearFooResponse"
message="tns:clearFooResponse"/>
</operation>
</portType>

View File

@ -1,6 +1,8 @@
from lxml import etree
from pretend import stub
from tests.utils import load_xml
from zeep import Client
from zeep.exceptions import Fault
from zeep.wsdl import bindings
@ -108,3 +110,60 @@ def test_soap12_process_error():
assert exc.subcodes[0].localname == 'fault-subcode1'
assert exc.subcodes[1].namespace == 'http://example.com/example2'
assert exc.subcodes[1].localname == 'fault-subcode2'
def test_mime_multipart():
data = '\r\n'.join(line.strip() for line in """
--MIME_boundary
Content-Type: text/xml; charset=UTF-8
Content-Transfer-Encoding: 8bit
Content-ID: <claim061400a.xml@claiming-it.com>
<?xml version='1.0' ?>
<SOAP-ENV:Envelope
xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/">
<SOAP-ENV:Body>
<claim:insurance_claim_auto id="insurance_claim_document_id"
xmlns:claim="http://schemas.risky-stuff.com/Auto-Claim">
<theSignedForm href="cid:claim061400a.tiff@claiming-it.com"/>
<theCrashPhoto href="cid:claim061400a.jpeg@claiming-it.com"/>
<!-- ... more claim details go here... -->
</claim:insurance_claim_auto>
</SOAP-ENV:Body>
</SOAP-ENV:Envelope>
--MIME_boundary
Content-Type: image/tiff
Content-Transfer-Encoding: base64
Content-ID: <claim061400a.tiff@claiming-it.com>
Li4uQmFzZTY0IGVuY29kZWQgVElGRiBpbWFnZS4uLg==
--MIME_boundary
Content-Type: image/jpeg
Content-Transfer-Encoding: binary
Content-ID: <claim061400a.jpeg@claiming-it.com>
...Raw JPEG image..
--MIME_boundary--
""".splitlines()).encode('utf-8')
client = Client('tests/wsdl_files/claim.wsdl')
binding = client.service._binding
response = stub(
status_code=200,
content=data,
headers={
'Content-Type': 'multipart/related; type="text/xml"; start="<claim061400a.xml@claiming-it.com>"; boundary="MIME_boundary"'
}
)
result = binding.process_reply(
client, binding.get('GetClaimDetails'), response)
assert result.root is None
assert len(result.attachments) == 2
assert result.attachments[0].content == b'...Base64 encoded TIFF image...'
assert result.attachments[1].content == b'...Raw JPEG image..'

View File

@ -0,0 +1,112 @@
import os
import sys
import pytest
from tests.utils import load_xml
from zeep.exceptions import SignatureVerificationFailed
from zeep import wsse
from zeep.wsse import signature
DS_NS = 'http://www.w3.org/2000/09/xmldsig#'
KEY_FILE = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'cert_valid.pem')
KEY_FILE_PW = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'cert_valid_pw.pem')
@pytest.mark.skipif(sys.platform == 'win32',
reason="does not run on windows")
def test_sign():
envelope = load_xml("""
<soapenv:Envelope
xmlns:tns="http://tests.python-zeep.org/"
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/">
<soapenv:Header></soapenv:Header>
<soapenv:Body>
<tns:Function>
<tns:Argument>OK</tns:Argument>
</tns:Function>
</soapenv:Body>
</soapenv:Envelope>
""")
signature.sign_envelope(envelope, KEY_FILE, KEY_FILE)
signature.verify_envelope(envelope, KEY_FILE)
@pytest.mark.skipif(sys.platform == 'win32',
reason="does not run on windows")
def test_sign_pw():
envelope = load_xml("""
<soapenv:Envelope
xmlns:tns="http://tests.python-zeep.org/"
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/">
<soapenv:Header></soapenv:Header>
<soapenv:Body>
<tns:Function>
<tns:Argument>OK</tns:Argument>
</tns:Function>
</soapenv:Body>
</soapenv:Envelope>
""")
signature.sign_envelope(envelope, KEY_FILE_PW, KEY_FILE_PW, 'geheim')
signature.verify_envelope(envelope, KEY_FILE_PW)
@pytest.mark.skipif(sys.platform == 'win32',
reason="does not run on windows")
def test_verify_error():
envelope = load_xml("""
<soapenv:Envelope
xmlns:tns="http://tests.python-zeep.org/"
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/">
<soapenv:Header></soapenv:Header>
<soapenv:Body>
<tns:Function>
<tns:Argument>OK</tns:Argument>
</tns:Function>
</soapenv:Body>
</soapenv:Envelope>
""")
signature.sign_envelope(envelope, KEY_FILE, KEY_FILE)
nsmap = {'tns': 'http://tests.python-zeep.org/'}
for elm in envelope.xpath('//tns:Argument', namespaces=nsmap):
elm.text = 'NOT!'
with pytest.raises(SignatureVerificationFailed):
signature.verify_envelope(envelope, KEY_FILE)
@pytest.mark.skipif(sys.platform == 'win32',
reason="does not run on windows")
def test_signature():
envelope = load_xml("""
<soapenv:Envelope
xmlns:tns="http://tests.python-zeep.org/"
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/">
<soapenv:Header></soapenv:Header>
<soapenv:Body>
<tns:Function>
<tns:Argument>OK</tns:Argument>
</tns:Function>
</soapenv:Body>
</soapenv:Envelope>
""")
plugin = wsse.Signature(KEY_FILE_PW, KEY_FILE_PW, 'geheim')
envelope, headers = plugin.apply(envelope, {})
plugin.verify(envelope)

View File

@ -7,7 +7,7 @@ from freezegun import freeze_time
from tests.utils import assert_nodes_equal, load_xml
from zeep import client
from zeep.wsse.username import UsernameToken
from zeep.wsse import UsernameToken
@pytest.mark.requests
@ -55,7 +55,7 @@ def test_password_text():
""")
token = UsernameToken('michael', 'geheim')
envelope, headers = token.sign(envelope, {})
envelope, headers = token.apply(envelope, {})
expected = """
<soap-env:Envelope
xmlns:ns0="http://example.com/stockquote.xsd"
@ -64,12 +64,12 @@ def test_password_text():
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<soap-env:Header>
<ns0:Security xmlns:ns0="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd">
<ns0:UsernameToken>
<ns0:Username>michael</ns0:Username>
<ns0:Password Type="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordText">geheim</ns0:Password>
</ns0:UsernameToken>
</ns0:Security>
<wsse:Security xmlns:wsse="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd">
<wsse:UsernameToken>
<wsse:Username>michael</wsse:Username>
<wsse:Password Type="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordText">geheim</wsse:Password>
</wsse:UsernameToken>
</wsse:Security>
</soap-env:Header>
<soap-env:Body>
<ns0:TradePriceRequest>
@ -104,7 +104,7 @@ def test_password_digest(monkeypatch):
""")
token = UsernameToken('michael', 'geheim', use_digest=True)
envelope, headers = token.sign(envelope, {})
envelope, headers = token.apply(envelope, {})
expected = """
<soap-env:Envelope
xmlns:ns0="http://example.com/stockquote.xsd"
@ -113,14 +113,14 @@ def test_password_digest(monkeypatch):
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<soap-env:Header>
<ns0:Security xmlns:ns0="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd">
<ns0:UsernameToken>
<ns0:Username>michael</ns0:Username>
<ns0:Password Type="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordDigest">hVicspAQSg70JNhe67OHqD9gexc=</ns0:Password>
<ns0:Nonce EncodingType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary">bW9ja2VkLXJhbmRvbQ==</ns0:Nonce>
<ns0:Created xmlns:ns0="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">2016-05-08T12:00:00+00:00</ns0:Created>
</ns0:UsernameToken>
</ns0:Security>
<wsse:Security xmlns:wsse="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd">
<wsse:UsernameToken>
<wsse:Username>michael</wsse:Username>
<wsse:Password Type="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordDigest">hVicspAQSg70JNhe67OHqD9gexc=</wsse:Password>
<wsse:Nonce EncodingType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary">bW9ja2VkLXJhbmRvbQ==</wsse:Nonce>
<wsu:Created xmlns:wsu="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">2016-05-08T12:00:00+00:00</wsu:Created>
</wsse:UsernameToken>
</wsse:Security>
</soap-env:Header>
<soap-env:Body>
<ns0:TradePriceRequest>
@ -158,7 +158,7 @@ def test_password_digest_custom(monkeypatch):
token = UsernameToken(
'michael', password_digest='12345', use_digest=True,
nonce='iets', created=created)
envelope, headers = token.sign(envelope, {})
envelope, headers = token.apply(envelope, {})
expected = """
<soap-env:Envelope
xmlns:ns0="http://example.com/stockquote.xsd"
@ -167,14 +167,14 @@ def test_password_digest_custom(monkeypatch):
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<soap-env:Header>
<ns0:Security xmlns:ns0="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd">
<ns0:UsernameToken>
<ns0:Username>michael</ns0:Username>
<ns0:Password Type="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordDigest">12345</ns0:Password>
<ns0:Nonce EncodingType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary">aWV0cw==</ns0:Nonce>
<ns0:Created xmlns:ns0="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">2016-06-04T20:10:00+00:00</ns0:Created>
</ns0:UsernameToken>
</ns0:Security>
<wsse:Security xmlns:wsse="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd">
<wsse:UsernameToken>
<wsse:Username>michael</wsse:Username>
<wsse:Password Type="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordDigest">12345</wsse:Password>
<wsse:Nonce EncodingType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary">aWV0cw==</wsse:Nonce>
<wsu:Created xmlns:wsu="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">2016-06-04T20:10:00+00:00</wsu:Created>
</wsse:UsernameToken>
</wsse:Security>
</soap-env:Header>
<soap-env:Body>
<ns0:TradePriceRequest>
@ -211,7 +211,7 @@ def test_password_prepared():
""") # noqa
token = UsernameToken('michael', 'geheim')
envelope, headers = token.sign(envelope, {})
envelope, headers = token.apply(envelope, {})
expected = """
<soap-env:Envelope
xmlns:ns0="http://example.com/stockquote.xsd"

View File

@ -304,7 +304,8 @@ def test_choice_init():
{'item_2': 'value-2'},
{'item_1': 'value-3'},
{'item_4_1': 'value-4-1', 'item_4_2': 'value-4-2'},
])
],
post='bar')
assert obj._value_1 == [
{'item_1': 'value-1'},
@ -326,7 +327,7 @@ def test_choice_init():
<ns0:item_1>value-3</ns0:item_1>
<ns0:item_4_1>value-4-1</ns0:item_4_1>
<ns0:item_4_2>value-4-2</ns0:item_4_2>
<ns0:post/>
<ns0:post>bar</ns0:post>
</ns0:kies>
</document>
""".strip()

View File

@ -135,7 +135,8 @@ def test_any_with_ref():
container_elm = schema.get_element('{http://tests.python-zeep.org/}container')
obj = container_elm(
item='bar',
_value_1=xsd.AnyObject(item_elm, item_elm('argh')))
_value_1=xsd.AnyObject(item_elm, item_elm('argh')),
_value_2=xsd.AnyObject(item_elm, item_elm('ok')))
node = etree.Element('document')
container_elm.render(node, obj)
@ -144,6 +145,7 @@ def test_any_with_ref():
<ns0:container xmlns:ns0="http://tests.python-zeep.org/">
<ns0:item>bar</ns0:item>
<ns0:item>argh</ns0:item>
<ns0:item>ok</ns0:item>
</ns0:container>
</document>
"""
@ -218,6 +220,54 @@ def test_element_any_type():
assert item.something == 'bar'
def test_element_any_type_elements():
node = etree.fromstring("""
<?xml version="1.0"?>
<schema xmlns="http://www.w3.org/2001/XMLSchema"
xmlns:tns="http://tests.python-zeep.org/"
targetNamespace="http://tests.python-zeep.org/"
elementFormDefault="qualified">
<element name="container">
<complexType>
<sequence>
<element name="something" type="anyType"/>
</sequence>
</complexType>
</element>
</schema>
""".strip())
schema = xsd.Schema(node)
Child = xsd.ComplexType(
xsd.Sequence([
xsd.Element('{http://tests.python-zeep.org/}item_1', xsd.String()),
xsd.Element('{http://tests.python-zeep.org/}item_2', xsd.String()),
])
)
child = Child(item_1='item-1', item_2='item-2')
container_elm = schema.get_element('{http://tests.python-zeep.org/}container')
obj = container_elm(something=child)
node = etree.Element('document')
container_elm.render(node, obj)
expected = """
<document>
<ns0:container xmlns:ns0="http://tests.python-zeep.org/">
<ns0:something>
<ns0:item_1>item-1</ns0:item_1>
<ns0:item_2>item-2</ns0:item_2>
</ns0:something>
</ns0:container>
</document>
"""
assert_nodes_equal(expected, node)
item = container_elm.parse(node.getchildren()[0], schema)
assert len(item.something) == 2
assert item.something[0].text == 'item-1'
assert item.something[1].text == 'item-2'
def test_any_in_nested_sequence():
schema = xsd.Schema(load_xml("""
<?xml version="1.0"?>

View File

@ -418,3 +418,48 @@ def test_attribute_union_type_inline():
attr = schema.get_attribute('{http://tests.python-zeep.org/}something')
assert attr('foo') == 'foo'
def test_attribute_value_retrieval():
schema = xsd.Schema(load_xml("""
<?xml version="1.0"?>
<schema xmlns="http://www.w3.org/2001/XMLSchema"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:tns="http://tests.python-zeep.org/"
elementFormDefault="qualified"
targetNamespace="http://tests.python-zeep.org/">
<complexType name="Address">
<sequence>
<element name="Street" type="tns:Street"/>
</sequence>
</complexType>
<complexType name="Street">
<sequence>
<element name="Name" type="string"/>
<element name="Something" type="string" minOccurs="0"/>
</sequence>
<attribute name="ID" type="int" use="required"/>
<attribute name="Postcode" type="string"/>
</complexType>
</schema>
"""))
Addr = schema.get_type('{http://tests.python-zeep.org/}Address')
address = Addr()
address.Street = {
'ID': 100,
'Name': 'Foo',
}
expected = """
<document>
<ns0:Street xmlns:ns0="http://tests.python-zeep.org/" ID="100">
<ns0:Name>Foo</ns0:Name>
</ns0:Street>
</document>
"""
node = etree.Element('document')
Addr.render(node, address)
assert_nodes_equal(expected, node)

View File

@ -6,7 +6,7 @@ import pytest
import pytz
import six
from zeep.xsd import builtins
from zeep.xsd.types import builtins
class TestString:

View File

@ -321,12 +321,12 @@ def test_choice_in_sequence():
assert container_elm.type.signature() == (
'something: xsd:string, ({item_1: xsd:string} | {item_2: xsd:string} | {item_3: xsd:string})') # noqa
value = container_elm(item_1='item-1')
value = container_elm(something='foobar', item_1='item-1')
expected = """
<document>
<ns0:container xmlns:ns0="http://tests.python-zeep.org/">
<ns0:something/>
<ns0:something>foobar</ns0:something>
<ns0:item_1>item-1</ns0:item_1>
</ns0:container>
</document>
@ -977,3 +977,69 @@ def test_parse_check_mixed_choices():
</document>
"""
assert_nodes_equal(expected, node)
def test_choice_extend():
schema = xsd.Schema(load_xml("""
<?xml version="1.0"?>
<schema
xmlns="http://www.w3.org/2001/XMLSchema"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:tns="http://tests.python-zeep.org/"
elementFormDefault="qualified"
targetNamespace="http://tests.python-zeep.org/">
<xsd:complexType name="BaseType">
<xsd:sequence>
<xsd:element name="optional" minOccurs="0"/>
</xsd:sequence>
<xsd:attribute name="Id"/>
</xsd:complexType>
<xsd:complexType name="ChildType">
<xsd:complexContent>
<xsd:extension base="tns:BaseType">
<xsd:sequence>
<xsd:element name="item-1-1" type="xsd:string"/>
<xsd:element name="item-1-2" type="xsd:string"/>
</xsd:sequence>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
<xsd:element name="container">
<xsd:complexType>
<xsd:complexContent>
<xsd:extension base="tns:ChildType">
<xsd:choice minOccurs="0" maxOccurs="6">
<xsd:element name="item-2-1" type="xsd:string"/>
<xsd:element name="item-2-2" type="xsd:string"/>
</xsd:choice>
<xsd:attribute name="version" use="required" fixed="10.0.1.2"/>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
</xsd:element>
</schema>
"""))
element = schema.get_element('ns0:container')
node = load_xml("""
<ns0:container xmlns:ns0="http://tests.python-zeep.org/">
<ns0:item-1-1>foo</ns0:item-1-1>
<ns0:item-1-2>bar</ns0:item-1-2>
</ns0:container>
""")
value = element.parse(node, schema)
node = load_xml("""
<ns0:container xmlns:ns0="http://tests.python-zeep.org/">
<ns0:item-1-1>foo</ns0:item-1-1>
<ns0:item-1-2>bar</ns0:item-1-2>
<ns0:item-2-1>xafoo</ns0:item-2-1>
<ns0:item-2-2>xabar</ns0:item-2-2>
</ns0:container>
""")
value = element.parse(node, schema)
assert value['item-1-1'] == 'foo'
assert value['item-1-2'] == 'bar'
assert value['_value_1'][0] == {'item-2-1': 'xafoo'}
assert value['_value_1'][1] == {'item-2-2': 'xabar'}

View File

@ -1,6 +1,6 @@
import pytest
from lxml import etree
from tests.utils import assert_nodes_equal, load_xml, render_node
from zeep import xsd

View File

@ -3,7 +3,8 @@ import io
from lxml import etree
from tests.utils import DummyTransport, assert_nodes_equal, load_xml, render_node
from tests.utils import (
DummyTransport, assert_nodes_equal, load_xml, render_node)
from zeep import xsd
@ -106,7 +107,7 @@ def test_complex_content_with_recursive_elements():
<xsd:complexContent>
<xsd:extension base="tns:Name">
<xsd:sequence>
<xsd:element name="children" type="tns:Pet"/>
<xsd:element name="children" type="tns:Pet" minOccurs="0" maxOccurs="unbounded"/>
</xsd:sequence>
</xsd:extension>
</xsd:complexContent>
@ -125,7 +126,10 @@ def test_complex_content_with_recursive_elements():
assert(pet_type.signature() == 'name: xsd:string, common_name: xsd:string, children: Pet')
obj = pet_type(
name='foo', common_name='bar')
name='foo', common_name='bar',
children=[
pet_type(name='child-1', common_name='child-cname-1')
])
node = etree.Element('document')
pet_type.render(node, obj)
@ -134,7 +138,10 @@ def test_complex_content_with_recursive_elements():
<ns0:Pet xmlns:ns0="http://tests.python-zeep.org/">
<ns0:name>foo</ns0:name>
<ns0:common_name>bar</ns0:common_name>
<ns0:children/>
<ns0:children>
<ns0:name>child-1</ns0:name>
<ns0:common_name>child-cname-1</ns0:common_name>
</ns0:children>
</ns0:Pet>
</document>
"""
@ -543,14 +550,14 @@ def test_complex_content_extension_with_sequence():
address_type = schema.get_element('{http://tests.python-zeep.org/}SpecialPackage')
obj = address_type(
id='testString', pkg_id='nameId')
id='testString', pkg_id='nameId', otherElement='foobar')
node = etree.Element('document')
address_type.render(node, obj)
expected = """
<document>
<ns0:SpecialPackage xmlns:ns0="http://tests.python-zeep.org/" pkg_id="nameId" id="testString">
<ns0:otherElement/>
<ns0:otherElement>foobar</ns0:otherElement>
</ns0:SpecialPackage>
</document>
"""

View File

@ -774,7 +774,7 @@ def test_complex_type_empty():
schema = xsd.Schema(node)
container_elm = schema.get_element('{http://tests.python-zeep.org/}container')
obj = container_elm()
obj = container_elm(something={})
node = etree.Element('document')
container_elm.render(node, obj)

View File

@ -1,4 +1,5 @@
import datetime
from lxml import etree
from tests.utils import load_xml
@ -238,7 +239,7 @@ def test_sequence_parse_anytype_obj():
'{http://www.w3.org/2001/XMLSchema}Schema',
targetNamespace='http://tests.python-zeep.org/'))
root = list(schema._schemas.values())[0]
root = next(schema.documents)
root.register_type('{http://tests.python-zeep.org/}something', value_type)
custom_type = xsd.Element(

View File

@ -3,8 +3,7 @@ from lxml import etree
from tests.utils import DummyTransport, load_xml
from zeep import exceptions, xsd
from zeep.xsd.builtins import Schema as Schema
from zeep.exceptions import ZeepWarning
from zeep.xsd import Schema
def test_default_types():
@ -125,7 +124,9 @@ def test_schema_doc_repr_val():
elementFormDefault="qualified">
</xs:schema>
"""))
doc = schema._get_schema_document('http://tests.python-zeep.org/')
docs = schema._get_schema_documents('http://tests.python-zeep.org/')
assert len(docs) == 1
doc = docs[0]
assert repr(doc) == "<SchemaDocument(location=None, tns='http://tests.python-zeep.org/', is_empty=True)>"
@ -393,8 +394,16 @@ def test_duplicate_target_namespace():
<?xml version="1.0"?>
<xsd:schema
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:tns="http://tests.python-zeep.org/duplicate"
targetNamespace="http://tests.python-zeep.org/duplicate"
elementFormDefault="qualified">
<xsd:element name="elm-in-b" type="tns:item-c"/>
<xsd:complexType name="item-c">
<xsd:sequence>
<xsd:element name="item-a" type="xsd:string"/>
<xsd:element name="item-b" type="xsd:string"/>
</xsd:sequence>
</xsd:complexType>
</xsd:schema>
""".strip())
@ -402,8 +411,17 @@ def test_duplicate_target_namespace():
<?xml version="1.0"?>
<xsd:schema
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:tns="http://tests.python-zeep.org/duplicate"
targetNamespace="http://tests.python-zeep.org/duplicate"
elementFormDefault="qualified">
<xsd:element name="elm-in-c" type="tns:item-c"/>
<xsd:complexType name="item-c">
<xsd:sequence>
<xsd:element name="item-a" type="xsd:string"/>
<xsd:element name="item-b" type="xsd:string"/>
</xsd:sequence>
</xsd:complexType>
</xsd:schema>
""".strip())
@ -411,8 +429,12 @@ def test_duplicate_target_namespace():
transport.bind('http://tests.python-zeep.org/a.xsd', schema_a)
transport.bind('http://tests.python-zeep.org/b.xsd', schema_b)
transport.bind('http://tests.python-zeep.org/c.xsd', schema_c)
with pytest.warns(ZeepWarning):
xsd.Schema(schema_a, transport=transport)
schema = xsd.Schema(schema_a, transport=transport)
elm_b = schema.get_element('{http://tests.python-zeep.org/duplicate}elm-in-b')
elm_c = schema.get_element('{http://tests.python-zeep.org/duplicate}elm-in-c')
assert not isinstance(elm_b.type, xsd.UnresolvedType)
assert not isinstance(elm_c.type, xsd.UnresolvedType)
def test_multiple_no_namespace():
@ -440,8 +462,7 @@ def test_multiple_no_namespace():
transport = DummyTransport()
transport.bind('http://tests.python-zeep.org/b.xsd', node_b)
transport.bind('http://tests.python-zeep.org/c.xsd', node_b)
with pytest.warns(ZeepWarning):
xsd.Schema(node_a, transport=transport)
xsd.Schema(node_a, transport=transport)
def test_multiple_only_target_ns():
@ -470,8 +491,7 @@ def test_multiple_only_target_ns():
transport = DummyTransport()
transport.bind('http://tests.python-zeep.org/b.xsd', node_b)
transport.bind('http://tests.python-zeep.org/c.xsd', node_b)
with pytest.warns(ZeepWarning):
xsd.Schema(node_a, transport=transport)
xsd.Schema(node_a, transport=transport)
def test_schema_error_handling():

View File

@ -27,51 +27,51 @@ def test_base_type():
def test_simpletype_eq():
type_1 = types.SimpleType()
type_2 = types.SimpleType()
type_1 = types.AnySimpleType()
type_2 = types.AnySimpleType()
assert type_1 == type_2
def test_simpletype_parse():
node = etree.Element('foobar')
item = types.SimpleType()
item = types.AnySimpleType()
assert item.parse_xmlelement(node) is None
def test_simpletype_xmlvalue():
item = types.SimpleType()
item = types.AnySimpleType()
with pytest.raises(NotImplementedError):
item.xmlvalue(None)
def test_simpletype_pythonvalue():
item = types.SimpleType()
item = types.AnySimpleType()
with pytest.raises(NotImplementedError):
item.pythonvalue(None)
def test_simpletype_call_wrong_arg_count():
item = types.SimpleType()
item = types.AnySimpleType()
with pytest.raises(TypeError):
item('foo', 'bar')
def test_simpletype_call_wrong_kwarg():
item = types.SimpleType()
item = types.AnySimpleType()
with pytest.raises(TypeError):
item(uhhh='x')
def test_simpletype_str():
item = types.SimpleType()
item = types.AnySimpleType()
item.name = u'foobar'
assert six.text_type(item) == 'SimpleType(value)'
assert six.text_type(item) == 'AnySimpleType(value)'
def test_complextype_parse_xmlelement_no_childs():

View File

@ -1,9 +1,4 @@
import datetime
import io
from lxml import etree
from tests.utils import DummyTransport, assert_nodes_equal, load_xml, render_node
from tests.utils import assert_nodes_equal, load_xml, render_node
from zeep import xsd

View File

@ -0,0 +1,85 @@
import pytest
from tests.utils import assert_nodes_equal, load_xml, render_node
from zeep import exceptions, xsd
def test_validate_element_value():
schema = xsd.Schema(load_xml("""
<?xml version="1.0"?>
<schema xmlns="http://www.w3.org/2001/XMLSchema"
xmlns:tns="http://tests.python-zeep.org/"
targetNamespace="http://tests.python-zeep.org/"
elementFormDefault="qualified">
<element name="container">
<complexType>
<sequence>
<element minOccurs="1" maxOccurs="1" name="item" type="string" />
</sequence>
</complexType>
</element>
</schema>
"""))
schema.set_ns_prefix('tns', 'http://tests.python-zeep.org/')
container_elm = schema.get_element('tns:container')
obj = container_elm()
expected = """
<document>
<ns0:container xmlns:ns0="http://tests.python-zeep.org/">
<ns0:item>bar</ns0:item>
</ns0:container>
</document>
"""
with pytest.raises(exceptions.ValidationError) as exc:
result = render_node(container_elm, obj)
assert 'Missing element item (container.item)' in str(exc)
obj.item = 'bar'
result = render_node(container_elm, obj)
assert_nodes_equal(result, expected)
obj = container_elm.parse(result[0], schema)
assert obj.item == 'bar'
def test_validate_required_attribute():
schema = xsd.Schema(load_xml("""
<?xml version="1.0"?>
<schema xmlns="http://www.w3.org/2001/XMLSchema"
xmlns:tns="http://tests.python-zeep.org/"
targetNamespace="http://tests.python-zeep.org/"
elementFormDefault="qualified">
<element name="container">
<complexType>
<attribute name="item" type="string" use="required"/>
</complexType>
</element>
</schema>
"""))
schema.set_ns_prefix('tns', 'http://tests.python-zeep.org/')
container_elm = schema.get_element('tns:container')
obj = container_elm()
expected = """
<document>
<ns0:container xmlns:ns0="http://tests.python-zeep.org/" item="bar"/>
</document>
"""
with pytest.raises(exceptions.ValidationError) as exc:
result = render_node(container_elm, obj)
assert 'The attribute item is not valid: Value is required (container.item)' in str(exc)
obj.item = 'bar'
result = render_node(container_elm, obj)
assert_nodes_equal(result, expected)
obj = container_elm.parse(result[0], schema)
assert obj.item == 'bar'

View File

@ -1,19 +1,16 @@
import pytest
from lxml import etree
from tests.utils import assert_nodes_equal, load_xml, render_node
from zeep import xsd
from zeep.xsd import builtins
from zeep.xsd.context import ParserContext
from zeep import exceptions, xsd
from zeep.xsd.schema import Schema
def parse_schema_node(node):
parser_context = ParserContext()
schema = Schema(
node=node,
transport=None,
location=None,
parser_context=parser_context)
location=None)
return schema
@ -26,7 +23,7 @@ def test_schema_empty():
</schema>
""")
schema = parse_schema_node(node)
root = list(schema._schemas.values())[0]
root = next(schema.documents)
assert root._element_form == 'qualified'
assert root._attribute_form == 'unqualified'
@ -70,7 +67,7 @@ def test_element_default_type():
""")
schema = parse_schema_node(node)
element = schema.get_element('{http://tests.python-zeep.org/}foo')
assert isinstance(element.type, builtins.AnyType)
assert isinstance(element.type, xsd.AnyType)
def test_element_simple_type_unresolved():
@ -185,10 +182,15 @@ def test_attribute_required():
xsd_element = schema.get_element('{http://tests.python-zeep.org/}foo')
value = xsd_element()
with pytest.raises(exceptions.ValidationError):
node = render_node(xsd_element, value)
value.base = 'foo'
node = render_node(xsd_element, value)
expected = """
<document>
<ns0:foo xmlns:ns0="http://tests.python-zeep.org/" base=""/>
<ns0:foo xmlns:ns0="http://tests.python-zeep.org/" base="foo"/>
</document>
"""
assert_nodes_equal(expected, node)

View File

@ -4,7 +4,8 @@ from six import binary_type, string_types
def load_xml(xml):
parser = etree.XMLParser(remove_blank_text=True, remove_comments=True)
parser = etree.XMLParser(
remove_blank_text=True, remove_comments=True, resolve_entities=False)
return etree.fromstring(xml.strip(), parser=parser)

View File

@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions
name="ClaimService"
targetNamespace="http://risky-stuff.com/soap/GetClaimDetails/"
xmlns="http://schemas.xmlsoap.org/wsdl/"
xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/"
xmlns:tns="http://risky-stuff.com/soap/GetClaimDetails/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<types>
<schema elementFormDefault="qualified"
targetNamespace="http://risky-stuff.com/soap/GetClaimDetails/"
xmlns="http://www.w3.org/2001/XMLSchema"
xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/"
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/">
<simpleType name="ClaimID">
<restriction base="string"/>
</simpleType>
</schema>
</types>
<message name="GetClaimDetailsInput">
<part name="ClaimID" type="tns:ClaimID"/>
</message>
<message name="GetClaimDetailsOutput"/>
<portType name="GetClaimDetailsPortType">
<operation name="GetClaimDetails">
<input message="tns:GetClaimDetailsInput"/>
<output message="tns:GetClaimDetailsOutput"/>
</operation>
</portType>
<binding name="GetClaimDetailsBinding" type="tns:GetClaimDetailsPortType">
<soap:binding style="rpc" transport="http://schemas.xmlsoap.org/soap/http"/>
<operation name="GetClaimDetails">
<soap:operation soapAction="http://schemas.risk-stuff.com/soap/action/#GetClaimDetails"/>
<input>
<soap:body encodingStyle="http://schemas.xmlsoap.org/soap/encoding/" namespace="http://schemas.risk-stuff.com/soap/" use="encoded"/>
</input>
<output>
<soap:body encodingStyle="http://schemas.xmlsoap.org/soap/encoding/" namespace="http://schemas.risk-stuff.com/soap/" use="encoded"/>
</output>
</operation>
</binding>
<service name="GetClaimDetailsService">
<port binding="tns:GetClaimDetailsBinding" name="GetClaimDetailsPort">
<soap:address location="https://www.risky-stuff.com/claim.svc"/>
</port>
</service>
</definitions>

View File

@ -36,7 +36,7 @@
<all>
<element name="tickerSymbol" type="string"/>
<element name="account" type="tns:account" minOccurs="0" />
<element ref="tns:country"/>
<element ref="tns:country" minOccurs="0"/>
</all>
</complexType>
</element>

View File

@ -31,10 +31,18 @@
</sequence>
</complexType>
</element>
<element name="Token">
<complexType>
<sequence>
<element name="value" type="string"/>
</sequence>
</complexType>
</element>
</schema>
</types>
<message name="GetLastTradePriceInput">
<part name="header" element="xsd1:Authentication"/>
<part name="token" element="xsd1:Token"/>
<part name="body" element="xsd1:TradePriceRequest"/>
</message>
<message name="GetLastTradePriceOutput">
@ -52,6 +60,7 @@
<soap:operation soapAction="http://example.com/GetLastTradePrice"/>
<input>
<soap:header message="tns:GetLastTradePriceInput" part="header" use="literal"/>
<soap:header message="tns:GetLastTradePriceInput" part="token" use="literal"/>
<soap:body use="literal"/>
</input>
<output>