Update XSD validation with wildcards
- Add load_namespace() to XsdGlobals . Modify iter_decode/iter_encode for xs:any and xs:anyAttribute
This commit is contained in:
parent
8407f09139
commit
248b9f9b68
|
@ -17,7 +17,7 @@ from collections import Counter
|
|||
|
||||
from ..compat import string_base_type
|
||||
from ..exceptions import XMLSchemaKeyError, XMLSchemaTypeError, XMLSchemaValueError, XMLSchemaWarning
|
||||
from ..namespaces import XSD_NAMESPACE, NamespaceResourcesMap
|
||||
from ..namespaces import XSD_NAMESPACE, LOCATION_HINTS, NamespaceResourcesMap
|
||||
from ..qnames import XSD_REDEFINE, XSD_OVERRIDE, XSD_NOTATION, XSD_ANY_TYPE, \
|
||||
XSD_SIMPLE_TYPE, XSD_COMPLEX_TYPE, XSD_GROUP, XSD_ATTRIBUTE, XSD_ATTRIBUTE_GROUP, \
|
||||
XSD_ELEMENT, XSI_TYPE, get_qname, local_name, qname_to_extended
|
||||
|
@ -203,6 +203,7 @@ class XsdGlobals(XsdValidator):
|
|||
|
||||
self.validator = validator
|
||||
self.namespaces = NamespaceResourcesMap() # Registered schemas by namespace URI
|
||||
self.missing_locations = [] # Missing or failing resource locations
|
||||
|
||||
self.types = {} # Global types (both complex and simple)
|
||||
self.attributes = {} # Global attributes
|
||||
|
@ -384,6 +385,61 @@ class XsdGlobals(XsdValidator):
|
|||
elif not any(schema.url == obj.url and schema.__class__ == obj.__class__ for obj in ns_schemas):
|
||||
ns_schemas.append(schema)
|
||||
|
||||
def load_namespace(self, namespace, build=True):
|
||||
"""
|
||||
Load namespace from available location hints. Returns `True` if the namespace
|
||||
is already loaded or if the namespace can be loaded from one of the locations,
|
||||
returns `False` otherwise. Failing locations are inserted into the missing
|
||||
locations list.
|
||||
|
||||
:param namespace: the namespace to load.
|
||||
:param build: if left with `True` value builds the maps after load. If the \
|
||||
build fails the resource URL is added to missing locations.
|
||||
"""
|
||||
namespace = namespace.strip()
|
||||
if namespace in self.namespaces:
|
||||
return True
|
||||
elif self.validator.meta_schema is None:
|
||||
return False # Do not load additional namespaces for meta-schema (XHTML)
|
||||
|
||||
# Try from schemas location hints: usually the namespaces related to these
|
||||
# hints are already loaded during schema construction, but it's better to
|
||||
# retry once if the initial load has failed.
|
||||
for schema in self.iter_schemas():
|
||||
for url in schema.get_locations(namespace):
|
||||
if url in self.missing_locations:
|
||||
continue
|
||||
|
||||
try:
|
||||
if schema.import_schema(namespace, url, schema.base_url) is not None:
|
||||
if build:
|
||||
self.build()
|
||||
except (OSError, IOError):
|
||||
pass
|
||||
except XMLSchemaNotBuiltError:
|
||||
self.clear(remove_schemas=True, only_unbuilt=True)
|
||||
self.missing_locations.append(url)
|
||||
else:
|
||||
return True
|
||||
|
||||
# Try from library location hint, if there is any.
|
||||
if namespace in LOCATION_HINTS:
|
||||
url = LOCATION_HINTS[namespace]
|
||||
if url not in self.missing_locations:
|
||||
try:
|
||||
if self.validator.import_schema(namespace, url) is not None:
|
||||
if build:
|
||||
self.build()
|
||||
except (OSError, IOError):
|
||||
return False
|
||||
except XMLSchemaNotBuiltError:
|
||||
self.clear(remove_schemas=True, only_unbuilt=True)
|
||||
self.missing_locations.append(url)
|
||||
else:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def clear(self, remove_schemas=False, only_unbuilt=False):
|
||||
"""
|
||||
Clears the instance maps and schemas.
|
||||
|
@ -415,6 +471,7 @@ class XsdGlobals(XsdValidator):
|
|||
self.namespaces = namespaces
|
||||
|
||||
else:
|
||||
self.missing_locations.clear()
|
||||
for global_map in self.global_maps:
|
||||
global_map.clear()
|
||||
self.substitution_groups.clear()
|
||||
|
|
|
@ -13,13 +13,12 @@ This module contains classes for XML Schema wildcards.
|
|||
"""
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from ..compat import unicode_type
|
||||
from ..exceptions import XMLSchemaValueError
|
||||
from ..namespaces import XSI_NAMESPACE
|
||||
from ..qnames import XSD_ANY, XSD_ANY_ATTRIBUTE, XSD_OPEN_CONTENT, \
|
||||
XSD_DEFAULT_OPEN_CONTENT, get_namespace
|
||||
from ..xpath import XMLSchemaProxy, ElementPathMixin
|
||||
|
||||
from .exceptions import XMLSchemaNotBuiltError
|
||||
from .xsdbase import ValidationMixin, XsdComponent, ParticleMixin
|
||||
|
||||
|
||||
|
@ -129,25 +128,6 @@ class XsdWildcard(XsdComponent, ValidationMixin):
|
|||
|
||||
self.not_qname = names
|
||||
|
||||
def _load_namespace(self, namespace):
|
||||
if namespace in self.schema.maps.namespaces:
|
||||
return
|
||||
|
||||
for url in self.schema.get_locations(namespace):
|
||||
try:
|
||||
schema = self.schema.import_schema(namespace, url, base_url=self.schema.base_url)
|
||||
if schema is not None:
|
||||
try:
|
||||
schema.maps.build()
|
||||
except XMLSchemaNotBuiltError:
|
||||
# Namespace build fails: remove unbuilt schemas and the url hint
|
||||
schema.maps.clear(remove_schemas=True, only_unbuilt=True)
|
||||
self.schema.locations[namespace].remove(url)
|
||||
else:
|
||||
break
|
||||
except (OSError, IOError):
|
||||
pass
|
||||
|
||||
@property
|
||||
def built(self):
|
||||
return True
|
||||
|
@ -444,48 +424,65 @@ class XsdAnyElement(XsdWildcard, ParticleMixin, ElementPathMixin):
|
|||
return iter(())
|
||||
|
||||
def iter_decode(self, elem, validation='lax', **kwargs):
|
||||
if self.is_matching(elem.tag):
|
||||
if self.process_contents == 'skip':
|
||||
return
|
||||
if not self.is_matching(elem.tag):
|
||||
if validation != 'skip':
|
||||
reason = "element %r not allowed here." % elem.tag
|
||||
yield self.validation_error(validation, reason, elem, **kwargs)
|
||||
|
||||
self._load_namespace(get_namespace(elem.tag))
|
||||
elif self.process_contents == 'skip':
|
||||
return
|
||||
|
||||
elif self.maps.load_namespace(get_namespace(elem.tag)):
|
||||
try:
|
||||
xsd_element = self.maps.lookup_element(elem.tag)
|
||||
except LookupError:
|
||||
if kwargs.get('drop_results'):
|
||||
# Validation-only mode: use anyType for decode a complex element.
|
||||
if validation == 'skip':
|
||||
yield self.any_type.decode(elem) if len(elem) > 0 else elem.text
|
||||
elif self.process_contents == 'strict' and validation != 'skip':
|
||||
elif self.process_contents == 'strict':
|
||||
reason = "element %r not found." % elem.tag
|
||||
yield self.validation_error(validation, reason, elem, **kwargs)
|
||||
else:
|
||||
for result in xsd_element.iter_decode(elem, validation, **kwargs):
|
||||
yield result
|
||||
elif validation != 'skip':
|
||||
reason = "element %r not allowed here." % elem.tag
|
||||
|
||||
elif validation == 'skip':
|
||||
yield self.any_type.decode(elem) if len(elem) > 0 else elem.text
|
||||
|
||||
elif self.process_contents == 'strict':
|
||||
reason = "unavailable namespace {!r}".format(get_namespace(elem.tag))
|
||||
yield self.validation_error(validation, reason, elem, **kwargs)
|
||||
|
||||
def iter_encode(self, obj, validation='lax', **kwargs):
|
||||
if self.process_contents == 'skip':
|
||||
return
|
||||
|
||||
name, value = obj
|
||||
namespace = get_namespace(name)
|
||||
|
||||
if self.is_namespace_allowed(namespace):
|
||||
self._load_namespace(namespace)
|
||||
if not self.is_namespace_allowed(namespace):
|
||||
if validation != 'skip':
|
||||
reason = "element %r not allowed here." % name
|
||||
yield self.validation_error(validation, reason, value, **kwargs)
|
||||
|
||||
elif self.process_contents == 'skip':
|
||||
return
|
||||
|
||||
elif self.maps.load_namespace(namespace):
|
||||
try:
|
||||
xsd_element = self.maps.lookup_element(name)
|
||||
except LookupError:
|
||||
if self.process_contents == 'strict' and validation != 'skip':
|
||||
if validation == 'skip':
|
||||
yield self.any_type.encode(value)
|
||||
elif self.process_contents == 'strict':
|
||||
reason = "element %r not found." % name
|
||||
yield self.validation_error(validation, reason, **kwargs)
|
||||
else:
|
||||
for result in xsd_element.iter_encode(value, validation, **kwargs):
|
||||
yield result
|
||||
elif validation != 'skip':
|
||||
reason = "element %r not allowed here." % name
|
||||
yield self.validation_error(validation, reason, value, **kwargs)
|
||||
|
||||
elif validation == 'skip':
|
||||
yield self.any_type.encode(value)
|
||||
|
||||
elif self.process_contents == 'strict':
|
||||
reason = "unavailable namespace {!r}".format(namespace)
|
||||
yield self.validation_error(validation, reason, **kwargs)
|
||||
|
||||
def is_overlap(self, other):
|
||||
if not isinstance(other, XsdAnyElement):
|
||||
|
@ -562,47 +559,66 @@ class XsdAnyAttribute(XsdWildcard):
|
|||
|
||||
def iter_decode(self, attribute, validation='lax', **kwargs):
|
||||
name, value = attribute
|
||||
if self.is_matching(name):
|
||||
if self.process_contents == 'skip':
|
||||
return
|
||||
|
||||
self._load_namespace(get_namespace(name))
|
||||
if not self.is_matching(name):
|
||||
if validation != 'skip':
|
||||
reason = "attribute %r not allowed." % name
|
||||
yield self.validation_error(validation, reason, attribute, **kwargs)
|
||||
|
||||
elif self.process_contents == 'skip':
|
||||
return
|
||||
|
||||
elif self.maps.load_namespace(get_namespace(name)):
|
||||
try:
|
||||
xsd_attribute = self.maps.lookup_attribute(name)
|
||||
except LookupError:
|
||||
if kwargs.get('drop_results'):
|
||||
# Validation-only mode: returns the value if a decoder is not found.
|
||||
if validation == 'skip':
|
||||
yield value
|
||||
elif self.process_contents == 'strict' and validation != 'skip':
|
||||
elif self.process_contents == 'strict':
|
||||
reason = "attribute %r not found." % name
|
||||
yield self.validation_error(validation, reason, attribute, **kwargs)
|
||||
else:
|
||||
for result in xsd_attribute.iter_decode(value, validation, **kwargs):
|
||||
yield result
|
||||
elif validation != 'skip':
|
||||
reason = "attribute %r not allowed." % name
|
||||
yield self.validation_error(validation, reason, attribute, **kwargs)
|
||||
|
||||
elif validation == 'skip':
|
||||
yield value
|
||||
|
||||
elif self.process_contents == 'strict':
|
||||
reason = "unavailable namespace {!r}".format(get_namespace(name))
|
||||
yield self.validation_error(validation, reason, **kwargs)
|
||||
|
||||
def iter_encode(self, attribute, validation='lax', **kwargs):
|
||||
if self.process_contents == 'skip':
|
||||
return
|
||||
|
||||
name, value = attribute
|
||||
namespace = get_namespace(name)
|
||||
if self.is_namespace_allowed(namespace):
|
||||
self._load_namespace(namespace)
|
||||
|
||||
if not self.is_namespace_allowed(namespace):
|
||||
if validation != 'skip':
|
||||
reason = "attribute %r not allowed." % name
|
||||
yield self.validation_error(validation, reason, attribute, **kwargs)
|
||||
|
||||
elif self.process_contents == 'skip':
|
||||
return
|
||||
|
||||
elif self.maps.load_namespace(namespace):
|
||||
try:
|
||||
xsd_attribute = self.maps.lookup_attribute(name)
|
||||
except LookupError:
|
||||
if self.process_contents == 'strict' and validation != 'skip':
|
||||
if validation == 'skip':
|
||||
yield unicode_type(value)
|
||||
elif self.process_contents == 'strict':
|
||||
reason = "attribute %r not found." % name
|
||||
yield self.validation_error(validation, reason, attribute, **kwargs)
|
||||
else:
|
||||
for result in xsd_attribute.iter_encode(value, validation, **kwargs):
|
||||
yield result
|
||||
elif validation != 'skip':
|
||||
reason = "attribute %r not allowed." % name
|
||||
yield self.validation_error(validation, reason, attribute, **kwargs)
|
||||
|
||||
elif validation == 'skip':
|
||||
yield unicode_type(value)
|
||||
|
||||
elif self.process_contents == 'strict':
|
||||
reason = "unavailable namespace {!r}".format(get_namespace(name))
|
||||
yield self.validation_error(validation, reason, **kwargs)
|
||||
|
||||
|
||||
class Xsd11AnyElement(XsdAnyElement):
|
||||
|
|
Loading…
Reference in New Issue