Merge branch 'develop' for updating to release v1.0.16

This commit is contained in:
Davide Brunato 2019-11-18 09:48:39 +01:00
commit 1a1ec82586
44 changed files with 1606 additions and 694 deletions

1
.gitignore vendored
View File

@ -6,6 +6,7 @@
*.json
.idea/
.tox/
.eggs/
.coverage*
!.coveragerc
.ipynb_checkpoints/

View File

@ -2,6 +2,12 @@
CHANGELOG
*********
`v1.0.16`_ (2019-11-18)
=======================
* Improved XMLResource class for working with compressed files
* Fix for validation with XSD wildcards and 'lax' process content
* Fix ambiguous items validation for xs:choice and xs:sequence models
`v1.0.15`_ (2019-10-13)
=======================
* Improved XPath 2.0 bindings
@ -265,3 +271,4 @@ v0.9.6 (2017-05-05)
.. _v1.0.13: https://github.com/brunato/xmlschema/compare/v1.0.11...v1.0.13
.. _v1.0.14: https://github.com/brunato/xmlschema/compare/v1.0.13...v1.0.14
.. _v1.0.15: https://github.com/brunato/xmlschema/compare/v1.0.14...v1.0.15
.. _v1.0.16: https://github.com/brunato/xmlschema/compare/v1.0.15...v1.0.16

View File

@ -62,7 +62,7 @@ author = 'Davide Brunato'
# The short X.Y version.
version = '1.0'
# The full version, including alpha/beta/rc tags.
release = '1.0.15'
release = '1.0.16'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.

View File

@ -40,7 +40,7 @@ Otherwise the argument can be also an opened file-like object:
.. doctest::
>>> import xmlschema
>>> schema_file = open('xmlschema/tests/test_cases/examples/vehicles/vehicles.xsd')
>>> schema_file = open('xmlschema/tests/test_cases/examples/collection/collection.xsd')
>>> schema = xmlschema.XMLSchema(schema_file)
Alternatively you can pass a string containing the schema definition:
@ -54,8 +54,8 @@ Alternatively you can pass a string containing the schema definition:
... </xs:schema>
... """)
this option might not works when the schema includes other local subschemas, because the package
cannot knows anything about the schema's source location:
Strings and file-like objects might not work when the schema includes other local subschemas,
because the package cannot knows anything about the schema's source location:
.. doctest::
@ -73,6 +73,15 @@ cannot knows anything about the schema's source location:
Path: /xs:schema/xs:element/xs:complexType/xs:sequence/xs:element
In these cases you can provide an appropriate *base_url* optional argument to define the
reference directory path for other includes and imports:
.. doctest::
>>> import xmlschema
>>> schema_file = open('xmlschema/tests/test_cases/examples/vehicles/vehicles.xsd')
>>> schema = xmlschema.XMLSchema(schema_file, base_url='xmlschema/tests/test_cases/examples/vehicles/')
XSD declarations
----------------
@ -517,35 +526,6 @@ For example you can build a schema using a *strict* mode and then decode XML dat
using the *validation* argument setted to 'lax'.
XML entity-based attacks protection
-----------------------------------
The XML data resource loading is protected using the `SafeXMLParser` class, a subclass of
the pure Python version of XMLParser that forbids the use of entities.
The protection is applied both to XSD schemas and to XML data. The usage of this feature is
regulated by the XMLSchema's argument *defuse*.
For default this argument has value *'remote'* that means the protection on XML data is
applied only to data loaded from remote. Other values for this argument can be *'always'*
and *'never'*.
Limit on model groups checking
------------------------------
From release v1.0.11 the model groups of the schemas are checked against restriction violations
and *Unique Particle Attribution* violations.
To avoids XSD model recursion attacks a limit of ``MAX_MODEL_DEPTH = 15`` is set. If this limit
is exceeded an ``XMLSchemaModelDepthError`` is raised, the error is caught and a warning is generated.
If you need to set an higher limit for checking all your groups you can import the library and change
the value in the specific module that processes the model checks:
.. doctest::
>>> import xmlschema
>>> xmlschema.validators.models.MAX_MODEL_DEPTH = 20
Lazy validation
---------------
@ -561,3 +541,53 @@ From release v1.0.14 XSD 1.1 support has been added to the library through the c
:class:`XMLSchema11`. You have to use this class for XSD 1.1 schemas instead the default
class :class:`XMLSchema` that is still linked to XSD 1.0 validator :class:`XMLSchema10`.
From next minor release (v1.1) the default class will become :class:`XMLSchema11`.
XML entity-based attacks protection
...................................
The XML data resource loading is protected using the `SafeXMLParser` class, a subclass of
the pure Python version of XMLParser that forbids the use of entities.
The protection is applied both to XSD schemas and to XML data. The usage of this feature is
regulated by the XMLSchema's argument *defuse*.
For default this argument has value *'remote'* that means the protection on XML data is
applied only to data loaded from remote. Other values for this argument can be *'always'*
and *'never'*.
Processing limits
-----------------
From release v1.0.16 a module has been added in order to group constants that define
processing limits, generally to protect against attacks prepared to exhaust system
resources. These limits usually don't need to be changed, but this possibility has
been left at the module level for situations where a different setting is needed.
Limit on XSD model groups checking
..................................
Model groups of the schemas are checked against restriction violations and *Unique Particle
Attribution* violations. To avoids XSD model recursion attacks a depth limit of 15 levels
is set. If this limit is exceeded an ``XMLSchemaModelDepthError`` is raised, the error is
caught and a warning is generated. If you need to set an higher limit for checking all your
groups you can import the library and change the value of ``MAX_MODEL_DEPTH`` in the limits
module:
.. doctest::
>>> import xmlschema
>>> xmlschema.limits.MAX_MODEL_DEPTH = 20
Limit on XML data depth
.......................
A limit of 9999 on maximum depth is set for XML validation/decoding/encoding to avoid
attacks based on extremely deep XML data. To increase or decrease this limit change the
value of ``MAX_XML_DEPTH`` in the module *limits* after the import of the package:
.. doctest::
>>> import xmlschema
>>> xmlschema.limits.MAX_XML_DEPTH = 1000

View File

@ -1,4 +1,4 @@
# This repository adheres to the publiccode.yml standard by including this
# This repository adheres to the publiccode.yml standard by including this
# metadata file that makes public software easily discoverable.
# More info at https://github.com/italia/publiccode.yml
@ -6,8 +6,8 @@ publiccodeYmlVersion: '0.2'
name: xmlschema
url: 'https://github.com/sissaschool/xmlschema'
landingURL: 'https://github.com/sissaschool/xmlschema'
releaseDate: '2019-10-13'
softwareVersion: v1.0.15
releaseDate: '2019-11-18'
softwareVersion: v1.0.16
developmentStatus: stable
platforms:
- linux

View File

@ -5,6 +5,7 @@ coverage
elementpath~=1.3.0
lxml
memory_profiler
matplotlib
pathlib2 # For Py27 tests on resources
Sphinx
sphinx_rtd_theme

View File

@ -38,7 +38,8 @@ class InstallCommand(install):
setup(
name='xmlschema',
version='1.0.15',
version='1.0.16',
setup_requires=['elementpath~=1.3.0'],
install_requires=['elementpath~=1.3.0'],
packages=['xmlschema'],
include_package_data=True,
@ -64,6 +65,7 @@ setup(
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: Implementation :: CPython',
'Topic :: Software Development :: Libraries'
]

View File

@ -8,6 +8,7 @@
#
# @author Davide Brunato <brunato@sissa.it>
#
from . import limits
from .exceptions import XMLSchemaException, XMLSchemaRegexError, XMLSchemaURLError, \
XMLSchemaNamespaceError
from .etree import etree_tostring
@ -30,7 +31,7 @@ from .validators import (
XsdGlobals, XMLSchemaBase, XMLSchema, XMLSchema10, XMLSchema11
)
__version__ = '1.0.15'
__version__ = '1.0.16'
__author__ = "Davide Brunato"
__contact__ = "brunato@sissa.it"
__copyright__ = "Copyright 2016-2019, SISSA"

View File

@ -171,7 +171,7 @@ def from_json(source, schema, path=None, converter=None, json_options=None, **kw
:param source: can be a string or a :meth:`read()` supporting file-like object \
containing the JSON document.
:param schema: an :class:`XMLSchema` instance.
:param schema: an :class:`XMLSchema` or an :class:`XMLSchema11` instance.
:param path: is an optional XPath expression for selecting the element of the schema \
that matches the data that has to be encoded. For default the first global element of \
the schema is used.

21
xmlschema/limits.py Normal file
View File

@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
#
# Copyright (c), 2016-2019, SISSA (International School for Advanced Studies).
# All rights reserved.
# This file is distributed under the terms of the MIT License.
# See the file 'LICENSE' in the root directory of the present
# distribution, or http://opensource.org/licenses/MIT.
#
# @author Davide Brunato <brunato@sissa.it>
#
"""Package protection limits. Values can be changed after import to set different limits."""
MAX_XML_DEPTH = 9999
"""
Maximum depth of XML data. An `XMLSchemaValidationError` is raised if this limit is exceeded.
"""
MAX_MODEL_DEPTH = 15
"""
Maximum XSD model group depth. An `XMLSchemaModelDepthError` is raised if this limit is exceeded.
"""

View File

@ -12,10 +12,13 @@
This module contains namespace definitions for W3C core standards and namespace related classes.
"""
from __future__ import unicode_literals
import os
import re
from .compat import MutableMapping, Mapping
###
# Namespace URIs
XSD_NAMESPACE = 'http://www.w3.org/2001/XMLSchema'
"URI of the XML Schema Definition namespace (xs|xsd)"
@ -42,10 +45,34 @@ VC_NAMESPACE = 'http://www.w3.org/2007/XMLSchema-versioning'
"URI of the XML Schema Versioning namespace (vc)"
###
# Schema location hints
SCHEMAS_DIR = os.path.join(os.path.dirname(__file__), 'validators/schemas/')
LOCATION_HINTS = {
# Locally saved schemas
HFP_NAMESPACE: os.path.join(SCHEMAS_DIR, 'XMLSchema-hasFacetAndProperty_minimal.xsd'),
VC_NAMESPACE: os.path.join(SCHEMAS_DIR, 'XMLSchema-versioning_minimal.xsd'),
XLINK_NAMESPACE: os.path.join(SCHEMAS_DIR, 'xlink.xsd'),
XHTML_NAMESPACE: os.path.join(SCHEMAS_DIR, 'xhtml1-strict.xsd'),
# Remote locations: contributors can propose additional official locations
# for other namespaces for extending this list.
XSLT_NAMESPACE: os.path.join(SCHEMAS_DIR, 'http://www.w3.org/2007/schema-for-xslt20.xsd'),
}
###
# Helper functions and classes
NAMESPACE_PATTERN = re.compile(r'{([^}]*)}')
def get_namespace(name):
if not name or name[0] != '{':
return ''
try:
return NAMESPACE_PATTERN.match(name).group(1)
except (AttributeError, TypeError):

View File

@ -224,34 +224,38 @@ def local_name(qname):
return qname
def qname_to_prefixed(qname, namespaces):
def qname_to_prefixed(qname, namespaces, use_empty=True):
"""
Transforms a fully qualified name into a prefixed name using a namespace map.
Returns the *qname* argument if it's not a fully qualified name or if it has
boolean value `False`.
Maps a QName in extended format to a QName in prefixed format.
Do not change local names and QNames in prefixed format.
:param qname: an extended QName or a local name.
:param qname: a QName or a local name.
:param namespaces: a map from prefixes to namespace URIs.
:param use_empty: if `True` use the empty prefix for mapping.
:return: a QName in prefixed format or a local name.
"""
if not qname:
if not qname or qname[0] != '{':
return qname
namespace = get_namespace(qname)
for prefix, uri in sorted(filter(lambda x: x[1] == namespace, namespaces.items()), reverse=True):
if not uri:
return '%s:%s' % (prefix, qname) if prefix else qname
elif prefix:
return qname.replace('{%s}' % uri, '%s:' % prefix)
else:
return qname.replace('{%s}' % uri, '')
prefixes = [x for x in namespaces if namespaces[x] == namespace]
if not prefixes:
return qname
elif prefixes[0]:
return '%s:%s' % (prefixes[0], qname.split('}', 1)[1])
elif len(prefixes) > 1:
return '%s:%s' % (prefixes[1], qname.split('}', 1)[1])
elif use_empty:
return qname.split('}', 1)[1]
else:
return qname
def qname_to_extended(qname, namespaces):
"""
Converts a QName in prefixed format or a local name to the extended QName format.
Maps a QName in prefixed format or a local name to the extended QName format.
Local names are mapped if *namespaces* has a not empty default namespace.
:param qname: a QName in prefixed format or a local name.
:param namespaces: a map from prefixes to namespace URIs.

View File

@ -11,7 +11,7 @@
import os.path
import re
import codecs
from elementpath import iter_select, Selector
from elementpath import iter_select, Selector, XPath1Parser
from .compat import (
PY3, StringIO, BytesIO, string_base_type, urlopen, urlsplit, urljoin, urlunsplit,
@ -26,8 +26,23 @@ from .etree import ElementTree, PyElementTree, SafeXMLParser, etree_tostring
DEFUSE_MODES = ('always', 'remote', 'never')
XML_RESOURCE_XPATH_SYMBOLS = {
'position', 'last', 'not', 'and', 'or', '!=', '<=', '>=', '(', ')', 'text',
'[', ']', '.', ',', '/', '|', '*', '=', '<', '>', ':', '(end)', '(name)',
'(string)', '(float)', '(decimal)', '(integer)'
}
class XmlResourceXPathParser(XPath1Parser):
symbol_table = {k: v for k, v in XPath1Parser.symbol_table.items() if k in XML_RESOURCE_XPATH_SYMBOLS}
SYMBOLS = XML_RESOURCE_XPATH_SYMBOLS
XmlResourceXPathParser.build_tokenizer()
def is_remote_url(url):
return url is not None and urlsplit(url).scheme not in ('', 'file')
return isinstance(url, string_base_type) and urlsplit(url).scheme not in ('', 'file')
def url_path_is_directory(url):
@ -51,14 +66,23 @@ def normalize_url(url, base_url=None, keep_relative=False):
conformant to URL format specification.
:return: A normalized URL.
"""
def add_trailing_slash(r):
return urlunsplit((r[0], r[1], r[2] + '/' if r[2] and r[2][-1] != '/' else r[2], r[3], r[4]))
def add_trailing_slash(x):
return urlunsplit((x[0], x[1], x[2] + '/' if x[2] and x[2][-1] != '/' else x[2], x[3], x[4]))
def filter_url(x):
x = x.strip().replace('\\', '/')
while x.startswith('//'):
x = x.replace('//', '/', 1)
while x.startswith('file:////'):
x = x.replace('file:////', 'file:///', 1)
if urlsplit(x).scheme in {'', 'file'}:
x = x.replace('#', '%23')
return x
url = filter_url(url)
if base_url is not None:
base_url = base_url.replace('\\', '/')
while base_url.startswith('//'):
base_url = base_url.replace('//', '/', 1)
base_url = filter_url(base_url)
base_url_parts = urlsplit(base_url)
base_url = add_trailing_slash(base_url_parts)
if base_url_parts.scheme not in uses_relative:
@ -87,15 +111,11 @@ def normalize_url(url, base_url=None, keep_relative=False):
if base_url_parts.netloc and not url.startswith(base_url_parts.netloc) and url.startswith('//'):
url = 'file:' + url
url = url.replace('\\', '/')
while url.startswith('//'):
url = url.replace('//', '/', 1)
url_parts = urlsplit(url, scheme='file')
if url_parts.scheme not in uses_relative:
return 'file:///{}'.format(url_parts.geturl()) # Eg. k:/Python/lib/....
normalized_url = 'file:///{}'.format(url_parts.geturl()) # Eg. k:/Python/lib/....
elif url_parts.scheme != 'file':
return urlunsplit((
normalized_url = urlunsplit((
url_parts.scheme,
url_parts.netloc,
pathname2url(url_parts.path),
@ -103,18 +123,19 @@ def normalize_url(url, base_url=None, keep_relative=False):
url_parts.fragment,
))
elif os.path.isabs(url_parts.path):
return url_parts.geturl()
normalized_url = url_parts.geturl()
elif keep_relative:
# Can't use urlunsplit with a scheme because it converts relative paths to absolute ones.
return 'file:{}'.format(urlunsplit(('',) + url_parts[1:]))
normalized_url = 'file:{}'.format(urlunsplit(('',) + url_parts[1:]))
else:
return urlunsplit((
normalized_url = urlunsplit((
url_parts.scheme,
url_parts.netloc,
os.path.abspath(url_parts.path),
url_parts.query,
url_parts.fragment,
))
return filter_url(normalized_url)
def fetch_resource(location, base_url=None, timeout=30):
@ -169,12 +190,17 @@ def fetch_schema_locations(source, locations=None, **resource_options):
base_url = resource.base_url
namespace = resource.namespace
locations = resource.get_locations(locations)
for ns, url in filter(lambda x: x[0] == namespace, locations):
if not locations:
msg = "the XML data resource {!r} does not contain any schema location hint."
raise XMLSchemaValueError(msg.format(source))
for ns, url in sorted(locations, key=lambda x: x[0] != namespace):
try:
return fetch_resource(url, base_url, timeout), locations
except XMLSchemaURLError:
pass
raise XMLSchemaValueError("not found a schema for XML data resource %r (namespace=%r)." % (source, namespace))
raise XMLSchemaValueError("not found a schema for XML data resource {!r}.".format(source))
def fetch_schema(source, locations=None, **resource_options):
@ -245,7 +271,7 @@ class XMLResource(object):
if base_url is not None and not isinstance(base_url, string_base_type):
raise XMLSchemaValueError(u"'base_url' argument has to be a string: {!r}".format(base_url))
self._root = self._document = self._url = self._text = None
self._root = self._text = self._url = None
self._base_url = base_url
self.defuse = defuse
self.timeout = timeout
@ -274,7 +300,7 @@ class XMLResource(object):
def __setattr__(self, name, value):
if name == 'source':
self._root, self._document, self._text, self._url = self._fromsource(value)
self._root, self._text, self._url = self._fromsource(value)
elif name == 'defuse' and value not in DEFUSE_MODES:
raise XMLSchemaValueError(u"'defuse' attribute: {!r} is not a defuse mode.".format(value))
elif name == 'timeout' and (not isinstance(value, int) or value <= 0):
@ -284,58 +310,56 @@ class XMLResource(object):
super(XMLResource, self).__setattr__(name, value)
def _fromsource(self, source):
url, lazy = None, self._lazy
if hasattr(source, 'tag'):
url = None
if hasattr(source, 'tag') and hasattr(source, 'attrib'):
self._lazy = False
return source, None, None, None # Source is already an Element --> nothing to load
return source, None, None # Source is already an Element --> nothing to load
elif isinstance(source, string_base_type):
_url, self._url = self._url, None
try:
if lazy:
if self._lazy:
# check if source is a string containing a valid XML root
for _, root in self.iterparse(StringIO(source), events=('start',)):
return root, None, source, None
return root, source, None
else:
return self.fromstring(source), None, source, None
return self.fromstring(source), source, None
except (ElementTree.ParseError, PyElementTree.ParseError, UnicodeEncodeError):
if '\n' in source:
raise
finally:
self._url = _url
url = normalize_url(source) if '\n' not in source else None
elif isinstance(source, StringIO):
_url, self._url = self._url, None
try:
if lazy:
if self._lazy:
for _, root in self.iterparse(source, events=('start',)):
return root, None, source.getvalue(), None
return root, source.getvalue(), None
else:
document = self.parse(source)
return document.getroot(), document, source.getvalue(), None
return self.parse(source).getroot(), source.getvalue(), None
finally:
self._url = _url
elif hasattr(source, 'read'):
# source should be a file-like object
try:
if hasattr(source, 'url'):
# Save remote urls for open new resources (non seekable)
if is_remote_url(source.url):
url = source.url
else:
url = normalize_url(source.name)
except AttributeError:
pass
else:
_url, self._url = self._url, url
try:
if lazy:
for _, root in self.iterparse(source, events=('start',)):
return root, None, None, url
else:
document = self.parse(source)
return document.getroot(), document, None, url
finally:
self._url = _url
_url, self._url = self._url, url
try:
if self._lazy:
for _, root in self.iterparse(source, events=('start',)):
return root, None, url
else:
return self.parse(source).getroot(), None, url
finally:
self._url = _url
else:
# Try ElementTree object at last
@ -346,7 +370,7 @@ class XMLResource(object):
else:
if hasattr(root, 'tag'):
self._lazy = False
return root, source, None, None
return root, None, None
if url is None:
raise XMLSchemaTypeError(
@ -357,13 +381,11 @@ class XMLResource(object):
resource = urlopen(url, timeout=self.timeout)
_url, self._url = self._url, url
try:
if lazy:
if self._lazy:
for _, root in self.iterparse(resource, events=('start',)):
return root, None, None, url
return root, None, url
else:
document = self.parse(resource)
root = document.getroot()
return root, document, None, url
return self.parse(resource).getroot(), None, url
finally:
self._url = _url
resource.close()
@ -373,14 +395,6 @@ class XMLResource(object):
"""The XML tree root Element."""
return self._root
@property
def document(self):
"""
The ElementTree document, `None` if the instance is lazy or is not created
from another document or from an URL.
"""
return self._document
@property
def text(self):
"""The XML text source, `None` if it's not available."""
@ -396,9 +410,22 @@ class XMLResource(object):
"""The base URL for completing relative locations."""
return os.path.dirname(self._url) if self._url else self._base_url
@property
def document(self):
"""
The resource as ElementTree XML document. It's `None` if the instance
is lazy or if it's an lxml Element.
"""
if isinstance(self.source, ElementTree.ElementTree):
return self.source
elif hasattr(self.source, 'getroot') and hasattr(self.source, 'parse'):
return self.source # lxml's _ElementTree
elif not self._lazy and not hasattr(self.root, 'nsmap'):
return ElementTree.ElementTree(self.root)
@property
def namespace(self):
"""The namespace of the XML document."""
"""The namespace of the XML resource."""
return get_namespace(self._root.tag) if self._root is not None else None
@staticmethod
@ -418,14 +445,23 @@ class XMLResource(object):
def parse(self, source):
"""
An equivalent of *ElementTree.parse()* that can protect from XML entities attacks. When
protection is applied XML data are loaded and defused before building the ElementTree instance.
An equivalent of *ElementTree.parse()* that can protect from XML entities attacks.
When protection is applied XML data are loaded and defused before building the
ElementTree instance. The protection applied is based on value of *defuse*
attribute and *base_url* property.
:param source: a filename or file object containing XML data.
:returns: an ElementTree instance.
"""
if self.defuse == 'always' or self.defuse == 'remote' and is_remote_url(self._url):
text = source.read()
if self.defuse == 'always' or self.defuse == 'remote' and \
hasattr(source, 'read') and is_remote_url(self.base_url):
if hasattr(source, 'read'):
text = source.read()
else:
with open(source) as f:
text = f.read()
if isinstance(text, bytes):
self.defusing(BytesIO(text))
return ElementTree.parse(BytesIO(text))
@ -439,11 +475,14 @@ class XMLResource(object):
"""
An equivalent of *ElementTree.iterparse()* that can protect from XML entities attacks.
When protection is applied the iterator yields pure-Python Element instances.
The protection applied is based on resource *defuse* attribute and *base_url* property.
:param source: a filename or file object containing XML data.
:param events: a list of events to report back. If omitted, only end events are reported.
"""
if self.defuse == 'always' or self.defuse == 'remote' and is_remote_url(self._url):
if self.defuse == 'always' or self.defuse == 'remote' and \
hasattr(source, 'read') and is_remote_url(self.base_url):
parser = SafeXMLParser(target=PyElementTree.TreeBuilder())
try:
return PyElementTree.iterparse(source, events, parser)
@ -455,17 +494,20 @@ class XMLResource(object):
def fromstring(self, text):
"""
An equivalent of *ElementTree.fromstring()* that can protect from XML entities attacks.
The protection applied is based on resource *defuse* attribute and *base_url* property.
:param text: a string containing XML data.
:returns: the root Element instance.
"""
if self.defuse == 'always' or self.defuse == 'remote' and is_remote_url(self._url):
if self.defuse == 'always' or self.defuse == 'remote' and is_remote_url(self.base_url):
self.defusing(StringIO(text))
return ElementTree.fromstring(text)
def tostring(self, indent='', max_lines=None, spaces_for_tab=4, xml_declaration=False):
"""Generates a string representation of the XML resource."""
return etree_tostring(self._root, self.get_namespaces(), indent, max_lines, spaces_for_tab, xml_declaration)
elem = self._root
namespaces = self.get_namespaces()
return etree_tostring(elem, namespaces, indent, max_lines, spaces_for_tab, xml_declaration)
def copy(self, **kwargs):
"""Resource copy method. Change init parameters with keyword arguments."""
@ -481,20 +523,66 @@ class XMLResource(object):
return obj
def open(self):
"""Returns a opened resource reader object for the instance URL."""
if self._url is None:
"""
Returns a opened resource reader object for the instance URL. If the
source attribute is a seekable file-like object rewind the source and
return it.
"""
if self.seek(0) == 0:
return self.source
elif self._url is None:
raise XMLSchemaValueError("can't open, the resource has no URL associated.")
try:
return urlopen(self._url, timeout=self.timeout)
except URLError as err:
raise XMLSchemaURLError(reason="cannot access to resource %r: %s" % (self._url, err.reason))
def seek(self, position):
"""
Change stream position if the XML resource was created with a seekable
file-like object. In the other cases this method has no effect.
"""
if not hasattr(self.source, 'read'):
return
try:
if not self.source.seekable():
return
except AttributeError:
pass
else:
return self.source.seek(position)
try:
value = self.source.seek(position)
except AttributeError:
pass
else:
return value if PY3 else position
try:
value = self.source.fp.seek(position)
except AttributeError:
pass
else:
return value if PY3 else position
def close(self):
"""
Close the XML resource if it's created with a file-like object.
In other cases this method has no effect.
"""
try:
self.source.close()
except (AttributeError, TypeError):
pass
def load(self):
"""
Loads the XML text from the data source. If the data source is an Element
the source XML text can't be retrieved.
"""
if self._url is None:
if self._url is None and not hasattr(self.source, 'read'):
return # Created from Element or text source --> already loaded
resource = self.open()
@ -503,16 +591,25 @@ class XMLResource(object):
except (OSError, IOError) as err:
raise XMLSchemaOSError("cannot load data from %r: %s" % (self._url, err))
finally:
resource.close()
# We don't want to close the file obj if it wasn't originally
# opened by `XMLResource`. That is the concern of the code
# where the file obj came from.
if resource is not self.source:
resource.close()
try:
self._text = data.decode('utf-8') if PY3 else data.encode('utf-8')
except UnicodeDecodeError:
if PY3:
self._text = data.decode('iso-8859-1')
else:
with codecs.open(urlsplit(self._url).path, mode='rb', encoding='iso-8859-1') as f:
self._text = f.read().encode('iso-8859-1')
if isinstance(data, bytes):
try:
text = data.decode('utf-8') if PY3 else data.encode('utf-8')
except UnicodeDecodeError:
if PY3:
text = data.decode('iso-8859-1')
else:
with codecs.open(urlsplit(self._url).path, mode='rb', encoding='iso-8859-1') as f:
text = f.read().encode('iso-8859-1')
else:
text = data
self._text = text
def is_lazy(self):
"""Returns `True` if the XML resource is lazy."""
@ -528,18 +625,22 @@ class XMLResource(object):
for elem in self._root.iter(tag):
yield elem
return
elif self.seek(0) == 0:
resource = self.source
elif self._url is not None:
resource = urlopen(self._url, timeout=self.timeout)
else:
resource = StringIO(self._text)
# Note: lazy iteration change the order (top level element is the last)
try:
for event, elem in self.iterparse(resource, events=('end',)):
if tag is None or elem.tag == tag:
yield elem
elem.clear()
finally:
resource.close()
if resource is not self.source:
resource.close()
def iterfind(self, path=None, namespaces=None):
"""XML resource tree iterfind selector."""
@ -550,6 +651,8 @@ class XMLResource(object):
for e in iter_select(self._root, path, namespaces, strict=False):
yield e
return
elif self.seek(0) == 0:
resource = self.source
elif self._url is not None:
resource = urlopen(self._url, timeout=self.timeout)
else:
@ -571,7 +674,11 @@ class XMLResource(object):
yield elem
elem.clear()
else:
selector = Selector(path, namespaces, strict=False)
selector = Selector(path, namespaces, strict=False, parser=XmlResourceXPathParser)
path = path.replace(' ', '').replace('./', '')
path_level = path.count('/') + 1 if path != '.' else 0
select_all = '*' in path and set(path).issubset({'*', '/'})
level = 0
for event, elem in self.iterparse(resource, events=('start', 'end')):
if event == "start":
@ -581,13 +688,15 @@ class XMLResource(object):
level += 1
else:
level -= 1
if elem in selector.select(self._root):
if level == path_level and \
(select_all or elem in selector.select(self._root)):
yield elem
elem.clear()
elif level == 0:
elem.clear()
finally:
resource.close()
if self.source is not resource:
resource.close()
def iter_location_hints(self):
"""Yields schema location hints from the XML tree."""
@ -639,7 +748,7 @@ class XMLResource(object):
local_root = self.root.tag[0] != '{'
nsmap = {}
if self._url is not None:
if self._url is not None or hasattr(self.source, 'read'):
resource = self.open()
try:
for event, node in self.iterparse(resource, events=('start-ns', 'end')):
@ -650,7 +759,11 @@ class XMLResource(object):
except (ElementTree.ParseError, PyElementTree.ParseError, UnicodeEncodeError):
pass
finally:
resource.close()
# We don't want to close the file obj if it wasn't
# originally opened by `XMLResource`. That is the concern
# of the code where the file obj came from.
if self.source is not resource:
resource.close()
elif isinstance(self._text, string_base_type):
try:
for event, node in self.iterparse(StringIO(self._text), events=('start-ns', 'end')):

View File

@ -28,7 +28,7 @@ def test_choice_type(value):
parser = argparse.ArgumentParser(add_help=True)
parser.usage = """%(prog)s TEST_NUM [XML_FILE]
parser.usage = """%(prog)s TEST_NUM [XML_FILE [REPEAT]]
Run memory tests:
1) Package import or schema build
@ -44,6 +44,7 @@ Run memory tests:
parser.add_argument('test_num', metavar="TEST_NUM", type=test_choice_type, help="Test number to run")
parser.add_argument('xml_file', metavar='XML_FILE', nargs='?', help='Input XML file')
parser.add_argument('repeat', metavar='REPEAT', nargs='?', type=int, default=1, help='Repeat operation N times')
args = parser.parse_args()
@ -68,54 +69,62 @@ def build_schema(source):
@profile
def etree_parse(source):
def etree_parse(source, repeat=1):
xt = ElementTree.parse(source)
for _ in xt.iter():
pass
@profile
def etree_full_iterparse(source):
context = ElementTree.iterparse(source, events=('start', 'end'))
for event, elem in context:
if event == 'start':
for _ in range(repeat):
for _ in xt.iter():
pass
@profile
def etree_emptied_iterparse(source):
context = ElementTree.iterparse(source, events=('start', 'end'))
for event, elem in context:
if event == 'end':
elem.clear()
def etree_full_iterparse(source, repeat=1):
for _ in range(repeat):
context = ElementTree.iterparse(source, events=('start', 'end'))
for event, elem in context:
if event == 'start':
pass
@profile
def decode(source):
def etree_emptied_iterparse(source, repeat=1):
for _ in range(repeat):
context = ElementTree.iterparse(source, events=('start', 'end'))
for event, elem in context:
if event == 'end':
elem.clear()
@profile
def decode(source, repeat=1):
decoder = xmlschema.XMLSchema.meta_schema if source.endswith('.xsd') else xmlschema
return decoder.to_dict(source)
for _ in range(repeat):
decoder.to_dict(source)
@profile
def lazy_decode(source):
def lazy_decode(source, repeat=1):
decoder = xmlschema.XMLSchema.meta_schema if source.endswith('.xsd') else xmlschema
for result in decoder.to_dict(xmlschema.XMLResource(source, lazy=True), path='*'):
del result
for _ in range(repeat):
for result in decoder.to_dict(xmlschema.XMLResource(source, lazy=True), path='*'):
del result
@profile
def validate(source):
def validate(source, repeat=1):
validator = xmlschema.XMLSchema.meta_schema if source.endswith('.xsd') else xmlschema
return validator.validate(source)
for _ in range(repeat):
validator.validate(source)
@profile
def lazy_validate(source):
def lazy_validate(source, repeat=1):
if source.endswith('.xsd'):
validator, path = xmlschema.XMLSchema.meta_schema, '*'
else:
validator, path = xmlschema, None
return validator.validate(xmlschema.XMLResource(source, lazy=True), path=path)
for _ in range(repeat):
validator.validate(xmlschema.XMLResource(source, lazy=True), path=path)
if __name__ == '__main__':
@ -127,26 +136,26 @@ if __name__ == '__main__':
build_schema(args.xml_file)
elif args.test_num == 2:
import xml.etree.ElementTree as ElementTree
etree_parse(args.xml_file)
etree_parse(args.xml_file, args.repeat)
elif args.test_num == 3:
import xml.etree.ElementTree as ElementTree
etree_full_iterparse(args.xml_file)
etree_full_iterparse(args.xml_file, args.repeat)
elif args.test_num == 4:
import xml.etree.ElementTree as ElementTree
etree_emptied_iterparse(args.xml_file)
etree_emptied_iterparse(args.xml_file, args.repeat)
elif args.test_num == 5:
import xmlschema
xmlschema.XMLSchema.meta_schema.build()
decode(args.xml_file)
decode(args.xml_file, args.repeat)
elif args.test_num == 6:
import xmlschema
xmlschema.XMLSchema.meta_schema.build()
lazy_decode(args.xml_file)
lazy_decode(args.xml_file, args.repeat)
elif args.test_num == 7:
import xmlschema
xmlschema.XMLSchema.meta_schema.build()
validate(args.xml_file)
validate(args.xml_file, args.repeat)
elif args.test_num == 8:
import xmlschema
xmlschema.XMLSchema.meta_schema.build()
lazy_validate(args.xml_file)
lazy_validate(args.xml_file, args.repeat)

View File

@ -0,0 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<b:rootB xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://example.com/xmlschema/ns-A import-case4a.xsd"
xmlns:b="http://example.com/xmlschema/ns-B" />

View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<a:rootA xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://example.com/xmlschema/ns-B import-case4b.xsd"
xmlns:a="http://example.com/xmlschema/ns-A"
xmlns:b="http://example.com/xmlschema/ns-B">
<b:rootB/>
</a:rootA>

View File

@ -0,0 +1,24 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
An valid import case: two namespaces, each one with a global element that
can be used as valid root element for XML instances.
-->
<xs:schema
xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:a="http://example.com/xmlschema/ns-A"
xmlns:b="http://example.com/xmlschema/ns-B"
targetNamespace="http://example.com/xmlschema/ns-A"
elementFormDefault="qualified">
<xs:import schemaLocation="import-case4b.xsd" namespace="http://example.com/xmlschema/ns-B"/>
<xs:element name="rootA" type="a:rootType"/>
<xs:complexType name="rootType">
<xs:sequence>
<xs:element ref="b:rootB" minOccurs="0" />
</xs:sequence>
</xs:complexType>
</xs:schema>

View File

@ -0,0 +1,24 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
An valid import case: two namespaces, each one with a global element that
can be used as valid root element for XML instances.
-->
<xs:schema
xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:a="http://example.com/xmlschema/ns-A"
xmlns:b="http://example.com/xmlschema/ns-B"
targetNamespace="http://example.com/xmlschema/ns-B"
elementFormDefault="qualified">
<xs:import schemaLocation="import-case4a.xsd" namespace="http://example.com/xmlschema/ns-A"/>
<xs:element name="rootB" type="b:rootType"/>
<xs:complexType name="rootType">
<xs:sequence>
<xs:element ref="a:rootA" minOccurs="0" />
</xs:sequence>
</xs:complexType>
</xs:schema>

View File

@ -0,0 +1 @@
DUMMY CONTENT

View File

@ -60,6 +60,10 @@ features/namespaces/default_ns_valid2.xsd
features/namespaces/import-case1.xsd --errors=1 # Unknown type
features/namespaces/import-case2.xsd --errors=1 # Missing namespace import in imported chameleon schema
features/namespaces/import-case3.xsd
features/namespaces/import-case4a.xsd
features/namespaces/import-case4b.xsd
features/namespaces/import-case4-1.xml # This and the next are also regression tests for issue #140
features/namespaces/import-case4-2.xml
features/namespaces/include-case1.xsd
features/namespaces/include-case1bis.xsd
features/namespaces/include-case2.xsd

View File

@ -15,6 +15,7 @@ import os
import importlib
import sys
import subprocess
import platform
@unittest.skipIf(sys.version_info < (3,), "In Python 2 ElementTree is not overwritten by cElementTree")
@ -51,6 +52,7 @@ class TestElementTree(unittest.TestCase):
self.assertIs(importlib.import_module('xml.etree.ElementTree'), ElementTree)
self.assertIs(xmlschema_etree.ElementTree, ElementTree)
@unittest.skipIf(platform.system() == 'Windows', "Run only for UNIX based systems.")
def test_element_tree_import_script(self):
test_dir = os.path.dirname(__file__) or '.'

View File

@ -21,6 +21,7 @@ import re
import argparse
TEST_FACTORY_OPTIONS = {
'narrow': '-n' in sys.argv or '--narrow' in sys.argv, # Skip extra checks (eg. other converters)
'extra_cases': '-x' in sys.argv or '--extra' in sys.argv, # Include extra test cases
'check_with_lxml': '-l' in sys.argv or '--lxml' in sys.argv, # Check with lxml.etree.XMLSchema (for XSD 1.0)
}
@ -28,7 +29,8 @@ TEST_FACTORY_OPTIONS = {
RUN_W3C_TEST_SUITE = '-w' in sys.argv or '--w3c' in sys.argv
sys.argv = [a for a in sys.argv if a not in {'-x', '--extra', '-l', '--lxml'}] # Clean sys.argv for unittest
sys.argv = [a for a in sys.argv if a not in
{'-x', '--extra', '-l', '--lxml', '-n', '--narrow'}] # Clean sys.argv for unittest
def get_test_args(args_line):

View File

@ -38,6 +38,7 @@ def tests_factory(test_class_builder, suffix='xml'):
test_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
testfiles = [os.path.join(test_dir, 'test_cases/testfiles')]
narrow = TEST_FACTORY_OPTIONS['narrow']
if TEST_FACTORY_OPTIONS['extra_cases']:
package_dir = os.path.dirname(os.path.dirname(test_dir))
testfiles.extend(glob.glob(os.path.join(package_dir, 'test_cases/testfiles')))
@ -94,7 +95,9 @@ def tests_factory(test_class_builder, suffix='xml'):
schema_class = ObservedXMLSchema11 if test_args.inspect else XMLSchema11
check_with_lxml = False
test_class = test_class_builder(test_file, test_args, test_num, schema_class, check_with_lxml)
test_class = test_class_builder(
test_file, test_args, test_num, schema_class, narrow, check_with_lxml
)
test_classes[test_class.__name__] = test_class
logger.debug("Add XSD %s test class %r.", test_args.version, test_class.__name__)

View File

@ -27,7 +27,7 @@ from xmlschema.tests import XsdValidatorTestCase
from .observers import SchemaObserver
def make_schema_test_class(test_file, test_args, test_num, schema_class, check_with_lxml):
def make_schema_test_class(test_file, test_args, test_num, schema_class, narrow, check_with_lxml):
"""
Creates a schema test class.
@ -35,6 +35,7 @@ def make_schema_test_class(test_file, test_args, test_num, schema_class, check_w
:param test_args: line arguments for test case.
:param test_num: a positive integer number associated with the test case.
:param schema_class: the schema class to use.
:param narrow: skip extra checks (observed inspections).
:param check_with_lxml: if `True` compare with lxml XMLSchema class, reporting anomalies. \
Works only for XSD 1.0 tests.
"""
@ -69,7 +70,7 @@ def make_schema_test_class(test_file, test_args, test_num, schema_class, check_w
xs = schema_class(xsd_file, locations=locations, defuse=defuse, loglevel=loglevel)
self.errors.extend(xs.maps.all_errors)
if inspect:
if narrow and inspect:
components_ids = set([id(c) for c in xs.maps.iter_components()])
missing = [c for c in SchemaObserver.components if id(c) not in components_ids]
if any(c for c in missing):

View File

@ -47,7 +47,7 @@ def iter_nested_items(items, dict_class=dict, list_class=list):
yield items
def make_validator_test_class(test_file, test_args, test_num, schema_class, check_with_lxml):
def make_validator_test_class(test_file, test_args, test_num, schema_class, narrow, check_with_lxml):
"""
Creates a validator test class.
@ -55,6 +55,7 @@ def make_validator_test_class(test_file, test_args, test_num, schema_class, chec
:param test_args: line arguments for test case.
:param test_num: a positive integer number associated with the test case.
:param schema_class: the schema class to use.
:param narrow: skip other converters checks.
:param check_with_lxml: if `True` compare with lxml XMLSchema class, reporting anomalies. \
Works only for XSD 1.0 tests.
"""
@ -239,19 +240,21 @@ def make_validator_test_class(test_file, test_args, test_num, schema_class, chec
options = {'namespaces': namespaces, 'dict_class': ordered_dict_class}
self.check_etree_encode(root, cdata_prefix='#', **options) # Default converter
self.check_etree_encode(root, ParkerConverter, validation='lax', **options)
self.check_etree_encode(root, ParkerConverter, validation='skip', **options)
self.check_etree_encode(root, BadgerFishConverter, **options)
self.check_etree_encode(root, AbderaConverter, **options)
self.check_etree_encode(root, JsonMLConverter, **options)
if narrow:
self.check_etree_encode(root, ParkerConverter, validation='lax', **options)
self.check_etree_encode(root, ParkerConverter, validation='skip', **options)
self.check_etree_encode(root, BadgerFishConverter, **options)
self.check_etree_encode(root, AbderaConverter, **options)
self.check_etree_encode(root, JsonMLConverter, **options)
options.pop('dict_class')
self.check_json_serialization(root, cdata_prefix='#', **options)
self.check_json_serialization(root, ParkerConverter, validation='lax', **options)
self.check_json_serialization(root, ParkerConverter, validation='skip', **options)
self.check_json_serialization(root, BadgerFishConverter, **options)
self.check_json_serialization(root, AbderaConverter, **options)
self.check_json_serialization(root, JsonMLConverter, **options)
if narrow:
self.check_json_serialization(root, ParkerConverter, validation='lax', **options)
self.check_json_serialization(root, ParkerConverter, validation='skip', **options)
self.check_json_serialization(root, BadgerFishConverter, **options)
self.check_json_serialization(root, AbderaConverter, **options)
self.check_json_serialization(root, JsonMLConverter, **options)
def check_decoding_and_encoding_with_lxml(self):
xml_tree = lxml_etree.parse(xml_file)
@ -280,19 +283,21 @@ def make_validator_test_class(test_file, test_args, test_num, schema_class, chec
'dict_class': ordered_dict_class,
}
self.check_etree_encode(root, cdata_prefix='#', **options) # Default converter
self.check_etree_encode(root, ParkerConverter, validation='lax', **options)
self.check_etree_encode(root, ParkerConverter, validation='skip', **options)
self.check_etree_encode(root, BadgerFishConverter, **options)
self.check_etree_encode(root, AbderaConverter, **options)
self.check_etree_encode(root, JsonMLConverter, **options)
if narrow:
self.check_etree_encode(root, ParkerConverter, validation='lax', **options)
self.check_etree_encode(root, ParkerConverter, validation='skip', **options)
self.check_etree_encode(root, BadgerFishConverter, **options)
self.check_etree_encode(root, AbderaConverter, **options)
self.check_etree_encode(root, JsonMLConverter, **options)
options.pop('dict_class')
self.check_json_serialization(root, cdata_prefix='#', **options)
self.check_json_serialization(root, ParkerConverter, validation='lax', **options)
self.check_json_serialization(root, ParkerConverter, validation='skip', **options)
self.check_json_serialization(root, BadgerFishConverter, **options)
self.check_json_serialization(root, AbderaConverter, **options)
self.check_json_serialization(root, JsonMLConverter, **options)
if narrow:
self.check_json_serialization(root, ParkerConverter, validation='lax', **options)
self.check_json_serialization(root, ParkerConverter, validation='skip', **options)
self.check_json_serialization(root, BadgerFishConverter, **options)
self.check_json_serialization(root, AbderaConverter, **options)
self.check_json_serialization(root, JsonMLConverter, **options)
def check_validate_and_is_valid_api(self):
if expected_errors:

View File

@ -40,6 +40,9 @@ class TestHelpers(unittest.TestCase):
self.assertEqual(get_namespace(XSD_SIMPLE_TYPE), XSD_NAMESPACE)
self.assertEqual(get_namespace(''), '')
self.assertEqual(get_namespace(None), '')
self.assertEqual(get_namespace('{}name'), '')
self.assertEqual(get_namespace('{ }name'), ' ')
self.assertEqual(get_namespace('{ ns }name'), ' ns ')
def test_get_qname_functions(self):
self.assertEqual(get_qname(XSD_NAMESPACE, 'element'), XSD_ELEMENT)
@ -81,8 +84,21 @@ class TestHelpers(unittest.TestCase):
self.assertEqual(qname_to_prefixed('', {}), '')
self.assertEqual(qname_to_prefixed('type', {'': XSI_NAMESPACE}), 'type')
self.assertEqual(qname_to_prefixed('type', {'ns': ''}), 'ns:type')
self.assertEqual(qname_to_prefixed('type', {'': ''}), 'type')
self.assertEqual(qname_to_prefixed('{}type', {'': ''}), 'type')
self.assertEqual(qname_to_prefixed('{}type', {'': ''}, use_empty=False), '{}type')
# Attention! in XML the empty namespace (that means no namespace) can be
# associated only with empty prefix, so these cases should never happen.
self.assertEqual(qname_to_prefixed('{}type', {'p': ''}), 'p:type')
self.assertEqual(qname_to_prefixed('type', {'p': ''}), 'type')
self.assertEqual(qname_to_prefixed('{ns}type', {'': 'ns'}, use_empty=True), 'type')
self.assertEqual(qname_to_prefixed('{ns}type', {'': 'ns'}, use_empty=False), '{ns}type')
self.assertEqual(qname_to_prefixed('{ns}type', {'': 'ns', 'p': 'ns'}, use_empty=True), 'p:type')
self.assertEqual(qname_to_prefixed('{ns}type', {'': 'ns', 'p': 'ns'}, use_empty=False), 'p:type')
self.assertEqual(qname_to_prefixed('{ns}type', {'': 'ns', 'p': 'ns0'}, use_empty=True), 'type')
self.assertEqual(qname_to_prefixed('{ns}type', {'': 'ns', 'p': 'ns0'}, use_empty=False), '{ns}type')
def test_get_xsd_annotation(self):
elem = etree_element(XSD_SCHEMA)

View File

@ -15,13 +15,15 @@ This module runs tests concerning model groups validation.
import unittest
from xmlschema import XMLSchema10, XMLSchema11
from xmlschema.validators import ModelVisitor
from xmlschema.validators import XsdElement, ModelVisitor
from xmlschema.compat import ordered_dict_class
from xmlschema.tests import casepath, XsdValidatorTestCase
class TestModelValidation(XsdValidatorTestCase):
schema_class = XMLSchema10
# --- Test helper functions ---
def check_advance_true(self, model, expected=None):
@ -514,6 +516,146 @@ class TestModelValidation(XsdValidatorTestCase):
self.check_advance_true(model) # match choice with <elem4>
self.assertIsNone(model.element)
#
# Test pathological cases
def test_empty_choice_groups(self):
schema = self.schema_class("""<?xml version="1.0"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:group name="group1">
<xs:sequence>
<xs:choice minOccurs="0">
<xs:choice minOccurs="0"/>
</xs:choice>
<xs:element name="elem1"/>
</xs:sequence>
</xs:group>
<xs:element name="root">
<xs:complexType>
<xs:choice>
<xs:group ref="group1"/>
</xs:choice>
</xs:complexType>
</xs:element>
</xs:schema>""")
xml_data = "<root><elem1/></root>"
model = ModelVisitor(schema.elements['root'].type.content_type)
self.assertIsInstance(model.element, XsdElement)
self.assertEqual(model.element.name, 'elem1')
self.assertIsNone(schema.validate(xml_data))
def test_sequence_model_with_extended_occurs(self):
schema = self.schema_class(
"""<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="root">
<xs:complexType>
<xs:sequence minOccurs="2" maxOccurs="unbounded">
<xs:element name="ax" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
""")
self.assertIsNone(schema.validate('<root><ax/><ax/></root>'))
schema = self.schema_class(
"""<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="root">
<xs:complexType>
<xs:sequence minOccurs="0" maxOccurs="unbounded">
<xs:element name="a" minOccurs="2" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
""")
self.assertIsNone(schema.validate('<root><a/><a/></root>'))
self.assertIsNone(schema.validate('<root><a/><a/><a/></root>'))
self.assertIsNone(schema.validate('<root><a/><a/><a/><a/><a/><a/></root>'))
def test_sequence_model_with_nested_choice_model(self):
schema = self.schema_class(
"""<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="root">
<xs:complexType>
<xs:sequence minOccurs="0" maxOccurs="unbounded">
<xs:group ref="group1" minOccurs="2" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:group name="group1">
<xs:choice>
<xs:element name="a" maxOccurs="unbounded"/>
<xs:element name="b"/>
<xs:element name="c"/>
</xs:choice>
</xs:group>
</xs:schema>
""")
self.assertIsNone(schema.validate('<root><a/><a/></root>'))
self.assertIsNone(schema.validate('<root><a/><a/><a/></root>'))
self.assertIsNone(schema.validate('<root><a/><a/><a/><a/><a/><a/></root>'))
def test_sequence_model_with_optional_elements(self):
schema = self.schema_class(
"""<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="root">
<xs:complexType>
<xs:sequence minOccurs="2" maxOccurs="2">
<xs:element name="a" minOccurs="1" maxOccurs="2" />
<xs:element name="b" minOccurs="0" />
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
""")
self.assertIsNone(schema.validate('<root><a/><a/><b/></root>'))
def test_choice_model_with_extended_occurs(self):
schema = self.schema_class(
"""<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="root">
<xs:complexType>
<xs:choice maxOccurs="unbounded" minOccurs="0">
<xs:element maxOccurs="5" minOccurs="3" name="a"/>
<xs:element maxOccurs="5" minOccurs="3" name="b"/>
</xs:choice>
</xs:complexType>
</xs:element>
</xs:schema>
""")
self.assertIsNone(schema.validate('<root><a/><a/><a/></root>'))
self.assertIsNone(schema.validate('<root><a/><a/><a/><a/><a/></root>'))
self.assertIsNone(schema.validate('<root><a/><a/><a/><a/><a/><a/></root>'))
schema = self.schema_class(
"""<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="root">
<xs:complexType>
<xs:choice minOccurs="2" maxOccurs="3">
<xs:element name="a" maxOccurs="unbounded"/>
<xs:element name="b" maxOccurs="unbounded"/>
<xs:element name="c"/>
</xs:choice>
</xs:complexType>
</xs:element>
</xs:schema>
""")
self.assertIsNone(schema.validate('<root><a/><a/><a/></root>'))
#
# Tests on issues
def test_issue_086(self):
@ -576,6 +718,98 @@ class TestModelValidation(XsdValidatorTestCase):
class TestModelValidation11(TestModelValidation):
schema_class = XMLSchema11
def test_all_model_with_wildcard(self):
schema = self.schema_class(
"""<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="root">
<xs:complexType>
<xs:all>
<xs:element name="a" type="xs:string" />
<xs:any maxOccurs="3" processContents="lax" />
</xs:all>
</xs:complexType>
</xs:element>
</xs:schema>
""")
xml_data = """
<root>
<wildcard1/>
<a>1</a>
<wildcard2/>
<wildcard3/>
</root>
"""
self.assertIsNone(schema.validate(xml_data))
def test_all_model_with_extended_occurs(self):
schema = self.schema_class(
"""<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="root">
<xs:complexType>
<xs:all>
<xs:element name="a" minOccurs="0" maxOccurs="5"/>
<xs:element name="b" maxOccurs="5"/>
<xs:element name="c" minOccurs="2" maxOccurs="5"/>
<xs:element name="d" />
</xs:all>
</xs:complexType>
</xs:element>
</xs:schema>
""")
xml_data = '<root><a/><b/><d/><c/><a/><c/></root>'
self.assertIsNone(schema.validate(xml_data))
def test_all_model_with_relaxed_occurs(self):
schema = self.schema_class(
"""<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="root">
<xs:complexType>
<xs:all>
<xs:element name="a" minOccurs="0" maxOccurs="5"/>
<xs:element name="b" maxOccurs="5"/>
<xs:element name="c" minOccurs="2" maxOccurs="unbounded"/>
<xs:element name="d" />
</xs:all>
</xs:complexType>
</xs:element>
</xs:schema>
""")
xml_data = '<root><a/><b/><d/><c/><a/><c/><c/><a/><a/><b/></root>'
self.assertIsNone(schema.validate(xml_data))
schema = self.schema_class(
"""<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="root">
<xs:complexType>
<xs:all>
<xs:element name="a" minOccurs="0" maxOccurs="5"/>
<xs:group ref="group1"/>
</xs:all>
</xs:complexType>
</xs:element>
<xs:group name="group1">
<xs:all>
<xs:element name="b" maxOccurs="5"/>
<xs:element name="c" minOccurs="2" maxOccurs="unbounded"/>
<xs:element name="d" />
</xs:all>
</xs:group>
</xs:schema>
""")
self.assertIsNone(schema.validate(xml_data))
class TestModelBasedSorting(XsdValidatorTestCase):

View File

@ -14,20 +14,23 @@ This module runs tests concerning resources.
"""
import unittest
import os
import platform
try:
from pathlib import PureWindowsPath, PurePath
except ImportError:
# noinspection PyPackageRequirements
from pathlib2 import PureWindowsPath, PurePath
from xmlschema import (
fetch_namespaces, fetch_resource, normalize_url, fetch_schema, fetch_schema_locations,
load_xml_resource, XMLResource, XMLSchemaURLError
load_xml_resource, XMLResource, XMLSchemaURLError, XMLSchema
)
from xmlschema.tests import casepath
from xmlschema.tests import SKIP_REMOTE_TESTS, casepath
from xmlschema.compat import urlopen, urlsplit, uses_relative, StringIO
from xmlschema.etree import ElementTree, PyElementTree, lxml_etree, \
etree_element, py_etree_element
from xmlschema.namespaces import XSD_NAMESPACE
from xmlschema.helpers import is_etree_element
@ -40,10 +43,20 @@ def add_leading_slash(path):
return '/' + path if path and path[0] not in ('/', '\\') else path
def filter_windows_path(path):
if path.startswith('/\\'):
return path[1:]
elif path and path[0] not in ('/', '\\'):
return '/' + path
else:
return path
class TestResources(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.schema_class = XMLSchema
cls.vh_dir = casepath('examples/vehicles')
cls.vh_xsd_file = casepath('examples/vehicles/vehicles.xsd')
cls.vh_xml_file = casepath('examples/vehicles/vehicles.xml')
@ -64,14 +77,14 @@ class TestResources(unittest.TestCase):
self.assertEqual(url_parts.fragment, expected_parts.fragment, "%r: Fragment parts differ." % url)
if is_windows_path(url_parts.path) or is_windows_path(expected_parts.path):
path = PureWindowsPath(url_parts.path)
expected_path = PureWindowsPath(add_leading_slash(expected_parts.path))
path = PureWindowsPath(filter_windows_path(url_parts.path))
expected_path = PureWindowsPath(filter_windows_path(expected_parts.path))
else:
path = PurePath(url_parts.path)
expected_path = PurePath(expected_parts.path)
self.assertEqual(path, expected_path, "%r: Paths differ." % url)
def test_normalize_url(self):
def test_normalize_url_posix(self):
url1 = "https://example.com/xsd/other_schema.xsd"
self.check_url(normalize_url(url1, base_url="/path_my_schema/schema.xsd"), url1)
@ -94,6 +107,7 @@ class TestResources(unittest.TestCase):
self.check_url(normalize_url('dummy path.xsd', 'http://site/base'), 'http://site/base/dummy%20path.xsd')
self.check_url(normalize_url('dummy path.xsd', 'file://host/home/'), 'file://host/home/dummy path.xsd')
def test_normalize_url_windows(self):
win_abs_path1 = 'z:\\Dir_1_0\\Dir2-0\\schemas/XSD_1.0/XMLSchema.xsd'
win_abs_path2 = 'z:\\Dir-1.0\\Dir-2_0\\'
self.check_url(normalize_url(win_abs_path1), win_abs_path1)
@ -104,7 +118,9 @@ class TestResources(unittest.TestCase):
self.check_url(
normalize_url('xsd1.0/schema.xsd', win_abs_path2), 'file:///z:\\Dir-1.0\\Dir-2_0/xsd1.0/schema.xsd'
)
self.check_url(normalize_url('file:///\\k:\\Dir A\\schema.xsd'), 'file:///k:\\Dir A\\schema.xsd')
def test_normalize_url_slashes(self):
# Issue #116
self.assertEqual(
normalize_url('//anaconda/envs/testenv/lib/python3.6/site-packages/xmlschema/validators/schemas/'),
@ -118,12 +134,29 @@ class TestResources(unittest.TestCase):
self.assertEqual(normalize_url('dir2/schema.xsd', '//root/dir1'), 'file:///root/dir1/dir2/schema.xsd')
self.assertEqual(normalize_url('dir2/schema.xsd', '////root/dir1'), 'file:///root/dir1/dir2/schema.xsd')
def test_normalize_url_hash_character(self):
self.check_url(normalize_url('issue #000.xml', 'file:///dir1/dir2/'),
'file:///dir1/dir2/issue %23000.xml')
self.check_url(normalize_url('data.xml', 'file:///dir1/dir2/issue 000'),
'file:///dir1/dir2/issue 000/data.xml')
self.check_url(normalize_url('data.xml', '/dir1/dir2/issue #000'),
'/dir1/dir2/issue %23000/data.xml')
def test_fetch_resource(self):
wrong_path = casepath('resources/dummy_file.txt')
self.assertRaises(XMLSchemaURLError, fetch_resource, wrong_path)
right_path = casepath('resources/dummy file.txt')
self.assertTrue(fetch_resource(right_path).endswith('dummy file.txt'))
ambiguous_path = casepath('resources/dummy file #2.txt')
self.assertTrue(fetch_resource(ambiguous_path).endswith('dummy file %232.txt'))
res = urlopen(fetch_resource(ambiguous_path))
try:
self.assertEqual(res.read(), b'DUMMY CONTENT')
finally:
res.close()
def test_fetch_namespaces(self):
self.assertFalse(fetch_namespaces(casepath('resources/malformed.xml')))
@ -177,7 +210,7 @@ class TestResources(unittest.TestCase):
resource = XMLResource(vh_root)
self.assertEqual(resource.source, vh_root)
self.assertIsNone(resource.document)
self.assertIsInstance(resource.document, ElementTree.ElementTree)
self.assertEqual(resource.root.tag, '{http://example.com/vehicles}vehicles')
self.assertIsNone(resource.url)
self.assertIsNone(resource.text)
@ -206,17 +239,22 @@ class TestResources(unittest.TestCase):
resource.load()
self.assertIsNone(resource.text)
@unittest.skipIf(
platform.python_version_tuple()[0] < '3',
"Skip: urlopen on Python 2 can't seek 'file://' paths."
)
def test_xml_resource_from_resource(self):
xml_file = urlopen('file://{}'.format(add_leading_slash(self.vh_xml_file)))
try:
resource = XMLResource(xml_file)
self.assertEqual(resource.source, xml_file)
self.assertEqual(resource.root.tag, '{http://example.com/vehicles}vehicles')
self.check_url(resource.url, self.vh_xml_file)
self.assertIsNone(resource.url)
self.assertIsNone(resource.document)
self.assertIsNone(resource.text)
resource.load()
self.assertTrue(resource.text.startswith('<?xml'))
self.assertFalse(xml_file.closed)
finally:
xml_file.close()
@ -225,21 +263,35 @@ class TestResources(unittest.TestCase):
resource = XMLResource(schema_file)
self.assertEqual(resource.source, schema_file)
self.assertEqual(resource.root.tag, '{http://www.w3.org/2001/XMLSchema}schema')
self.check_url(resource.url, self.vh_xsd_file)
self.assertIsNone(resource.url)
self.assertIsNone(resource.document)
self.assertIsNone(resource.text)
resource.load()
self.assertTrue(resource.text.startswith('<xs:schema'))
self.assertFalse(schema_file.closed)
for _ in resource.iter():
pass
self.assertFalse(schema_file.closed)
for _ in resource.iterfind():
pass
self.assertFalse(schema_file.closed)
with open(self.vh_xsd_file) as schema_file:
resource = XMLResource(schema_file, lazy=False)
self.assertEqual(resource.source, schema_file)
self.assertEqual(resource.root.tag, '{http://www.w3.org/2001/XMLSchema}schema')
self.check_url(resource.url, self.vh_xsd_file)
self.assertIsNone(resource.url)
self.assertIsInstance(resource.document, ElementTree.ElementTree)
self.assertIsNone(resource.text)
resource.load()
self.assertTrue(resource.text.startswith('<xs:schema'))
self.assertFalse(schema_file.closed)
for _ in resource.iter():
pass
self.assertFalse(schema_file.closed)
for _ in resource.iterfind():
pass
self.assertFalse(schema_file.closed)
def test_xml_resource_from_string(self):
with open(self.vh_xsd_file) as schema_file:
@ -323,14 +375,36 @@ class TestResources(unittest.TestCase):
resource.load()
self.assertTrue(resource.is_loaded())
def test_xml_resource_open(self):
def test_xml_resource_parse(self):
resource = XMLResource(self.vh_xml_file)
xml_file = resource.open()
data = xml_file.read().decode('utf-8')
self.assertTrue(data.startswith('<?xml '))
xml_file.close()
resource = XMLResource('<A/>')
self.assertRaises(ValueError, resource.open)
self.assertEqual(resource.defuse, 'remote')
xml_document = resource.parse(self.col_xml_file)
self.assertTrue(is_etree_element(xml_document.getroot()))
resource.defuse = 'always'
xml_document = resource.parse(self.col_xml_file)
self.assertTrue(is_etree_element(xml_document.getroot()))
def test_xml_resource_iterparse(self):
resource = XMLResource(self.vh_xml_file)
self.assertEqual(resource.defuse, 'remote')
for _, elem in resource.iterparse(self.col_xml_file, events=('end',)):
self.assertTrue(is_etree_element(elem))
resource.defuse = 'always'
for _, elem in resource.iterparse(self.col_xml_file, events=('end',)):
self.assertTrue(is_etree_element(elem))
def test_xml_resource_fromstring(self):
resource = XMLResource(self.vh_xml_file)
self.assertEqual(resource.defuse, 'remote')
self.assertEqual(resource.fromstring('<root/>').tag, 'root')
resource.defuse = 'always'
self.assertEqual(resource.fromstring('<root/>').tag, 'root')
def test_xml_resource_tostring(self):
resource = XMLResource(self.vh_xml_file)
@ -352,16 +426,109 @@ class TestResources(unittest.TestCase):
resource2 = resource.copy()
self.assertEqual(resource.text, resource2.text)
def test_xml_resource_open(self):
resource = XMLResource(self.vh_xml_file)
xml_file = resource.open()
self.assertIsNot(xml_file, resource.source)
data = xml_file.read().decode('utf-8')
self.assertTrue(data.startswith('<?xml '))
xml_file.close()
resource = XMLResource('<A/>')
self.assertRaises(ValueError, resource.open)
resource = XMLResource(source=open(self.vh_xml_file))
xml_file = resource.open()
self.assertIs(xml_file, resource.source)
xml_file.close()
def test_xml_resource_seek(self):
resource = XMLResource(self.vh_xml_file)
self.assertIsNone(resource.seek(0))
self.assertIsNone(resource.seek(1))
xml_file = open(self.vh_xml_file)
resource = XMLResource(source=xml_file)
self.assertEqual(resource.seek(0), 0)
self.assertEqual(resource.seek(1), 1)
xml_file.close()
def test_xml_resource_close(self):
resource = XMLResource(self.vh_xml_file)
resource.close()
xml_file = resource.open()
self.assertTrue(callable(xml_file.read))
with open(self.vh_xml_file) as xml_file:
resource = XMLResource(source=xml_file)
resource.close()
with self.assertRaises(ValueError):
resource.open()
def test_xml_resource_iter(self):
resource = XMLResource(self.schema_class.meta_schema.source.url, lazy=False)
self.assertFalse(resource.is_lazy())
lazy_resource = XMLResource(self.schema_class.meta_schema.source.url)
self.assertTrue(lazy_resource.is_lazy())
tags = [x.tag for x in resource.iter()]
self.assertEqual(len(tags), 1390)
self.assertEqual(tags[0], '{%s}schema' % XSD_NAMESPACE)
lazy_tags = [x.tag for x in lazy_resource.iter()]
self.assertEqual(len(lazy_tags), 1390)
self.assertEqual(lazy_tags[-1], '{%s}schema' % XSD_NAMESPACE)
self.assertNotEqual(tags, lazy_tags)
tags = [x.tag for x in resource.iter('{%s}complexType' % XSD_NAMESPACE)]
self.assertEqual(len(tags), 56)
self.assertEqual(tags[0], '{%s}complexType' % XSD_NAMESPACE)
self.assertListEqual(tags, [x.tag for x in lazy_resource.iter('{%s}complexType' % XSD_NAMESPACE)])
def test_xml_resource_iterfind(self):
namespaces = {'xs': XSD_NAMESPACE}
resource = XMLResource(self.schema_class.meta_schema.source.url, lazy=False)
self.assertFalse(resource.is_lazy())
lazy_resource = XMLResource(self.schema_class.meta_schema.source.url)
self.assertTrue(lazy_resource.is_lazy())
# Note: Element change with lazy resource so compare only tags
tags = [x.tag for x in resource.iterfind()]
self.assertEqual(len(tags), 1)
self.assertEqual(tags[0], '{%s}schema' % XSD_NAMESPACE)
self.assertListEqual(tags, [x.tag for x in lazy_resource.iterfind()])
tags = [x.tag for x in resource.iterfind(path='.')]
self.assertEqual(len(tags), 1)
self.assertEqual(tags[0], '{%s}schema' % XSD_NAMESPACE)
self.assertListEqual(tags, [x.tag for x in lazy_resource.iterfind(path='.')])
tags = [x.tag for x in resource.iterfind(path='*')]
self.assertEqual(len(tags), 156)
self.assertEqual(tags[0], '{%s}annotation' % XSD_NAMESPACE)
self.assertListEqual(tags, [x.tag for x in lazy_resource.iterfind(path='*')])
tags = [x.tag for x in resource.iterfind('xs:complexType', namespaces)]
self.assertEqual(len(tags), 35)
self.assertTrue(all(t == '{%s}complexType' % XSD_NAMESPACE for t in tags))
self.assertListEqual(tags, [x.tag for x in lazy_resource.iterfind('xs:complexType', namespaces)])
tags = [x.tag for x in resource.iterfind('. /. / xs:complexType', namespaces)]
self.assertEqual(len(tags), 35)
self.assertTrue(all(t == '{%s}complexType' % XSD_NAMESPACE for t in tags))
self.assertListEqual(tags, [x.tag for x in lazy_resource.iterfind('. /. / xs:complexType', namespaces)])
def test_xml_resource_get_namespaces(self):
with open(self.vh_xml_file) as schema_file:
resource = XMLResource(schema_file)
self.assertEqual(resource.url, normalize_url(self.vh_xml_file))
self.assertIsNone(resource.url)
self.assertEqual(set(resource.get_namespaces().keys()), {'vh', 'xsi'})
self.assertFalse(schema_file.closed)
with open(self.vh_xsd_file) as schema_file:
resource = XMLResource(schema_file)
self.assertEqual(resource.url, normalize_url(self.vh_xsd_file))
self.assertIsNone(resource.url)
self.assertEqual(set(resource.get_namespaces().keys()), {'xs', 'vh'})
self.assertFalse(schema_file.closed)
resource = XMLResource(self.col_xml_file)
self.assertEqual(resource.url, normalize_url(self.col_xml_file))
@ -378,6 +545,47 @@ class TestResources(unittest.TestCase):
self.assertEqual(len(locations), 2)
self.check_url(locations[0][1], os.path.join(self.col_dir, 'other.xsd'))
@unittest.skipIf(SKIP_REMOTE_TESTS or platform.system() == 'Windows',
"Remote networks are not accessible or avoid SSL verification error on Windows.")
def test_remote_schemas_loading(self):
col_schema = self.schema_class("https://raw.githubusercontent.com/brunato/xmlschema/master/"
"xmlschema/tests/test_cases/examples/collection/collection.xsd")
self.assertTrue(isinstance(col_schema, self.schema_class))
vh_schema = self.schema_class("https://raw.githubusercontent.com/brunato/xmlschema/master/"
"xmlschema/tests/test_cases/examples/vehicles/vehicles.xsd")
self.assertTrue(isinstance(vh_schema, self.schema_class))
def test_schema_defuse(self):
vh_schema = self.schema_class(self.vh_xsd_file, defuse='always')
self.assertIsInstance(vh_schema.root, etree_element)
for schema in vh_schema.maps.iter_schemas():
self.assertIsInstance(schema.root, etree_element)
def test_fid_with_name_attr(self):
"""XMLResource gets correct data when passed a file like object
with a name attribute that isn't on disk.
These file descriptors appear when working with the contents from a
zip using the zipfile module and with Django files in some
instances.
"""
class FileProxy(object):
def __init__(self, fid, fake_name):
self._fid = fid
self.name = fake_name
def __getattr__(self, attr):
try:
return self.__dict__[attr]
except (KeyError, AttributeError):
return getattr(self.__dict__["_fid"], attr)
with open(self.vh_xml_file) as xml_file:
resource = XMLResource(FileProxy(xml_file, fake_name="not__on____disk.xml"))
self.assertIsNone(resource.url)
self.assertEqual(set(resource.get_namespaces().keys()), {'vh', 'xsi'})
self.assertFalse(xml_file.closed)
if __name__ == '__main__':
from xmlschema.tests import print_test_header

View File

@ -99,6 +99,9 @@ SKIPPED_TESTS = {
'../msData/additional/test93490_4.xml', # 4795: https://www.w3.org/Bugs/Public/show_bug.cgi?id=4078
'../msData/additional/test93490_8.xml', # 4799: Idem
# Valid XML tests
'../ibmData/instance_invalid/S3_4_2_4/s3_4_2_4ii03.xml', # defaultAttributeApply is true (false in comment)
# Skip for missing XML version 1.1 implementation
'../saxonData/XmlVersions/xv001.v01.xml', # 14850
'../saxonData/XmlVersions/xv003.v01.xml', # 14852

View File

@ -10,6 +10,7 @@
# @author Davide Brunato <brunato@sissa.it>
#
import unittest
import sys
import xmlschema
from xmlschema import XMLSchemaValidationError
@ -55,7 +56,13 @@ class TestValidation(XsdValidatorTestCase):
path_line = str(err).splitlines()[-1]
else:
path_line = ''
self.assertEqual('Path: /vhx:vehicles/vhx:cars', path_line)
if sys.version_info >= (3, 6):
self.assertEqual('Path: /vhx:vehicles/vhx:cars', path_line)
else:
self.assertTrue(
'Path: /vh:vehicles/vh:cars' == path_line or 'Path: /vhx:vehicles/vhx:cars', path_line
) # Due to unordered dicts
# Issue #80
vh_2_xt = ElementTree.parse(vh_2_file)
@ -70,13 +77,33 @@ class TestValidation(XsdValidatorTestCase):
self.assertRaises(XMLSchemaValidationError, xsd_element.decode, source.root, namespaces=namespaces)
# Testing adding 'no_depth' argument
for result in xsd_element.iter_decode(source.root, 'strict', namespaces=namespaces,
source=source, no_depth=True):
source=source, max_depth=1):
del result
self.assertIsNone(xmlschema.validate(self.col_xml_file, lazy=True))
def test_max_depth_argument(self):
schema = self.schema_class(self.col_xsd_file)
self.assertEqual(
schema.decode(self.col_xml_file, max_depth=1),
{'@xmlns:col': 'http://example.com/ns/collection',
'@xmlns:xsi': 'http://www.w3.org/2001/XMLSchema-instance',
'@xsi:schemaLocation': 'http://example.com/ns/collection collection.xsd'})
xmlschema.limits.MAX_XML_DEPTH = 1
with self.assertRaises(XMLSchemaValidationError):
self.assertEqual(schema.decode(self.col_xml_file))
xmlschema.limits.MAX_XML_DEPTH = 9999
self.assertEqual(
schema.decode(self.col_xml_file, max_depth=2),
{'@xmlns:col': 'http://example.com/ns/collection',
'@xmlns:xsi': 'http://www.w3.org/2001/XMLSchema-instance',
'@xsi:schemaLocation': 'http://example.com/ns/collection collection.xsd',
'object': [{'@id': 'b0836217462', '@available': True},
{'@id': 'b0836217463', '@available': True}]})
class TestValidation11(TestValidation):
schema_class = XMLSchema11

View File

@ -13,9 +13,11 @@ from __future__ import print_function, unicode_literals
import unittest
import platform
import warnings
import os
from xmlschema import XMLSchemaParseError, XMLSchemaIncludeWarning, XMLSchemaImportWarning
from xmlschema.etree import etree_element
from xmlschema.namespaces import SCHEMAS_DIR
from xmlschema.qnames import XSD_ELEMENT, XSI_TYPE
from xmlschema.tests import SKIP_REMOTE_TESTS, XsdValidatorTestCase
from xmlschema.validators import XMLSchema11
@ -113,8 +115,7 @@ class TestXMLSchema10(XsdValidatorTestCase):
</xs:simpleType>""", XMLSchemaParseError)
def test_base_schemas(self):
from xmlschema.validators.schema import XML_SCHEMA_FILE
self.schema_class(XML_SCHEMA_FILE)
self.schema_class(os.path.join(SCHEMAS_DIR, 'xml_minimal.xsd'))
def test_root_elements(self):
# Test issue #107 fix
@ -141,10 +142,12 @@ class TestXMLSchema10(XsdValidatorTestCase):
"Remote networks are not accessible or avoid SSL verification error on Windows.")
def test_remote_schemas_loading(self):
col_schema = self.schema_class("https://raw.githubusercontent.com/brunato/xmlschema/master/"
"xmlschema/tests/test_cases/examples/collection/collection.xsd")
"xmlschema/tests/test_cases/examples/collection/collection.xsd",
timeout=300)
self.assertTrue(isinstance(col_schema, self.schema_class))
vh_schema = self.schema_class("https://raw.githubusercontent.com/brunato/xmlschema/master/"
"xmlschema/tests/test_cases/examples/vehicles/vehicles.xsd")
"xmlschema/tests/test_cases/examples/vehicles/vehicles.xsd",
timeout=300)
self.assertTrue(isinstance(vh_schema, self.schema_class))
def test_schema_defuse(self):

View File

@ -235,7 +235,8 @@ class XsdAttribute(XsdComponent, ValidationMixin):
elif text == self.fixed or validation == 'skip':
pass
elif self.type.text_decode(text) != self.type.text_decode(self.fixed):
yield self.validation_error(validation, "value differs from fixed value", text, **kwargs)
msg = "attribute {!r} has a fixed value {!r}".format(self.name, self.fixed)
yield self.validation_error(validation, msg, text, **kwargs)
for result in self.type.iter_decode(text, validation, **kwargs):
if isinstance(result, XMLSchemaValidationError):
@ -286,9 +287,12 @@ class Xsd11Attribute(XsdAttribute):
@property
def target_namespace(self):
if self._target_namespace is None:
if self._target_namespace is not None:
return self._target_namespace
elif self.ref is not None:
return self.ref.target_namespace
else:
return self.schema.target_namespace
return self._target_namespace
def _parse(self):
super(Xsd11Attribute, self)._parse()
@ -594,7 +598,11 @@ class XsdAttributeGroup(MutableMapping, XsdComponent, ValidationMixin):
reason = "missing required attribute: %r" % k
yield self.validation_error(validation, reason, attrs, **kwargs)
kwargs['level'] = kwargs.get('level', 0) + 1
use_defaults = kwargs.get('use_defaults', True)
id_map = kwargs.get('id_map', '')
num_id = len(id_map)
additional_attrs = [(k, v) for k, v in self.iter_predefined(use_defaults) if k not in attrs]
if additional_attrs:
attrs = {k: v for k, v in attrs.items()}
@ -638,6 +646,10 @@ class XsdAttributeGroup(MutableMapping, XsdComponent, ValidationMixin):
result_list.append((name, result))
break
if self.xsd_version == '1.0' and len(id_map) - num_id > 1:
reason = "No more than one attribute of type ID should be present in an element"
yield self.validation_error(validation, reason, attrs, **kwargs)
if kwargs.get('fill_missing') is True:
if filler is None:
result_list.extend((k, None) for k in self._attribute_group

View File

@ -13,8 +13,9 @@ from __future__ import unicode_literals
from ..exceptions import XMLSchemaValueError
from ..qnames import XSD_ANNOTATION, XSD_GROUP, XSD_ATTRIBUTE_GROUP, XSD_SEQUENCE, \
XSD_ALL, XSD_CHOICE, XSD_ANY_ATTRIBUTE, XSD_ATTRIBUTE, XSD_COMPLEX_CONTENT, \
XSD_RESTRICTION, XSD_COMPLEX_TYPE, XSD_EXTENSION, XSD_ANY_TYPE, XSD_SIMPLE_CONTENT, \
XSD_ANY_SIMPLE_TYPE, XSD_OPEN_CONTENT, XSD_ASSERT, get_qname, local_name
XSD_RESTRICTION, XSD_COMPLEX_TYPE, XSD_EXTENSION, XSD_ANY_TYPE, XSD_OVERRIDE, \
XSD_SIMPLE_CONTENT, XSD_ANY_SIMPLE_TYPE, XSD_OPEN_CONTENT, XSD_ASSERT, \
get_qname, local_name
from ..helpers import get_xsd_derivation_attribute
from .exceptions import XMLSchemaValidationError, XMLSchemaDecodeError
@ -52,6 +53,8 @@ class XsdComplexType(XsdType, ValidationMixin):
mixed = False
assertions = ()
open_content = None
content_type = None
default_open_content = None
_block = None
_ADMITTED_TAGS = {XSD_COMPLEX_TYPE, XSD_RESTRICTION}
@ -138,6 +141,10 @@ class XsdComplexType(XsdType, ValidationMixin):
elif content_elem.tag in {XSD_GROUP, XSD_SEQUENCE, XSD_ALL, XSD_CHOICE}:
self.content_type = self.schema.BUILDERS.group_class(content_elem, self.schema, self)
default_open_content = self.default_open_content
if default_open_content and \
(self.mixed or self.content_type or default_open_content.applies_to_empty):
self.open_content = default_open_content
self._parse_content_tail(elem)
elif content_elem.tag == XSD_SIMPLE_CONTENT:
@ -179,6 +186,7 @@ class XsdComplexType(XsdType, ValidationMixin):
self.base_type = base_type
elif self.redefine:
self.base_type = self.redefine
self.open_content = None
if derivation_elem.tag == XSD_RESTRICTION:
self._parse_complex_content_restriction(derivation_elem, base_type)
@ -345,10 +353,10 @@ class XsdComplexType(XsdType, ValidationMixin):
)
if not self.open_content:
if self.schema.default_open_content:
self.open_content = self.schema.default_open_content
elif getattr(base_type, 'open_content', None):
self.open_content = base_type.open_content
default_open_content = self.default_open_content
if default_open_content and \
(self.mixed or content_type or default_open_content.applies_to_empty):
self.open_content = default_open_content
if self.open_content and content_type and \
not self.open_content.is_restriction(base_type.open_content):
@ -455,6 +463,8 @@ class XsdComplexType(XsdType, ValidationMixin):
def is_empty(self):
if self.name == XSD_ANY_TYPE:
return False
elif self.open_content and self.open_content.mode != 'none':
return False
return self.content_type.is_empty()
def is_emptiable(self):
@ -504,7 +514,7 @@ class XsdComplexType(XsdType, ValidationMixin):
elif other.name == XSD_ANY_TYPE:
return True
elif self.base_type is other:
return derivation is None or self.base_type.derivation == derivation
return derivation is None # or self.base_type.derivation == derivation
elif hasattr(other, 'member_types'):
return any(self.is_derived(m, derivation) for m in other.member_types)
elif self.base_type is None:
@ -573,6 +583,10 @@ class XsdComplexType(XsdType, ValidationMixin):
:return: yields a 3-tuple (simple content, complex content, attributes) containing \
the decoded parts, eventually preceded by a sequence of validation or decoding errors.
"""
if self.is_empty() and elem.text:
reason = "character data between child elements not allowed because the type's content is empty"
yield self.validation_error(validation, reason, elem, **kwargs)
# XSD 1.1 assertions
for assertion in self.assertions:
for error in assertion(elem, **kwargs):
@ -667,6 +681,32 @@ class Xsd11ComplexType(XsdComplexType):
_CONTENT_TAIL_TAGS = {XSD_ATTRIBUTE_GROUP, XSD_ATTRIBUTE, XSD_ANY_ATTRIBUTE, XSD_ASSERT}
@property
def default_attributes(self):
if self.redefine is not None:
return self.schema.default_attributes
for child in filter(lambda x: x.tag == XSD_OVERRIDE, self.schema.root):
if self.elem in child:
schema = self.schema.includes[child.attrib['schemaLocation']]
if schema.override is self.schema:
return schema.default_attributes
else:
return self.schema.default_attributes
@property
def default_open_content(self):
if self.parent is not None:
return self.schema.default_open_content
for child in filter(lambda x: x.tag == XSD_OVERRIDE, self.schema.root):
if self.elem in child:
schema = self.schema.includes[child.attrib['schemaLocation']]
if schema.override is self.schema:
return schema.default_open_content
else:
return self.schema.default_open_content
def _parse(self):
super(Xsd11ComplexType, self)._parse()
@ -679,13 +719,12 @@ class Xsd11ComplexType(XsdComplexType):
# Add open content to complex content type
if isinstance(self.content_type, XsdGroup):
open_content = self.open_content or self.schema.default_open_content
if open_content is None:
pass
elif open_content.mode == 'interleave':
self.content_type.interleave = self.content_type.suffix = open_content.any_element
elif open_content.mode == 'suffix':
self.content_type.suffix = open_content.any_element
if self.open_content is None:
assert self.content_type.interleave is None and self.content_type.suffix is None
elif self.open_content.mode == 'interleave':
self.content_type.interleave = self.content_type.suffix = self.open_content.any_element
elif self.open_content.mode == 'suffix':
self.content_type.suffix = self.open_content.any_element
# Add inheritable attributes
if hasattr(self.base_type, 'attributes'):
@ -697,23 +736,18 @@ class Xsd11ComplexType(XsdComplexType):
self.parse_error("attribute %r must be inheritable")
if 'defaultAttributesApply' in self.elem.attrib:
if self.elem.attrib['defaultAttributesApply'].strip() in {'false', '0'}:
self.default_attributes_apply = False
attr = self.elem.attrib['defaultAttributesApply'].strip()
self.default_attributes_apply = False if attr in {'false', '0'} else True
else:
self.default_attributes_apply = True
# Add default attributes
if self.redefine is None:
default_attributes = self.schema.default_attributes
else:
default_attributes = self.redefine.schema.default_attributes
if default_attributes is None:
pass
elif self.default_attributes_apply and not self.is_override():
if self.redefine is None and any(k in self.attributes for k in default_attributes):
self.parse_error("at least a default attribute is already declared in the complex type")
self.attributes.update(
(k, v) for k, v in default_attributes.items() if k not in self.attributes
)
if self.default_attributes_apply:
default_attributes = self.default_attributes
if default_attributes is not None:
if self.redefine is None and any(k in self.attributes for k in default_attributes):
self.parse_error("at least a default attribute is already declared in the complex type")
self.attributes.update((k, v) for k, v in default_attributes.items())
def _parse_complex_content_extension(self, elem, base_type):
# Complex content extension with simple base is forbidden XSD 1.1.
@ -738,19 +772,6 @@ class Xsd11ComplexType(XsdComplexType):
else:
group_elem = None
if not self.open_content:
if self.schema.default_open_content:
self.open_content = self.schema.default_open_content
elif getattr(base_type, 'open_content', None):
self.open_content = base_type.open_content
try:
if self.open_content and not base_type.open_content.is_restriction(self.open_content):
msg = "{!r} is not an extension of the base type {!r}"
self.parse_error(msg.format(self.open_content, base_type.open_content))
except AttributeError:
pass
if not base_type.content_type:
if not base_type.mixed:
# Empty element-only model extension: don't create a nested sequence group.
@ -825,6 +846,21 @@ class Xsd11ComplexType(XsdComplexType):
else:
self.content_type = self.schema.create_empty_content_group(self)
if not self.open_content:
default_open_content = self.default_open_content
if default_open_content and \
(self.mixed or self.content_type or default_open_content.applies_to_empty):
self.open_content = default_open_content
elif base_type.open_content:
self.open_content = base_type.open_content
if base_type.open_content and self.open_content is not base_type.open_content:
if self.open_content.mode == 'none':
self.open_content = base_type.open_content
elif not base_type.open_content.is_restriction(self.open_content):
msg = "{!r} is not an extension of the base type {!r}"
self.parse_error(msg.format(self.open_content, base_type.open_content))
self._parse_content_tail(elem, derivation='extension', base_attributes=base_type.attributes)
def _parse_content_tail(self, elem, **kwargs):

View File

@ -21,7 +21,7 @@ from ..exceptions import XMLSchemaAttributeError
from ..qnames import XSD_ANNOTATION, XSD_GROUP, XSD_SEQUENCE, XSD_ALL, \
XSD_CHOICE, XSD_ATTRIBUTE_GROUP, XSD_COMPLEX_TYPE, XSD_SIMPLE_TYPE, \
XSD_ALTERNATIVE, XSD_ELEMENT, XSD_ANY_TYPE, XSD_UNIQUE, XSD_KEY, \
XSD_KEYREF, XSI_NIL, XSI_TYPE, XSD_ID, XSD_ERROR, get_qname
XSD_KEYREF, XSI_NIL, XSI_TYPE, XSD_ERROR, get_qname
from ..etree import etree_element
from ..helpers import get_xsd_derivation_attribute, get_xsd_form_attribute, \
ParticleCounter, strictly_equal
@ -244,15 +244,13 @@ class XsdElement(XsdComponent, ValidationMixin, ParticleMixin, ElementPathMixin)
if not self.type.is_valid(attrib['default']):
msg = "'default' value {!r} is not compatible with the type {!r}"
self.parse_error(msg.format(attrib['default'], self.type))
elif self.xsd_version == '1.0' and (
self.type.name == XSD_ID or self.type.is_derived(self.schema.meta_schema.types['ID'])):
elif self.xsd_version == '1.0' and self.type.is_key():
self.parse_error("'xs:ID' or a type derived from 'xs:ID' cannot has a 'default'")
elif 'fixed' in attrib:
if not self.type.is_valid(attrib['fixed']):
msg = "'fixed' value {!r} is not compatible with the type {!r}"
self.parse_error(msg.format(attrib['fixed'], self.type))
elif self.xsd_version == '1.0' and (
self.type.name == XSD_ID or self.type.is_derived(self.schema.meta_schema.types['ID'])):
elif self.xsd_version == '1.0' and self.type.is_key():
self.parse_error("'xs:ID' or a type derived from 'xs:ID' cannot has a 'default'")
return 0
@ -458,14 +456,12 @@ class XsdElement(XsdComponent, ValidationMixin, ParticleMixin, ElementPathMixin)
text = self.fixed if self.fixed is not None else self.default
return self.type.text_decode(text)
def iter_decode(self, elem, validation='lax', converter=None, level=0, **kwargs):
def iter_decode(self, elem, validation='lax', **kwargs):
"""
Creates an iterator for decoding an Element instance.
:param elem: the Element that has to be decoded.
:param validation: the validation mode, can be 'lax', 'strict' or 'skip.
:param converter: an :class:`XMLSchemaConverter` subclass or instance to use for the decoding.
:param level: the depth of the element in the tree structure.
:param kwargs: keyword arguments for the decoding process.
:return: yields a decoded object, eventually preceded by a sequence of \
validation or decoding errors.
@ -473,8 +469,19 @@ class XsdElement(XsdComponent, ValidationMixin, ParticleMixin, ElementPathMixin)
if self.abstract:
yield self.validation_error(validation, "cannot use an abstract element for validation", elem, **kwargs)
if not isinstance(converter, XMLSchemaConverter):
converter = self.schema.get_converter(converter, level=level, **kwargs)
try:
level = kwargs['level']
except KeyError:
level = kwargs['level'] = 0
try:
converter = kwargs['converter']
except KeyError:
converter = kwargs['converter'] = self.get_converter(**kwargs)
else:
if not isinstance(converter, XMLSchemaConverter):
converter = kwargs['converter'] = self.get_converter(**kwargs)
inherited = kwargs.get('inherited')
value = content = attributes = None
@ -492,7 +499,7 @@ class XsdElement(XsdComponent, ValidationMixin, ParticleMixin, ElementPathMixin)
# Decode attributes
attribute_group = self.get_attributes(xsd_type)
for result in attribute_group.iter_decode(elem.attrib, validation, level=level, **kwargs):
for result in attribute_group.iter_decode(elem.attrib, validation, **kwargs):
if isinstance(result, XMLSchemaValidationError):
yield self.validation_error(validation, result, elem, **kwargs)
else:
@ -524,13 +531,16 @@ class XsdElement(XsdComponent, ValidationMixin, ParticleMixin, ElementPathMixin)
yield converter.element_decode(element_data, self, level)
return
if xsd_type.is_empty() and elem.text:
reason = "character data is not allowed because the type's content is empty"
yield self.validation_error(validation, reason, elem, **kwargs)
if not xsd_type.has_simple_content():
for assertion in xsd_type.assertions:
for error in assertion(elem, **kwargs):
yield self.validation_error(validation, error, **kwargs)
for result in xsd_type.content_type.iter_decode(
elem, validation, converter, level + 1, **kwargs):
for result in xsd_type.content_type.iter_decode(elem, validation, **kwargs):
if isinstance(result, XMLSchemaValidationError):
yield self.validation_error(validation, result, elem, **kwargs)
else:
@ -566,15 +576,12 @@ class XsdElement(XsdComponent, ValidationMixin, ParticleMixin, ElementPathMixin)
xsd_type = xsd_type.content_type
if text is None:
for result in xsd_type.iter_decode('', validation, _skip_id=True, **kwargs):
for result in xsd_type.iter_decode('', validation, **kwargs):
if isinstance(result, XMLSchemaValidationError):
yield self.validation_error(validation, result, elem, **kwargs)
if 'filler' in kwargs:
value = kwargs['filler'](self)
else:
if level == 0 or self.xsd_version != '1.0':
kwargs['_skip_id'] = True
for result in xsd_type.iter_decode(text, validation, **kwargs):
if isinstance(result, XMLSchemaValidationError):
yield self.validation_error(validation, result, elem, **kwargs)
@ -601,29 +608,40 @@ class XsdElement(XsdComponent, ValidationMixin, ParticleMixin, ElementPathMixin)
del content
if validation != 'skip':
for constraint in self.identities.values():
if isinstance(constraint, XsdKeyref) and '_no_deep' in kwargs: # TODO: Complete lazy validation
continue
for error in constraint(elem, converter):
yield self.validation_error(validation, error, elem, **kwargs)
if 'max_depth' in kwargs:
# Don't check key references with lazy or shallow validation
for constraint in filter(lambda x: not isinstance(x, XsdKeyref), self.identities.values()):
for error in constraint(elem, converter):
yield self.validation_error(validation, error, elem, **kwargs)
else:
for constraint in self.identities.values():
for error in constraint(elem, converter):
yield self.validation_error(validation, error, elem, **kwargs)
def iter_encode(self, obj, validation='lax', converter=None, level=0, **kwargs):
def iter_encode(self, obj, validation='lax', **kwargs):
"""
Creates an iterator for encoding data to an Element.
:param obj: the data that has to be encoded.
:param validation: the validation mode: can be 'lax', 'strict' or 'skip'.
:param converter: an :class:`XMLSchemaConverter` subclass or instance to use \
for the encoding.
:param level: the depth of the element data in the tree structure.
:param kwargs: keyword arguments for the encoding process.
:return: yields an Element, eventually preceded by a sequence of \
validation or encoding errors.
"""
if not isinstance(converter, XMLSchemaConverter):
converter = self.schema.get_converter(converter, level=level, **kwargs)
element_data = converter.element_encode(obj, self, level)
try:
converter = kwargs['converter']
except KeyError:
converter = kwargs['converter'] = self.get_converter(**kwargs)
else:
if not isinstance(converter, XMLSchemaConverter):
converter = kwargs['converter'] = self.get_converter(**kwargs)
try:
level = kwargs['level']
except KeyError:
level = 0
element_data = converter.element_encode(obj, self, level)
errors = []
tag = element_data.tag
text = None
@ -683,8 +701,7 @@ class XsdElement(XsdComponent, ValidationMixin, ParticleMixin, ElementPathMixin)
else:
text = result
else:
for result in xsd_type.content_type.iter_encode(
element_data, validation, converter, level + 1, **kwargs):
for result in xsd_type.content_type.iter_encode(element_data, validation, **kwargs):
if isinstance(result, XMLSchemaValidationError):
errors.append(result)
elif result:
@ -700,26 +717,38 @@ class XsdElement(XsdComponent, ValidationMixin, ParticleMixin, ElementPathMixin)
def is_matching(self, name, default_namespace=None, group=None):
if default_namespace and name[0] != '{':
name = '{%s}%s' % (default_namespace, name)
if name in self.names:
return True
for xsd_element in self.iter_substitutes():
if name in xsd_element.names:
qname = '{%s}%s' % (default_namespace, name)
if name in self.names or qname in self.names:
return True
for xsd_element in self.iter_substitutes():
if name in xsd_element.names or qname in xsd_element.names:
return True
elif name in self.names:
return True
else:
for xsd_element in self.iter_substitutes():
if name in xsd_element.names:
return True
return False
def match(self, name, default_namespace=None, **kwargs):
if default_namespace and name[0] != '{':
name = '{%s}%s' % (default_namespace, name)
qname = '{%s}%s' % (default_namespace, name)
if name in self.names or qname in self.names:
return self
if name in self.names:
for xsd_element in self.iter_substitutes():
if name in xsd_element.names or qname in xsd_element.names:
return xsd_element
elif name in self.names:
return self
for xsd_element in self.iter_substitutes():
if name in xsd_element.names:
return xsd_element
else:
for xsd_element in self.iter_substitutes():
if name in xsd_element.names:
return xsd_element
def is_restriction(self, other, check_occurs=True):
if isinstance(other, XsdAnyElement):
@ -890,9 +919,12 @@ class Xsd11Element(XsdElement):
@property
def target_namespace(self):
if self._target_namespace is None:
if self._target_namespace is not None:
return self._target_namespace
elif self.ref is not None:
return self.ref.target_namespace
else:
return self.schema.target_namespace
return self._target_namespace
def iter_components(self, xsd_classes=None):
if xsd_classes is None:
@ -933,6 +965,7 @@ class Xsd11Element(XsdElement):
if inherited:
dummy = etree_element('_dummy_element', attrib=inherited)
dummy.attrib.update(elem.attrib)
for alt in filter(lambda x: x.type is not None, self.alternatives):
if alt.token is None or alt.test(elem) or alt.test(dummy):

View File

@ -317,11 +317,11 @@ class XMLSchemaChildrenValidationError(XMLSchemaValidationError):
self.occurs = occurs
self.expected = expected
tag = qname_to_prefixed(elem.tag, validator.namespaces)
tag = qname_to_prefixed(elem.tag, validator.namespaces, use_empty=False)
if index >= len(elem):
reason = "The content of element %r is not complete." % tag
else:
child_tag = qname_to_prefixed(elem[index].tag, validator.namespaces)
child_tag = qname_to_prefixed(elem[index].tag, validator.namespaces, use_empty=False)
reason = "Unexpected child with tag %r at position %d." % (child_tag, index + 1)
if occurs and particle.is_missing(occurs):
@ -346,7 +346,7 @@ class XMLSchemaChildrenValidationError(XMLSchemaValidationError):
if not expected_tags:
pass # reason += " No child element is expected at this point." <-- this can be misleading
elif len(expected_tags) == 1:
reason += " Tag %s expected." % expected_tags[0]
reason += " Tag %r expected." % expected_tags[0]
else:
reason += " Tag (%s) expected." % ' | '.join(expected_tags)

View File

@ -15,9 +15,9 @@ from __future__ import unicode_literals
import warnings
from collections import Counter
from ..compat import string_base_type
from ..compat import string_base_type, lru_cache
from ..exceptions import XMLSchemaKeyError, XMLSchemaTypeError, XMLSchemaValueError, XMLSchemaWarning
from ..namespaces import XSD_NAMESPACE, NamespaceResourcesMap
from ..namespaces import XSD_NAMESPACE, LOCATION_HINTS, NamespaceResourcesMap
from ..qnames import XSD_REDEFINE, XSD_OVERRIDE, XSD_NOTATION, XSD_ANY_TYPE, \
XSD_SIMPLE_TYPE, XSD_COMPLEX_TYPE, XSD_GROUP, XSD_ATTRIBUTE, XSD_ATTRIBUTE_GROUP, \
XSD_ELEMENT, XSI_TYPE, get_qname, local_name, qname_to_extended
@ -203,6 +203,7 @@ class XsdGlobals(XsdValidator):
self.validator = validator
self.namespaces = NamespaceResourcesMap() # Registered schemas by namespace URI
self.missing_locations = [] # Missing or failing resource locations
self.types = {} # Global types (both complex and simple)
self.attributes = {} # Global attributes
@ -384,6 +385,62 @@ class XsdGlobals(XsdValidator):
elif not any(schema.url == obj.url and schema.__class__ == obj.__class__ for obj in ns_schemas):
ns_schemas.append(schema)
@lru_cache(maxsize=1000)
def load_namespace(self, namespace, build=True):
"""
Load namespace from available location hints. Returns `True` if the namespace
is already loaded or if the namespace can be loaded from one of the locations,
returns `False` otherwise. Failing locations are inserted into the missing
locations list.
:param namespace: the namespace to load.
:param build: if left with `True` value builds the maps after load. If the \
build fails the resource URL is added to missing locations.
"""
namespace = namespace.strip()
if namespace in self.namespaces:
return True
elif self.validator.meta_schema is None:
return False # Do not load additional namespaces for meta-schema (XHTML)
# Try from schemas location hints: usually the namespaces related to these
# hints are already loaded during schema construction, but it's better to
# retry once if the initial load has failed.
for schema in self.iter_schemas():
for url in schema.get_locations(namespace):
if url in self.missing_locations:
continue
try:
if schema.import_schema(namespace, url, schema.base_url) is not None:
if build:
self.build()
except (OSError, IOError):
pass
except XMLSchemaNotBuiltError:
self.clear(remove_schemas=True, only_unbuilt=True)
self.missing_locations.append(url)
else:
return True
# Try from library location hint, if there is any.
if namespace in LOCATION_HINTS:
url = LOCATION_HINTS[namespace]
if url not in self.missing_locations:
try:
if self.validator.import_schema(namespace, url) is not None:
if build:
self.build()
except (OSError, IOError):
return False
except XMLSchemaNotBuiltError:
self.clear(remove_schemas=True, only_unbuilt=True)
self.missing_locations.append(url)
else:
return True
return False
def clear(self, remove_schemas=False, only_unbuilt=False):
"""
Clears the instance maps and schemas.
@ -415,6 +472,7 @@ class XsdGlobals(XsdValidator):
self.namespaces = namespaces
else:
del self.missing_locations[:]
for global_map in self.global_maps:
global_map.clear()
self.substitution_groups.clear()

View File

@ -14,6 +14,7 @@ This module contains classes for XML Schema model groups.
from __future__ import unicode_literals
import warnings
from .. import limits
from ..compat import unicode_type
from ..exceptions import XMLSchemaValueError
from ..etree import etree_element
@ -485,7 +486,7 @@ class XsdGroup(XsdComponent, ModelGroup, ValidationMixin):
if 'substitution' in model_element.block \
or xsd_element.type.is_blocked(model_element):
raise XMLSchemaValidationError(
model_element, "substitution of %r is blocked" % model_element
model_element, elem, "substitution of %r is blocked" % model_element
)
alternatives = ()
@ -525,7 +526,8 @@ class XsdGroup(XsdComponent, ModelGroup, ValidationMixin):
if model_element is not xsd_element and model_element.block:
for derivation in model_element.block.split():
if xsd_type.is_derived(model_element.type, derivation):
if xsd_type is not model_element.type and \
xsd_type.is_derived(model_element.type, derivation):
reason = "usage of %r with type %s is blocked by head element"
raise XMLSchemaValidationError(self, reason % (xsd_element, derivation))
@ -555,15 +557,12 @@ class XsdGroup(XsdComponent, ModelGroup, ValidationMixin):
msg = "Maybe a not equivalent type table between elements %r and %r." % (self, xsd_element)
warnings.warn(msg, XMLSchemaTypeTableWarning, stacklevel=3)
def iter_decode(self, elem, validation='lax', converter=None, level=0, **kwargs):
def iter_decode(self, elem, validation='lax', **kwargs):
"""
Creates an iterator for decoding an Element content.
:param elem: the Element that has to be decoded.
:param validation: the validation mode, can be 'lax', 'strict' or 'skip.
:param converter: an :class:`XMLSchemaConverter` subclass or instance \
to use for the decoding.
:param level: the depth of the element in the tree structure.
:param kwargs: keyword arguments for the decoding process.
:return: yields a list of 3-tuples (key, decoded data, decoder), \
eventually preceded by a sequence of validation or decoding errors.
@ -580,7 +579,7 @@ class XsdGroup(XsdComponent, ModelGroup, ValidationMixin):
if len(self) == 1 and isinstance(self[0], XsdAnyElement):
pass # [XsdAnyElement()] equals to an empty complexType declaration
else:
reason = "character data between child elements not allowed!"
reason = "character data between child elements not allowed"
yield self.validation_error(validation, reason, elem, **kwargs)
cdata_index = 0 # Do not decode CDATA
@ -590,16 +589,21 @@ class XsdGroup(XsdComponent, ModelGroup, ValidationMixin):
result_list.append((cdata_index, text, None))
cdata_index += 1
model = ModelVisitor(self)
errors = []
level = kwargs['level'] = kwargs.pop('level', 0) + 1
if level > limits.MAX_XML_DEPTH:
reason = "XML data depth exceeded (MAX_XML_DEPTH=%r)" % limits.MAX_XML_DEPTH
self.validation_error('strict', reason, elem, **kwargs)
try:
default_namespace = converter.get('')
except (AttributeError, TypeError):
converter = self.schema.get_converter(converter, level=level, **kwargs)
default_namespace = converter.get('')
converter = kwargs['converter']
except KeyError:
converter = kwargs['converter'] = self.get_converter(**kwargs)
default_namespace = converter.get('')
model = ModelVisitor(self)
errors = []
model_broken = False
for index, child in enumerate(elem):
if callable(child.tag):
continue # child is a <class 'lxml.etree._Comment'>
@ -646,12 +650,13 @@ class XsdGroup(XsdComponent, ModelGroup, ValidationMixin):
xsd_element = None
model_broken = True
if xsd_element is None or kwargs.get('no_depth'):
# TODO: use a default decoder str-->str??
if 'max_depth' in kwargs and kwargs['max_depth'] <= level:
continue
elif xsd_element is None:
# TODO: apply a default decoder str-->str??
continue
for result in xsd_element.iter_decode(
child, validation, converter=converter, level=level, **kwargs):
for result in xsd_element.iter_decode(child, validation, **kwargs):
if isinstance(result, XMLSchemaValidationError):
yield result
else:
@ -678,16 +683,12 @@ class XsdGroup(XsdComponent, ModelGroup, ValidationMixin):
yield result_list
def iter_encode(self, element_data, validation='lax', converter=None, level=0, indent=4, **kwargs):
def iter_encode(self, element_data, validation='lax', **kwargs):
"""
Creates an iterator for encoding data to a list containing Element data.
:param element_data: an ElementData instance with unencoded data.
:param validation: the validation mode: can be 'lax', 'strict' or 'skip'.
:param converter: an :class:`XMLSchemaConverter` subclass or instance to use \
for the encoding.
:param level: the depth of the element data in the tree structure.
:param indent: number of spaces for XML indentation (default is 4).
:param kwargs: keyword arguments for the encoding process.
:return: yields a couple with the text of the Element and a list of 3-tuples \
(key, decoded data, decoder), eventually preceded by a sequence of validation \
@ -697,19 +698,26 @@ class XsdGroup(XsdComponent, ModelGroup, ValidationMixin):
yield element_data.content
return
level = kwargs['level'] = kwargs.get('level', 0) + 1
errors = []
text = None
children = []
try:
indent = kwargs['indent']
except KeyError:
indent = 4
padding = '\n' + ' ' * indent * level
try:
default_namespace = converter.get('')
except (AttributeError, TypeError):
converter = self.schema.get_converter(converter, level=level, **kwargs)
default_namespace = converter.get('')
converter = kwargs['converter']
except KeyError:
converter = kwargs['converter'] = self.get_converter(**kwargs)
default_namespace = converter.get('')
model = ModelVisitor(self)
cdata_index = 0
if isinstance(element_data.content, dict) or kwargs.get('unordered'):
content = model.iter_unordered_content(element_data.content)
elif not isinstance(element_data.content, list):
@ -766,8 +774,7 @@ class XsdGroup(XsdComponent, ModelGroup, ValidationMixin):
yield self.validation_error(validation, reason, value, **kwargs)
continue
for result in xsd_element.iter_encode(
value, validation, converter=converter, level=level, indent=indent, **kwargs):
for result in xsd_element.iter_encode(value, validation, **kwargs):
if isinstance(result, XMLSchemaValidationError):
yield result
else:
@ -844,7 +851,7 @@ class Xsd11Group(XsdGroup):
if ref != self.name:
self.append(Xsd11Group(child, self.schema, self))
if (self.model != 'all') ^ (self[-1].model != 'all'):
msg = "an xs:%s group cannot reference to an x:%s group"
msg = "an xs:%s group cannot include a reference to an x:%s group"
self.parse_error(msg % (self.model, self[-1].model))
self.pop()
@ -982,16 +989,18 @@ class Xsd11Group(XsdGroup):
for item in restriction_items:
if other_item is item or item.is_restriction(other_item, check_occurs):
if max_occurs is not None:
if item.effective_max_occurs is None:
effective_max_occurs = item.effective_max_occurs
if effective_max_occurs is None:
max_occurs = None
else:
max_occurs = counter_func(max_occurs, item.effective_max_occurs)
max_occurs = counter_func(max_occurs, effective_max_occurs)
if other_max_occurs is not None:
if other_item.effective_max_occurs is None:
effective_max_occurs = other_item.effective_max_occurs
if effective_max_occurs is None:
other_max_occurs = None
else:
other_max_occurs = max(other_max_occurs, other_item.effective_max_occurs)
other_max_occurs = max(other_max_occurs, effective_max_occurs)
break
else:
continue

View File

@ -201,7 +201,7 @@ class XsdIdentity(XsdComponent):
yield XMLSchemaValidationError(self, e, "{!r} is not an element".format(xsd_element))
xsd_fields = self.get_fields(xsd_element)
if all(fld is None for fld in xsd_fields):
if not xsd_fields or all(fld is None for fld in xsd_fields):
continue
try:

View File

@ -14,17 +14,13 @@ This module contains classes and functions for processing XSD content models.
from __future__ import unicode_literals
from collections import defaultdict, deque, Counter
from .. import limits
from ..compat import PY3, MutableSequence
from ..exceptions import XMLSchemaValueError
from .exceptions import XMLSchemaModelError, XMLSchemaModelDepthError
from .xsdbase import ParticleMixin
from .wildcards import XsdAnyElement, Xsd11AnyElement
MAX_MODEL_DEPTH = 15
"""Limit depth for safe visiting of models"""
XSD_GROUP_MODELS = {'sequence', 'choice', 'all'}
class ModelGroup(MutableSequence, ParticleMixin):
"""
@ -34,7 +30,6 @@ class ModelGroup(MutableSequence, ParticleMixin):
parent = None
def __init__(self, model):
assert model in XSD_GROUP_MODELS, "Not a valid value for 'model'"
self._group = []
self.model = model
@ -61,7 +56,7 @@ class ModelGroup(MutableSequence, ParticleMixin):
def __setattr__(self, name, value):
if name == 'model' and value is not None:
if value not in XSD_GROUP_MODELS:
if value not in {'sequence', 'choice', 'all'}:
raise XMLSchemaValueError("invalid model group %r." % value)
if self.model is not None and value != self.model and self.model != 'all':
raise XMLSchemaValueError("cannot change group model from %r to %r" % (self.model, value))
@ -165,11 +160,11 @@ class ModelGroup(MutableSequence, ParticleMixin):
"""
A generator function iterating elements and groups of a model group. Skips pointless groups,
iterating deeper through them. Raises `XMLSchemaModelDepthError` if the argument *depth* is
over `MAX_MODEL_DEPTH` value.
over `limits.MAX_MODEL_DEPTH` value.
:param depth: guard for protect model nesting bombs, incremented at each deepest recursion.
"""
if depth > MAX_MODEL_DEPTH:
if depth > limits.MAX_MODEL_DEPTH:
raise XMLSchemaModelDepthError(self)
for item in self:
if not isinstance(item, ModelGroup):
@ -183,11 +178,11 @@ class ModelGroup(MutableSequence, ParticleMixin):
def iter_elements(self, depth=0):
"""
A generator function iterating model's elements. Raises `XMLSchemaModelDepthError` if the
argument *depth* is over `MAX_MODEL_DEPTH` value.
argument *depth* is over `limits.MAX_MODEL_DEPTH` value.
:param depth: guard for protect model nesting bombs, incremented at each deepest recursion.
"""
if depth > MAX_MODEL_DEPTH:
if depth > limits.MAX_MODEL_DEPTH:
raise XMLSchemaModelDepthError(self)
for item in self:
if isinstance(item, ModelGroup):
@ -203,12 +198,12 @@ class ModelGroup(MutableSequence, ParticleMixin):
:raises: an `XMLSchemaModelError` at first violated constraint.
"""
def safe_iter_path(group, depth):
if depth > MAX_MODEL_DEPTH:
if not depth:
raise XMLSchemaModelDepthError(group)
for item in group:
if isinstance(item, ModelGroup):
current_path.append(item)
for _item in safe_iter_path(item, depth + 1):
for _item in safe_iter_path(item, depth - 1):
yield _item
current_path.pop()
else:
@ -221,7 +216,7 @@ class ModelGroup(MutableSequence, ParticleMixin):
except AttributeError:
any_element = None
for e in safe_iter_path(self, 0):
for e in safe_iter_path(self, limits.MAX_MODEL_DEPTH):
for pe, previous_path in paths.values():
# EDC check
if not e.is_consistent(pe) or any_element and not any_element.is_consistent(pe):
@ -343,7 +338,9 @@ class ModelVisitor(MutableSequence):
self.occurs = Counter()
self._subgroups = []
self.element = None
self.group, self.items, self.match = root, iter(root), False
self.group = root
self.items = self.iter_group()
self.match = False
self._start()
def __str__(self):
@ -379,17 +376,26 @@ class ModelVisitor(MutableSequence):
del self._subgroups[:]
self.occurs.clear()
self.element = None
self.group, self.items, self.match = self.root, iter(self.root), False
self.group = self.root
self.items = self.iter_group()
self.match = False
def _start(self):
while True:
item = next(self.items, None)
if item is None or not isinstance(item, ModelGroup):
if item is None:
if not self:
break
else:
self.group, self.items, self.match = self.pop()
elif not isinstance(item, ModelGroup):
self.element = item
break
elif item:
self.append((self.group, self.items, self.match))
self.group, self.items, self.match = item, iter(item), False
self.group = item
self.items = self.iter_group()
self.match = False
@property
def expected(self):
@ -421,6 +427,15 @@ class ModelVisitor(MutableSequence):
for e in self.advance():
yield e
def iter_group(self):
"""Returns an iterator for the current model group."""
if self.group.model != 'all':
return iter(self.group)
elif not self.occurs:
return self.group.iter_elements()
else:
return (e for e in self.group.iter_elements() if not e.is_over(self.occurs[e]))
def advance(self, match=False):
"""
Generator function for advance to the next element. Yields tuples with
@ -438,29 +453,55 @@ class ModelVisitor(MutableSequence):
if isinstance(item, ModelGroup):
self.group, self.items, self.match = self.pop()
item_occurs = occurs[item]
model = self.group.model
if item_occurs:
self.match = True
if model == 'choice':
occurs[item] = 0
occurs[self.group] += 1
self.items, self.match = iter(self.group), False
elif model == 'sequence' and item is self.group[-1]:
self.occurs[self.group] += 1
return item.is_missing(item_occurs)
elif model == 'sequence':
if self.match:
if item is self.group[-1]:
occurs[self.group] += 1
return not item.is_emptiable()
elif item.is_emptiable():
if self.group.model == 'choice':
item_occurs = occurs[item]
if not item_occurs:
return False
elif self.group.min_occurs <= occurs[self.group] or self:
return stop_item(self.group)
else:
return True
item_max_occurs = occurs[(item,)] or item_occurs
min_group_occurs = max(1, item_occurs // (item.max_occurs or item_occurs))
max_group_occurs = max(1, item_max_occurs // (item.min_occurs or 1))
occurs[self.group] += min_group_occurs
occurs[(self.group,)] += max_group_occurs
occurs[item] = 0
self.items = self.iter_group()
self.match = False
return item.is_missing(item_max_occurs)
elif self.group.model == 'all':
return False
elif self.match:
pass
elif occurs[item]:
self.match = True
elif item.is_emptiable():
return False
elif self.group.min_occurs <= max(occurs[self.group], occurs[(self.group,)]) or self:
return stop_item(self.group)
else:
return True
if item is self.group[-1]:
for k, item2 in enumerate(self.group, start=1):
item_occurs = occurs[item2]
if not item_occurs:
continue
item_max_occurs = occurs[(item2,)] or item_occurs
if item_max_occurs == 1 or any(not x.is_emptiable() for x in self.group[k:]):
self.occurs[self.group] += 1
break
min_group_occurs = max(1, item_occurs // (item2.max_occurs or item_occurs))
max_group_occurs = max(1, item_max_occurs // (item2.min_occurs or 1))
occurs[self.group] += min_group_occurs
occurs[(self.group,)] += max_group_occurs
break
return item.is_missing(max(occurs[item], occurs[(item,)]))
element, occurs = self.element, self.occurs
if element is None:
@ -469,7 +510,11 @@ class ModelVisitor(MutableSequence):
if match:
occurs[element] += 1
self.match = True
if not element.is_over(occurs[element]):
if self.group.model == 'all':
self.items = (e for e in self.group.iter_elements() if not e.is_over(occurs[e]))
elif not element.is_over(occurs[element]):
return
elif self.group.model == 'choice' and element.is_ambiguous():
return
obj = None
@ -478,47 +523,46 @@ class ModelVisitor(MutableSequence):
yield element, occurs[element], [element]
while True:
while self.group.is_over(occurs[self.group]):
while self.group.is_over(max(occurs[self.group], occurs[(self.group,)])):
stop_item(self.group)
obj = next(self.items, None)
if obj is None:
if not self.match:
if self.group.model == 'all':
for e in self.group:
occurs[e] = occurs[(e,)]
if all(e.min_occurs <= occurs[e] for e in self.group):
occurs[self.group] = 1
group, expected = self.group, self.expected
if stop_item(group) and expected:
yield group, occurs[group], expected
elif self.group.model != 'all':
self.items, self.match = iter(self.group), False
elif any(not e.is_over(occurs[e]) for e in self.group):
for e in self.group:
occurs[(e,)] += occurs[e]
self.items, self.match = (e for e in self.group if not e.is_over(occurs[e])), False
else:
for e in self.group:
occurs[(e,)] += occurs[e]
occurs[self.group] = 1
if isinstance(obj, ModelGroup):
# inner 'sequence' or 'choice' XsdGroup
self.append((self.group, self.items, self.match))
self.group = obj
self.items = self.iter_group()
self.match = False
occurs[obj] = occurs[(obj,)] = 0
elif not isinstance(obj, ModelGroup): # XsdElement or XsdAnyElement
self.element, occurs[obj] = obj, 0
elif obj is not None:
# XsdElement or XsdAnyElement
self.element = obj
if self.group.model == 'sequence':
occurs[obj] = 0
return
elif not self.match:
if self.group.model == 'all':
if all(e.min_occurs <= occurs[e] for e in self.group.iter_elements()):
occurs[self.group] = 1
group, expected = self.group, self.expected
if stop_item(group) and expected:
yield group, occurs[group], expected
elif self.group.model != 'all':
self.items, self.match = self.iter_group(), False
elif any(not e.is_over(occurs[e]) for e in self.group):
self.items = self.iter_group()
self.match = False
else:
self.append((self.group, self.items, self.match))
self.group, self.items, self.match = obj, iter(obj), False
occurs[obj] = 0
if obj.model == 'all':
for e in obj:
occurs[(e,)] = 0
occurs[self.group] = 1
except IndexError:
# Model visit ended
self.element = None
if self.group.is_missing(occurs[self.group]):
if self.group.is_missing(max(occurs[self.group], occurs[(self.group,)])):
if self.group.model == 'choice':
yield self.group, occurs[self.group], self.expected
elif self.group.model == 'sequence':
@ -648,83 +692,3 @@ class ModelVisitor(MutableSequence):
for name, values in unordered_content.items():
for v in values:
yield name, v
class Occurrence(object):
"""
Class for XSD particles occurrence counting and comparison.
"""
def __init__(self, occurs):
self.occurs = occurs
def add(self, occurs):
if self.occurs is None:
pass
elif occurs is None:
self.occurs = None
else:
self.occurs += occurs
def sub(self, occurs):
if self.occurs is None:
pass
elif occurs is None:
self.occurs = 0
else:
self.occurs -= occurs
def mul(self, occurs):
if occurs == 0:
self.occurs = 0
elif not self.occurs:
pass
elif occurs is None:
self.occurs = None
else:
self.occurs *= occurs
def max(self, occurs):
if self.occurs is None:
pass
elif occurs is None:
self.occurs = occurs
else:
self.occurs = max(self.occurs, occurs)
def __eq__(self, occurs):
return self.occurs == occurs
def __ne__(self, occurs):
return self.occurs != occurs
def __ge__(self, occurs):
if self.occurs is None:
return True
elif occurs is None:
return False
else:
return self.occurs >= occurs
def __gt__(self, occurs):
if self.occurs is None:
return True
elif occurs is None:
return False
else:
return self.occurs > occurs
def __le__(self, occurs):
if occurs is None:
return True
elif self.occurs is None:
return False
else:
return self.occurs <= occurs
def __lt__(self, occurs):
if occurs is None:
return True
elif self.occurs is None:
return False
else:
return self.occurs < occurs

View File

@ -32,8 +32,8 @@ from ..qnames import VC_MIN_VERSION, VC_MAX_VERSION, VC_TYPE_AVAILABLE, \
XSD_ALL, XSD_ANY, XSD_ANY_ATTRIBUTE, XSD_INCLUDE, XSD_IMPORT, XSD_REDEFINE, \
XSD_OVERRIDE, XSD_DEFAULT_OPEN_CONTENT
from ..helpers import get_xsd_derivation_attribute, get_xsd_form_attribute
from ..namespaces import XSD_NAMESPACE, XML_NAMESPACE, XSI_NAMESPACE, XHTML_NAMESPACE, \
XLINK_NAMESPACE, VC_NAMESPACE, NamespaceResourcesMap, NamespaceView
from ..namespaces import XSD_NAMESPACE, XML_NAMESPACE, XSI_NAMESPACE, VC_NAMESPACE, \
SCHEMAS_DIR, LOCATION_HINTS, NamespaceResourcesMap, NamespaceView, get_namespace
from ..etree import etree_element, etree_tostring, prune_etree, ParseError
from ..resources import is_remote_url, url_path_is_file, fetch_resource, XMLResource
from ..converters import XMLSchemaConverter
@ -75,14 +75,6 @@ ANY_ELEMENT = etree_element(
'maxOccurs': 'unbounded'
})
# XSD schemas of W3C standards
SCHEMAS_DIR = os.path.join(os.path.dirname(__file__), 'schemas/')
XML_SCHEMA_FILE = os.path.join(SCHEMAS_DIR, 'xml_minimal.xsd')
XSI_SCHEMA_FILE = os.path.join(SCHEMAS_DIR, 'XMLSchema-instance_minimal.xsd')
XLINK_SCHEMA_FILE = os.path.join(SCHEMAS_DIR, 'xlink.xsd')
XHTML_SCHEMA_FILE = os.path.join(SCHEMAS_DIR, 'xhtml1-strict.xsd')
VC_SCHEMA_FILE = os.path.join(SCHEMAS_DIR, 'XMLSchema-versioning_minimal.xsd')
class XMLSchemaMeta(ABCMeta):
@ -830,27 +822,6 @@ class XMLSchemaBase(XsdValidator, ValidationMixin, ElementPathMixin):
except KeyError:
return []
def get_converter(self, converter=None, namespaces=None, **kwargs):
"""
Returns a new converter instance.
:param converter: can be a converter class or instance. If it's an instance \
the new instance is copied from it and configured with the provided arguments.
:param namespaces: is an optional mapping from namespace prefix to URI.
:param kwargs: optional arguments for initialize the converter instance.
:return: a converter instance.
"""
if converter is None:
converter = getattr(self, 'converter', XMLSchemaConverter)
if isinstance(converter, XMLSchemaConverter):
return converter.copy(namespaces=namespaces, **kwargs)
elif issubclass(converter, XMLSchemaConverter):
return converter(namespaces, **kwargs)
else:
msg = "'converter' argument must be a %r subclass or instance: %r"
raise XMLSchemaTypeError(msg % (XMLSchemaConverter, converter))
def get_element(self, tag, path=None, namespaces=None):
if not path:
return self.find(tag, namespaces)
@ -1024,14 +995,15 @@ class XMLSchemaBase(XsdValidator, ValidationMixin, ElementPathMixin):
warnings.warn(self.warnings[-1], XMLSchemaImportWarning, stacklevel=3)
self.imports[namespace] = None
def import_schema(self, namespace, location, base_url=None, force=False):
def import_schema(self, namespace, location, base_url=None, force=False, build=False):
"""
Imports a schema for an external namespace, from a specific URL.
:param namespace: is the URI of the external namespace.
:param location: is the URL of the schema.
:param base_url: is an optional base URL for fetching the schema resource.
:param force: is set to `True` imports the schema also if the namespace is already imported.
:param force: if set to `True` imports the schema also if the namespace is already imported.
:param build: defines when to build the imported schema, the default is to not build.
:return: the imported :class:`XMLSchema` instance.
"""
if not force:
@ -1058,7 +1030,7 @@ class XMLSchemaBase(XsdValidator, ValidationMixin, ElementPathMixin):
base_url=self.base_url,
defuse=self.defuse,
timeout=self.timeout,
build=False,
build=build,
)
if schema.target_namespace != namespace:
raise XMLSchemaValueError('imported schema %r has an unmatched namespace %r' % (location, namespace))
@ -1229,17 +1201,30 @@ class XMLSchemaBase(XsdValidator, ValidationMixin, ElementPathMixin):
id_map = Counter()
inherited = {}
if source.is_lazy() and path is None:
# TODO: Document validation in lazy mode.
# Validation is done pushing a _no_deep argument for root node and with
# a path='*' for validating children. This is a feature under test.
xsd_element = self.get_element(source.root.tag, schema_path)
if xsd_element is None:
yield self.validation_error('lax', "%r is not an element of the schema" % source.root, source.root)
namespace = source.namespace or namespaces.get('', '')
try:
schema = self.maps.namespaces[namespace][0]
except (KeyError, IndexError):
reason = 'the namespace {!r} is not loaded'.format(namespace)
yield self.validation_error('lax', reason, source.root, source, namespaces)
return
for result in xsd_element.iter_decode(source.root, source=source, namespaces=namespaces,
use_defaults=use_defaults, id_map=id_map, no_depth=True,
inherited=inherited, drop_results=True):
kwargs = {
'source': source,
'namespaces': namespaces,
'use_defaults': use_defaults,
'id_map': id_map,
'inherited': inherited
}
if source.is_lazy() and path is None:
xsd_element = schema.get_element(source.root.tag, schema_path, namespaces)
if xsd_element is None:
reason = "{!r} is not an element of the schema".format(source.root)
yield schema.validation_error('lax', reason, source.root, source, namespaces)
return
for result in xsd_element.iter_decode(source.root, max_depth=1, **kwargs):
if isinstance(result, XMLSchemaValidationError):
yield result
else:
@ -1250,13 +1235,13 @@ class XMLSchemaBase(XsdValidator, ValidationMixin, ElementPathMixin):
schema_path = '/%s/*' % source.root.tag
for elem in source.iterfind(path, namespaces):
xsd_element = self.get_element(elem.tag, schema_path, self.namespaces)
xsd_element = schema.get_element(elem.tag, schema_path, namespaces)
if xsd_element is None:
yield self.validation_error('lax', "%r is not an element of the schema" % elem, elem)
reason = "{!r} is not an element of the schema".format(elem)
yield schema.validation_error('lax', reason, elem, source, namespaces)
return
for result in xsd_element.iter_decode(elem, source=source, namespaces=namespaces,
use_defaults=use_defaults, id_map=id_map,
inherited=inherited, drop_results=True):
for result in xsd_element.iter_decode(elem, **kwargs):
if isinstance(result, XMLSchemaValidationError):
yield result
else:
@ -1271,7 +1256,7 @@ class XMLSchemaBase(XsdValidator, ValidationMixin, ElementPathMixin):
def iter_decode(self, source, path=None, schema_path=None, validation='lax', process_namespaces=True,
namespaces=None, use_defaults=True, decimal_type=None, datetime_types=False,
converter=None, filler=None, fill_missing=False, **kwargs):
converter=None, filler=None, fill_missing=False, max_depth=None, **kwargs):
"""
Creates an iterator for decoding an XML source to a data structure.
@ -1299,6 +1284,7 @@ class XMLSchemaBase(XsdValidator, ValidationMixin, ElementPathMixin):
an attribute declaration. If not provided undecodable data is replaced by `None`.
:param fill_missing: if set to `True` the decoder fills also missing attributes. \
The filling value is `None` or a typed value if the *filler* callback is provided.
:param max_depth: maximum level of decoding, for default there is no limit.
:param kwargs: keyword arguments with other options for converter and decoder.
:return: yields a decoded data object, eventually preceded by a sequence of validation \
or decoding errors.
@ -1330,16 +1316,29 @@ class XMLSchemaBase(XsdValidator, ValidationMixin, ElementPathMixin):
kwargs['decimal_type'] = decimal_type
if filler is not None:
kwargs['filler'] = filler
if max_depth is not None:
kwargs['max_depth'] = max_depth
namespace = source.namespace or namespaces.get('', '')
try:
schema = self.maps.namespaces[namespace][0]
except (KeyError, IndexError):
reason = 'the namespace {!r} is not loaded'.format(namespace)
yield self.validation_error('lax', reason, source.root, source, namespaces)
return
for elem in source.iterfind(path, namespaces):
xsd_element = self.get_element(elem.tag, schema_path, namespaces)
xsd_element = schema.get_element(elem.tag, schema_path, namespaces)
if xsd_element is None:
yield self.validation_error(validation, "%r is not an element of the schema" % elem, elem)
reason = "{!r} is not an element of the schema".format(elem)
yield schema.validation_error('lax', reason, elem, source, namespaces)
return
for obj in xsd_element.iter_decode(
elem, validation, converter=converter, source=source, namespaces=namespaces,
use_defaults=use_defaults, datetime_types=datetime_types,
fill_missing=fill_missing, id_map=id_map, inherited=inherited, **kwargs):
elem, validation, converter=converter, source=source,
namespaces=namespaces, use_defaults=use_defaults,
datetime_types=datetime_types, fill_missing=fill_missing,
id_map=id_map, inherited=inherited, **kwargs):
yield obj
for k, v in id_map.items():
@ -1401,7 +1400,16 @@ class XMLSchemaBase(XsdValidator, ValidationMixin, ElementPathMixin):
namespaces = {} if namespaces is None else namespaces.copy()
converter = self.get_converter(converter, namespaces, **kwargs)
if path is not None:
namespace = get_namespace(path) or namespaces.get('', '')
if namespace:
try:
schema = self.maps.namespaces[namespace][0]
except (KeyError, IndexError):
reason = 'the namespace {!r} is not loaded'.format(namespace)
raise XMLSchemaEncodeError(self, obj, self, reason, namespaces=namespaces)
else:
xsd_element = schema.find(path, namespaces=namespaces)
elif path is not None:
xsd_element = self.find(path, namespaces=namespaces)
elif isinstance(obj, dict) and len(obj) == 1:
xsd_element = self.elements.get(list(obj.keys())[0])
@ -1413,10 +1421,10 @@ class XMLSchemaBase(XsdValidator, ValidationMixin, ElementPathMixin):
if not isinstance(xsd_element, XsdElement):
if path is not None:
msg = "the path %r doesn't match any element of the schema!" % path
reason = "the path %r doesn't match any element of the schema!" % path
else:
msg = "unable to select an element for decoding data, provide a valid 'path' argument."
yield XMLSchemaEncodeError(self, obj, self.elements, reason=msg)
reason = "unable to select an element for decoding data, provide a valid 'path' argument."
raise XMLSchemaEncodeError(self, obj, self.elements, reason, namespaces=namespaces)
else:
for result in xsd_element.iter_encode(obj, validation, converter=converter,
unordered=unordered, **kwargs):
@ -1487,13 +1495,10 @@ class XMLSchema10(XMLSchemaBase):
}
meta_schema = os.path.join(SCHEMAS_DIR, 'XSD_1.0/XMLSchema.xsd')
BASE_SCHEMAS = {
XML_NAMESPACE: XML_SCHEMA_FILE,
XSI_NAMESPACE: XSI_SCHEMA_FILE,
}
FALLBACK_LOCATIONS = {
XLINK_NAMESPACE: XLINK_SCHEMA_FILE,
XHTML_NAMESPACE: XHTML_SCHEMA_FILE,
XML_NAMESPACE: os.path.join(SCHEMAS_DIR, 'xml_minimal.xsd'),
XSI_NAMESPACE: os.path.join(SCHEMAS_DIR, 'XMLSchema-instance_minimal.xsd'),
}
FALLBACK_LOCATIONS = LOCATION_HINTS
# ++++ UNDER DEVELOPMENT, DO NOT USE!!! ++++
@ -1550,15 +1555,12 @@ class XMLSchema11(XMLSchemaBase):
}
meta_schema = os.path.join(SCHEMAS_DIR, 'XSD_1.1/XMLSchema.xsd')
BASE_SCHEMAS = {
XML_NAMESPACE: os.path.join(SCHEMAS_DIR, 'xml_minimal.xsd'),
XSI_NAMESPACE: os.path.join(SCHEMAS_DIR, 'XMLSchema-instance_minimal.xsd'),
XSD_NAMESPACE: os.path.join(SCHEMAS_DIR, 'XSD_1.1/xsd11-extra.xsd'),
XML_NAMESPACE: XML_SCHEMA_FILE,
XSI_NAMESPACE: XSI_SCHEMA_FILE,
VC_NAMESPACE: VC_SCHEMA_FILE,
}
FALLBACK_LOCATIONS = {
XLINK_NAMESPACE: XLINK_SCHEMA_FILE,
XHTML_NAMESPACE: XHTML_SCHEMA_FILE,
VC_NAMESPACE: os.path.join(SCHEMAS_DIR, 'XMLSchema-versioning_minimal.xsd'),
}
FALLBACK_LOCATIONS = LOCATION_HINTS
def _parse_inclusions(self):
super(XMLSchema11, self)._parse_inclusions()

View File

@ -1,32 +0,0 @@
<?xml version="1.0" encoding="utf-8" ?>
<schema xmlns="http://www.w3.org/2001/XMLSchema">
<annotation>
<documentation>
A schema with puppet types for creating substitute elements.
</documentation>
</annotation>
<simpleType name="simple_puppet">
<union>
<simpleType>
<list itemType="float"/>
</simpleType>
<simpleType>
<list itemType="integer"/>
</simpleType>
<simpleType>
<restriction base="float"/>
</simpleType>
<simpleType>
<restriction base="int"/>
</simpleType>
<simpleType>
<restriction base="string"/>
</simpleType>
</union>
</simpleType>
<complexType name="complex_puppet">
<
</complexType>
</schema>

View File

@ -334,6 +334,10 @@ class XsdSimpleType(XsdType, ValidationMixin):
else:
return self.base_type.is_derived(other, derivation)
def is_dynamic_consistent(self, other):
return other is self.any_type or other is self.any_simple_type or self.is_derived(other) or \
hasattr(other, 'member_types') and any(self.is_derived(mt) for mt in other.member_types)
def normalize(self, text):
"""
Normalize and restrict value-space with pre-lexical and lexical facets.
@ -513,28 +517,25 @@ class XsdAtomicBuiltin(XsdAtomic):
yield self.decode_error(validation, obj, self.to_python,
reason="value is not an instance of {!r}".format(self.instance_types))
if self.name == XSD_ID:
try:
id_map = kwargs['id_map']
except KeyError:
pass
else:
try:
id_map[obj] += 1
except TypeError:
id_map[obj] = 1
if id_map[obj] > 1 and '_skip_id' not in kwargs:
yield self.validation_error(validation, "Duplicated xsd:ID value {!r}".format(obj))
elif self.name == XSD_IDREF:
if self.name == XSD_IDREF:
try:
id_map = kwargs['id_map']
except KeyError:
pass
else:
if obj not in id_map:
id_map[obj] = kwargs.get('node', 0)
id_map[obj] = 0
elif self.name == XSD_ID and kwargs.get('level') != 0:
try:
id_map = kwargs['id_map']
except KeyError:
pass
else:
if not id_map[obj]:
id_map[obj] = 1
else:
yield self.validation_error(validation, "Duplicated xsd:ID value {!r}".format(obj))
if validation == 'skip':
try:
@ -870,7 +871,8 @@ class XsdUnion(XsdSimpleType):
return all(mt.is_list() for mt in self.member_types)
def is_dynamic_consistent(self, other):
return other.is_derived(self) or hasattr(other, 'member_types') and \
return other is self.any_type or other is self.any_simple_type or \
other.is_derived(self) or hasattr(other, 'member_types') and \
any(mt1.is_derived(mt2) for mt1 in other.member_types for mt2 in self.member_types)
def iter_components(self, xsd_classes=None):

View File

@ -13,13 +13,12 @@ This module contains classes for XML Schema wildcards.
"""
from __future__ import unicode_literals
from ..compat import unicode_type
from ..exceptions import XMLSchemaValueError
from ..namespaces import XSI_NAMESPACE
from ..qnames import XSD_ANY, XSD_ANY_ATTRIBUTE, XSD_OPEN_CONTENT, \
XSD_DEFAULT_OPEN_CONTENT, get_namespace
from ..xpath import XMLSchemaProxy, ElementPathMixin
from .exceptions import XMLSchemaNotBuiltError
from .xsdbase import ValidationMixin, XsdComponent, ParticleMixin
@ -129,25 +128,6 @@ class XsdWildcard(XsdComponent, ValidationMixin):
self.not_qname = names
def _load_namespace(self, namespace):
if namespace in self.schema.maps.namespaces:
return
for url in self.schema.get_locations(namespace):
try:
schema = self.schema.import_schema(namespace, url, base_url=self.schema.base_url)
if schema is not None:
try:
schema.maps.build()
except XMLSchemaNotBuiltError:
# Namespace build fails: remove unbuilt schemas and the url hint
schema.maps.clear(remove_schemas=True, only_unbuilt=True)
self.schema.locations[namespace].remove(url)
else:
break
except (OSError, IOError):
pass
@property
def built(self):
return True
@ -160,7 +140,8 @@ class XsdWildcard(XsdComponent, ValidationMixin):
elif default_namespace is None:
return self.is_namespace_allowed('')
else:
return self.is_namespace_allowed(default_namespace)
return self.is_namespace_allowed('') or \
self.is_namespace_allowed(default_namespace)
def is_namespace_allowed(self, namespace):
if self.not_namespace:
@ -444,48 +425,65 @@ class XsdAnyElement(XsdWildcard, ParticleMixin, ElementPathMixin):
return iter(())
def iter_decode(self, elem, validation='lax', **kwargs):
if self.is_matching(elem.tag):
if self.process_contents == 'skip':
return
if not self.is_matching(elem.tag):
if validation != 'skip':
reason = "element %r not allowed here." % elem.tag
yield self.validation_error(validation, reason, elem, **kwargs)
self._load_namespace(get_namespace(elem.tag))
elif self.process_contents == 'skip':
return
elif self.maps.load_namespace(get_namespace(elem.tag)):
try:
xsd_element = self.maps.lookup_element(elem.tag)
except LookupError:
if kwargs.get('drop_results'):
# Validation-only mode: use anyType for decode a complex element.
if validation == 'skip':
yield self.any_type.decode(elem) if len(elem) > 0 else elem.text
elif self.process_contents == 'strict' and validation != 'skip':
elif self.process_contents == 'strict':
reason = "element %r not found." % elem.tag
yield self.validation_error(validation, reason, elem, **kwargs)
else:
for result in xsd_element.iter_decode(elem, validation, **kwargs):
yield result
elif validation != 'skip':
reason = "element %r not allowed here." % elem.tag
elif validation == 'skip':
yield self.any_type.decode(elem) if len(elem) > 0 else elem.text
elif self.process_contents == 'strict':
reason = "unavailable namespace {!r}".format(get_namespace(elem.tag))
yield self.validation_error(validation, reason, elem, **kwargs)
def iter_encode(self, obj, validation='lax', **kwargs):
if self.process_contents == 'skip':
return
name, value = obj
namespace = get_namespace(name)
if self.is_namespace_allowed(namespace):
self._load_namespace(namespace)
if not self.is_namespace_allowed(namespace):
if validation != 'skip':
reason = "element %r not allowed here." % name
yield self.validation_error(validation, reason, value, **kwargs)
elif self.process_contents == 'skip':
return
elif self.maps.load_namespace(namespace):
try:
xsd_element = self.maps.lookup_element(name)
except LookupError:
if self.process_contents == 'strict' and validation != 'skip':
if validation == 'skip':
yield self.any_type.encode(value)
elif self.process_contents == 'strict':
reason = "element %r not found." % name
yield self.validation_error(validation, reason, **kwargs)
else:
for result in xsd_element.iter_encode(value, validation, **kwargs):
yield result
elif validation != 'skip':
reason = "element %r not allowed here." % name
yield self.validation_error(validation, reason, value, **kwargs)
elif validation == 'skip':
yield self.any_type.encode(value)
elif self.process_contents == 'strict':
reason = "unavailable namespace {!r}".format(namespace)
yield self.validation_error(validation, reason, **kwargs)
def is_overlap(self, other):
if not isinstance(other, XsdAnyElement):
@ -562,47 +560,66 @@ class XsdAnyAttribute(XsdWildcard):
def iter_decode(self, attribute, validation='lax', **kwargs):
name, value = attribute
if self.is_matching(name):
if self.process_contents == 'skip':
return
self._load_namespace(get_namespace(name))
if not self.is_matching(name):
if validation != 'skip':
reason = "attribute %r not allowed." % name
yield self.validation_error(validation, reason, attribute, **kwargs)
elif self.process_contents == 'skip':
return
elif self.maps.load_namespace(get_namespace(name)):
try:
xsd_attribute = self.maps.lookup_attribute(name)
except LookupError:
if kwargs.get('drop_results'):
# Validation-only mode: returns the value if a decoder is not found.
if validation == 'skip':
yield value
elif self.process_contents == 'strict' and validation != 'skip':
elif self.process_contents == 'strict':
reason = "attribute %r not found." % name
yield self.validation_error(validation, reason, attribute, **kwargs)
else:
for result in xsd_attribute.iter_decode(value, validation, **kwargs):
yield result
elif validation != 'skip':
reason = "attribute %r not allowed." % name
yield self.validation_error(validation, reason, attribute, **kwargs)
elif validation == 'skip':
yield value
elif self.process_contents == 'strict':
reason = "unavailable namespace {!r}".format(get_namespace(name))
yield self.validation_error(validation, reason, **kwargs)
def iter_encode(self, attribute, validation='lax', **kwargs):
if self.process_contents == 'skip':
return
name, value = attribute
namespace = get_namespace(name)
if self.is_namespace_allowed(namespace):
self._load_namespace(namespace)
if not self.is_namespace_allowed(namespace):
if validation != 'skip':
reason = "attribute %r not allowed." % name
yield self.validation_error(validation, reason, attribute, **kwargs)
elif self.process_contents == 'skip':
return
elif self.maps.load_namespace(namespace):
try:
xsd_attribute = self.maps.lookup_attribute(name)
except LookupError:
if self.process_contents == 'strict' and validation != 'skip':
if validation == 'skip':
yield unicode_type(value)
elif self.process_contents == 'strict':
reason = "attribute %r not found." % name
yield self.validation_error(validation, reason, attribute, **kwargs)
else:
for result in xsd_attribute.iter_encode(value, validation, **kwargs):
yield result
elif validation != 'skip':
reason = "attribute %r not allowed." % name
yield self.validation_error(validation, reason, attribute, **kwargs)
elif validation == 'skip':
yield unicode_type(value)
elif self.process_contents == 'strict':
reason = "unavailable namespace {!r}".format(get_namespace(name))
yield self.validation_error(validation, reason, **kwargs)
class Xsd11AnyElement(XsdAnyElement):
@ -640,12 +657,15 @@ class Xsd11AnyElement(XsdAnyElement):
if name is None:
return False
elif not name or name[0] == '{':
namespace = get_namespace(name)
elif default_namespace is None:
namespace = ''
if not self.is_namespace_allowed(get_namespace(name)):
return False
elif default_namespace is not None:
if not self.is_namespace_allowed(''):
return False
else:
name = '{%s}%s' % (default_namespace, name)
namespace = default_namespace
if not self.is_namespace_allowed('') and not self.is_namespace_allowed(default_namespace):
return False
if group in self.precedences:
if occurs is None:
@ -660,7 +680,8 @@ class Xsd11AnyElement(XsdAnyElement):
if any(e.is_matching(name) for e in group.iter_elements()
if not isinstance(e, XsdAnyElement)):
return False
return name not in self.not_qname and self.is_namespace_allowed(namespace)
return name not in self.not_qname
def is_consistent(self, other):
if isinstance(other, XsdAnyElement) or self.process_contents == 'skip':
@ -761,8 +782,8 @@ class XsdOpenContent(XsdComponent):
return True
def is_restriction(self, other):
if self.mode == 'none' or other is None or other.mode == 'none':
return True
if other is None or other.mode == 'none':
return self.mode == 'none'
elif self.mode == 'interleave' and other.mode == 'suffix':
return False
else:

View File

@ -21,6 +21,7 @@ from ..qnames import XSD_ANNOTATION, XSD_APPINFO, XSD_DOCUMENTATION, XML_LANG, \
get_qname, local_name, qname_to_prefixed
from ..etree import etree_tostring
from ..helpers import is_etree_element
from ..converters import XMLSchemaConverter
from .exceptions import XMLSchemaParseError, XMLSchemaValidationError, \
XMLSchemaDecodeError, XMLSchemaEncodeError
@ -195,6 +196,27 @@ class XsdValidator(object):
self.parse_error(msg % (value, ' | '.join(admitted_values)), elem)
return ''
def get_converter(self, converter=None, namespaces=None, **kwargs):
"""
Returns a new converter instance.
:param converter: can be a converter class or instance. If it's an instance \
the new instance is copied from it and configured with the provided arguments.
:param namespaces: is an optional mapping from namespace prefix to URI.
:param kwargs: optional arguments for initialize the converter instance.
:return: a converter instance.
"""
if converter is None:
converter = getattr(self, 'converter', XMLSchemaConverter)
if isinstance(converter, XMLSchemaConverter):
return converter.copy(namespaces=namespaces, **kwargs)
elif issubclass(converter, XMLSchemaConverter):
return converter(namespaces, **kwargs)
else:
msg = "'converter' argument must be a %r subclass or instance: %r"
raise XMLSchemaTypeError(msg % (XMLSchemaConverter, converter))
class XsdComponent(XsdValidator):
"""
@ -277,7 +299,7 @@ class XsdComponent(XsdValidator):
@property
def target_namespace(self):
"""Property that references to schema's targetNamespace."""
return self.schema.target_namespace
return self.schema.target_namespace if self.ref is None else self.ref.target_namespace
@property
def default_namespace(self):
@ -679,8 +701,8 @@ class XsdType(XsdComponent):
return any(self.is_derived(xsd_type, derivation) for derivation in block)
def is_dynamic_consistent(self, other):
return self.is_derived(other) or hasattr(other, 'member_types') and \
any(self.is_derived(mt) for mt in other.member_types)
return other is self.any_type or self.is_derived(other) or \
hasattr(other, 'member_types') and any(self.is_derived(mt) for mt in other.member_types)
def is_key(self):
return self.name == XSD_ID or self.is_derived(self.maps.types[XSD_ID])