Update XSD validation with wildcards

- Add load_namespace() to XsdGlobals
- Modify iter_decode/iter_encode for xs:any and xs:anyAttribute
This commit is contained in:
Davide Brunato 2019-10-16 15:16:19 +02:00
parent 8407f09139
commit 248b9f9b68
2 changed files with 132 additions and 59 deletions

View File

@ -17,7 +17,7 @@ from collections import Counter
from ..compat import string_base_type
from ..exceptions import XMLSchemaKeyError, XMLSchemaTypeError, XMLSchemaValueError, XMLSchemaWarning
from ..namespaces import XSD_NAMESPACE, NamespaceResourcesMap
from ..namespaces import XSD_NAMESPACE, LOCATION_HINTS, NamespaceResourcesMap
from ..qnames import XSD_REDEFINE, XSD_OVERRIDE, XSD_NOTATION, XSD_ANY_TYPE, \
XSD_SIMPLE_TYPE, XSD_COMPLEX_TYPE, XSD_GROUP, XSD_ATTRIBUTE, XSD_ATTRIBUTE_GROUP, \
XSD_ELEMENT, XSI_TYPE, get_qname, local_name, qname_to_extended
@ -203,6 +203,7 @@ class XsdGlobals(XsdValidator):
self.validator = validator
self.namespaces = NamespaceResourcesMap() # Registered schemas by namespace URI
self.missing_locations = [] # Missing or failing resource locations
self.types = {} # Global types (both complex and simple)
self.attributes = {} # Global attributes
@ -384,6 +385,61 @@ class XsdGlobals(XsdValidator):
elif not any(schema.url == obj.url and schema.__class__ == obj.__class__ for obj in ns_schemas):
ns_schemas.append(schema)
def load_namespace(self, namespace, build=True):
    """
    Ensure that a namespace is registered in the maps, trying to import it from
    the known location hints when it is not. Locations already listed as missing
    are not retried. A location is appended to the missing locations list when
    the build that follows its import fails; in that case the unbuilt schemas
    are removed from the maps first. An I/O failure does not record the location.

    :param namespace: the namespace to load, surrounding whitespace is ignored.
    :param build: if left with `True` value builds the maps after a schema \
    has been imported.
    :return: `True` if the namespace is already registered or if an import \
    attempt completes without raising, `False` otherwise.
    """
    namespace = namespace.strip()
    if namespace in self.namespaces:
        return True
    if self.validator.meta_schema is None:
        return False  # Do not load additional namespaces for meta-schema (XHTML)

    # First attempt: the location hints held by the registered schemas. These
    # namespaces are usually loaded at schema construction, this is a retry.
    for schema in self.iter_schemas():
        for location in schema.get_locations(namespace):
            if location not in self.missing_locations:
                try:
                    imported = schema.import_schema(namespace, location, schema.base_url)
                    if imported is not None and build:
                        self.build()
                except (OSError, IOError):
                    # Unreachable resource: go on with the next hint.
                    continue
                except XMLSchemaNotBuiltError:
                    self.clear(remove_schemas=True, only_unbuilt=True)
                    self.missing_locations.append(location)
                else:
                    return True

    # Second attempt: the location hint provided by the library, if any.
    if namespace not in LOCATION_HINTS:
        return False

    location = LOCATION_HINTS[namespace]
    if location in self.missing_locations:
        return False

    try:
        imported = self.validator.import_schema(namespace, location)
        if imported is not None and build:
            self.build()
    except (OSError, IOError):
        return False
    except XMLSchemaNotBuiltError:
        self.clear(remove_schemas=True, only_unbuilt=True)
        self.missing_locations.append(location)
        return False
    else:
        return True
def clear(self, remove_schemas=False, only_unbuilt=False):
"""
Clears the instance maps and schemas.
@ -415,6 +471,7 @@ class XsdGlobals(XsdValidator):
self.namespaces = namespaces
else:
self.missing_locations.clear()
for global_map in self.global_maps:
global_map.clear()
self.substitution_groups.clear()

View File

@ -13,13 +13,12 @@ This module contains classes for XML Schema wildcards.
"""
from __future__ import unicode_literals
from ..compat import unicode_type
from ..exceptions import XMLSchemaValueError
from ..namespaces import XSI_NAMESPACE
from ..qnames import XSD_ANY, XSD_ANY_ATTRIBUTE, XSD_OPEN_CONTENT, \
XSD_DEFAULT_OPEN_CONTENT, get_namespace
from ..xpath import XMLSchemaProxy, ElementPathMixin
from .exceptions import XMLSchemaNotBuiltError
from .xsdbase import ValidationMixin, XsdComponent, ParticleMixin
@ -129,25 +128,6 @@ class XsdWildcard(XsdComponent, ValidationMixin):
self.not_qname = names
def _load_namespace(self, namespace):
    """
    Try to import a namespace that is not yet registered in the maps of the
    schema, walking the schema's location hints until the maps of an imported
    schema build without errors.

    :param namespace: the namespace to load.
    """
    if namespace not in self.schema.maps.namespaces:
        for location in self.schema.get_locations(namespace):
            try:
                imported = self.schema.import_schema(
                    namespace, location, base_url=self.schema.base_url
                )
                if imported is None:
                    continue

                try:
                    imported.maps.build()
                except XMLSchemaNotBuiltError:
                    # Namespace build fails: remove unbuilt schemas and the url hint
                    imported.maps.clear(remove_schemas=True, only_unbuilt=True)
                    self.schema.locations[namespace].remove(location)
                else:
                    break
            except (OSError, IOError):
                # Unreachable location: try the next hint.
                continue
@property
def built(self):
# Unconditionally reports the component as built: the property always returns True.
return True
@ -444,48 +424,65 @@ class XsdAnyElement(XsdWildcard, ParticleMixin, ElementPathMixin):
return iter(())
def iter_decode(self, elem, validation='lax', **kwargs):
# Generator that yields either decoded results for `elem` or the values returned
# by self.validation_error(), depending on `validation` and self.process_contents.
# NOTE(review): this span is a diff hunk rendered without +/- markers: statements
# of the replaced and of the replacing implementation are interleaved, so it must
# be read as a diff and not as a single runnable body.
if self.is_matching(elem.tag):
if self.process_contents == 'skip':
return
# Tag rejected by self.is_matching(): an error is produced unless validation is 'skip'.
if not self.is_matching(elem.tag):
if validation != 'skip':
reason = "element %r not allowed here." % elem.tag
yield self.validation_error(validation, reason, elem, **kwargs)
# NOTE(review): _load_namespace() appears to be removed by this commit, so this
# call is presumably one of the deleted lines -- confirm against the raw diff.
self._load_namespace(get_namespace(elem.tag))
elif self.process_contents == 'skip':
return
# Taken when self.maps.load_namespace() returns a true value for the namespace of
# the tag: the tag is then looked up with self.maps.lookup_element().
elif self.maps.load_namespace(get_namespace(elem.tag)):
try:
xsd_element = self.maps.lookup_element(elem.tag)
except LookupError:
if kwargs.get('drop_results'):
# Validation-only mode: use anyType to decode a complex element.
if validation == 'skip':
yield self.any_type.decode(elem) if len(elem) > 0 else elem.text
elif self.process_contents == 'strict' and validation != 'skip':
elif self.process_contents == 'strict':
reason = "element %r not found." % elem.tag
yield self.validation_error(validation, reason, elem, **kwargs)
else:
# Lookup succeeded: delegate the decoding to the element found in the maps.
for result in xsd_element.iter_decode(elem, validation, **kwargs):
yield result
elif validation != 'skip':
reason = "element %r not allowed here." % elem.tag
# presumably reached when load_namespace() returned a false value: with
# validation='skip' the element is decoded with self.any_type when len(elem) > 0,
# otherwise its text is yielded.
elif validation == 'skip':
yield self.any_type.decode(elem) if len(elem) > 0 else elem.text
# With process_contents 'strict' the namespace is reported as unavailable.
elif self.process_contents == 'strict':
reason = "unavailable namespace {!r}".format(get_namespace(elem.tag))
yield self.validation_error(validation, reason, elem, **kwargs)
def iter_encode(self, obj, validation='lax', **kwargs):
# Generator that yields either encoded results for `obj`, which is unpacked as a
# (name, value) pair, or the values returned by self.validation_error().
# NOTE(review): this span is a diff hunk rendered without +/- markers: statements
# of the replaced and of the replacing implementation are interleaved, so it must
# be read as a diff and not as a single runnable body.
if self.process_contents == 'skip':
return
name, value = obj
namespace = get_namespace(name)
if self.is_namespace_allowed(namespace):
# NOTE(review): _load_namespace() appears to be removed by this commit, so this
# call is presumably one of the deleted lines -- confirm against the raw diff.
self._load_namespace(namespace)
# Namespace rejected by self.is_namespace_allowed(): an error is produced unless
# validation is 'skip'.
if not self.is_namespace_allowed(namespace):
if validation != 'skip':
reason = "element %r not allowed here." % name
yield self.validation_error(validation, reason, value, **kwargs)
elif self.process_contents == 'skip':
return
# Taken when self.maps.load_namespace() returns a true value for the namespace:
# the name is then looked up with self.maps.lookup_element().
elif self.maps.load_namespace(namespace):
try:
xsd_element = self.maps.lookup_element(name)
except LookupError:
if self.process_contents == 'strict' and validation != 'skip':
if validation == 'skip':
yield self.any_type.encode(value)
elif self.process_contents == 'strict':
reason = "element %r not found." % name
yield self.validation_error(validation, reason, **kwargs)
else:
# Lookup succeeded: delegate the encoding to the element found in the maps.
for result in xsd_element.iter_encode(value, validation, **kwargs):
yield result
elif validation != 'skip':
reason = "element %r not allowed here." % name
yield self.validation_error(validation, reason, value, **kwargs)
# presumably reached when load_namespace() returned a false value: with
# validation='skip' the value is encoded with self.any_type.
elif validation == 'skip':
yield self.any_type.encode(value)
# With process_contents 'strict' the namespace is reported as unavailable.
elif self.process_contents == 'strict':
reason = "unavailable namespace {!r}".format(namespace)
yield self.validation_error(validation, reason, **kwargs)
def is_overlap(self, other):
if not isinstance(other, XsdAnyElement):
@ -562,47 +559,66 @@ class XsdAnyAttribute(XsdWildcard):
def iter_decode(self, attribute, validation='lax', **kwargs):
# Generator that yields either decoded results for `attribute`, which is unpacked
# as a (name, value) pair, or the values returned by self.validation_error().
# NOTE(review): this span is a diff hunk rendered without +/- markers: statements
# of the replaced and of the replacing implementation are interleaved, so it must
# be read as a diff and not as a single runnable body.
name, value = attribute
if self.is_matching(name):
if self.process_contents == 'skip':
return
# NOTE(review): _load_namespace() appears to be removed by this commit, so this
# call is presumably one of the deleted lines -- confirm against the raw diff.
self._load_namespace(get_namespace(name))
# Name rejected by self.is_matching(): an error is produced unless validation is 'skip'.
if not self.is_matching(name):
if validation != 'skip':
reason = "attribute %r not allowed." % name
yield self.validation_error(validation, reason, attribute, **kwargs)
elif self.process_contents == 'skip':
return
# Taken when self.maps.load_namespace() returns a true value for the namespace of
# the name: the name is then looked up with self.maps.lookup_attribute().
elif self.maps.load_namespace(get_namespace(name)):
try:
xsd_attribute = self.maps.lookup_attribute(name)
except LookupError:
if kwargs.get('drop_results'):
# Validation-only mode: returns the value if a decoder is not found.
if validation == 'skip':
yield value
elif self.process_contents == 'strict' and validation != 'skip':
elif self.process_contents == 'strict':
reason = "attribute %r not found." % name
yield self.validation_error(validation, reason, attribute, **kwargs)
else:
# Lookup succeeded: delegate the decoding to the attribute found in the maps.
for result in xsd_attribute.iter_decode(value, validation, **kwargs):
yield result
elif validation != 'skip':
reason = "attribute %r not allowed." % name
yield self.validation_error(validation, reason, attribute, **kwargs)
# presumably reached when load_namespace() returned a false value: with
# validation='skip' the raw value is yielded unchanged.
elif validation == 'skip':
yield value
# With process_contents 'strict' the namespace is reported as unavailable.
elif self.process_contents == 'strict':
reason = "unavailable namespace {!r}".format(get_namespace(name))
yield self.validation_error(validation, reason, **kwargs)
def iter_encode(self, attribute, validation='lax', **kwargs):
# Generator that yields either encoded results for `attribute`, which is unpacked
# as a (name, value) pair, or the values returned by self.validation_error().
# NOTE(review): this span is a diff hunk rendered without +/- markers: statements
# of the replaced and of the replacing implementation are interleaved, so it must
# be read as a diff and not as a single runnable body.
if self.process_contents == 'skip':
return
name, value = attribute
namespace = get_namespace(name)
if self.is_namespace_allowed(namespace):
# NOTE(review): _load_namespace() appears to be removed by this commit, so this
# call is presumably one of the deleted lines -- confirm against the raw diff.
self._load_namespace(namespace)
# Namespace rejected by self.is_namespace_allowed(): an error is produced unless
# validation is 'skip'.
if not self.is_namespace_allowed(namespace):
if validation != 'skip':
reason = "attribute %r not allowed." % name
yield self.validation_error(validation, reason, attribute, **kwargs)
elif self.process_contents == 'skip':
return
# Taken when self.maps.load_namespace() returns a true value for the namespace:
# the name is then looked up with self.maps.lookup_attribute().
elif self.maps.load_namespace(namespace):
try:
xsd_attribute = self.maps.lookup_attribute(name)
except LookupError:
if self.process_contents == 'strict' and validation != 'skip':
if validation == 'skip':
yield unicode_type(value)
elif self.process_contents == 'strict':
reason = "attribute %r not found." % name
yield self.validation_error(validation, reason, attribute, **kwargs)
else:
# Lookup succeeded: delegate the encoding to the attribute found in the maps.
for result in xsd_attribute.iter_encode(value, validation, **kwargs):
yield result
elif validation != 'skip':
reason = "attribute %r not allowed." % name
yield self.validation_error(validation, reason, attribute, **kwargs)
# presumably reached when load_namespace() returned a false value: with
# validation='skip' the value is yielded after conversion with unicode_type().
elif validation == 'skip':
yield unicode_type(value)
# With process_contents 'strict' the namespace is reported as unavailable.
elif self.process_contents == 'strict':
reason = "unavailable namespace {!r}".format(get_namespace(name))
yield self.validation_error(validation, reason, **kwargs)
class Xsd11AnyElement(XsdAnyElement):