Add schema matching by path
This commit is contained in:
parent
04f63f6b4d
commit
526e9baa9f
|
@ -28,4 +28,4 @@ from .xpath_token import XPathToken
|
|||
from .xpath1_parser import XPath1Parser
|
||||
from .xpath2_constructors import XPath2Parser
|
||||
from .xpath_selectors import select, iter_select, Selector
|
||||
from .schema_proxy import AbstractSchemaProxy, XMLSchemaProxy
|
||||
from .schema_proxy import AbstractSchemaProxy
|
||||
|
|
|
@ -10,8 +10,7 @@
|
|||
#
|
||||
from abc import ABCMeta, abstractmethod
|
||||
from .compat import add_metaclass
|
||||
from .exceptions import ElementPathTypeError, ElementPathValueError
|
||||
from .namespaces import XSD_NAMESPACE
|
||||
from .exceptions import ElementPathTypeError
|
||||
from .xpath_nodes import is_etree_element
|
||||
from .xpath_context import XPathSchemaContext
|
||||
|
||||
|
@ -186,6 +185,12 @@ class AbstractSchemaProxy(object):
|
|||
:returns: an object that represents an XSD element or `None`.
|
||||
"""
|
||||
|
||||
# TODO: make this an @abstractmethod starting from release v1.3.1
|
||||
def find(self, path, namespaces=None):
|
||||
"""
|
||||
Find the schema component using an XPath expression.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_substitution_group(self, qname):
|
||||
"""
|
||||
|
@ -234,82 +239,5 @@ class AbstractSchemaProxy(object):
|
|||
"""
|
||||
|
||||
|
||||
class XMLSchemaProxy(AbstractSchemaProxy):
|
||||
"""
|
||||
Schema proxy for the *xmlschema* library. It will be removed soon because
|
||||
xmlschema v1.0.14 will include its own version of the schema proxy that
|
||||
uses a custom context implementation that recognizes circular references.
|
||||
"""
|
||||
def __init__(self, schema=None, base_element=None):
|
||||
if schema is None:
|
||||
from xmlschema import XMLSchema
|
||||
schema = XMLSchema.meta_schema
|
||||
super(XMLSchemaProxy, self).__init__(schema, base_element)
|
||||
|
||||
if base_element is not None:
|
||||
try:
|
||||
if base_element.schema is not schema:
|
||||
raise ElementPathValueError("%r is not an element of %r" % (base_element, schema))
|
||||
except AttributeError:
|
||||
raise ElementPathTypeError("%r is not an XsdElement" % base_element)
|
||||
|
||||
def get_type(self, qname):
|
||||
try:
|
||||
return self._schema.maps.types[qname]
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
def get_attribute(self, qname):
|
||||
try:
|
||||
return self._schema.maps.attributes[qname]
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
def get_element(self, qname):
|
||||
try:
|
||||
return self._schema.maps.elements[qname]
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
def get_substitution_group(self, qname):
|
||||
try:
|
||||
return self._schema.maps.substitution_groups[qname]
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
def is_instance(self, obj, type_qname):
|
||||
xsd_type = self._schema.maps.types[type_qname]
|
||||
try:
|
||||
xsd_type.encode(obj)
|
||||
except ValueError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def cast_as(self, obj, type_qname):
|
||||
xsd_type = self._schema.maps.types[type_qname]
|
||||
return xsd_type.decode(obj)
|
||||
|
||||
def iter_atomic_types(self):
|
||||
for xsd_type in self._schema.maps.types.values():
|
||||
if xsd_type.target_namespace != XSD_NAMESPACE and hasattr(xsd_type, 'primitive_type'):
|
||||
yield xsd_type
|
||||
|
||||
def get_primitive_type(self, xsd_type):
|
||||
if not xsd_type.is_simple():
|
||||
if not xsd_type.has_simple_content():
|
||||
return self._schema.maps.types['{%s}anyType' % XSD_NAMESPACE]
|
||||
xsd_type = xsd_type.content_type
|
||||
|
||||
if not hasattr(xsd_type, 'primitive_type'):
|
||||
if xsd_type.base_type is None:
|
||||
return xsd_type
|
||||
return self.get_primitive_type(xsd_type.base_type)
|
||||
elif xsd_type.primitive_type is not xsd_type:
|
||||
return self.get_primitive_type(xsd_type.primitive_type)
|
||||
else:
|
||||
return xsd_type
|
||||
|
||||
|
||||
__all__ = ['AbstractXsdComponent', 'AbstractEtreeElement', 'AbstractXsdType', 'AbstractXsdAttribute',
|
||||
'AbstractXsdElement', 'AbstractSchemaProxy', 'XMLSchemaProxy']
|
||||
__all__ = ['AbstractXsdComponent', 'AbstractEtreeElement', 'AbstractXsdType',
|
||||
'AbstractXsdAttribute', 'AbstractXsdElement', 'AbstractSchemaProxy']
|
||||
|
|
|
@ -20,6 +20,7 @@ from .xpath_context import XPathSchemaContext
|
|||
from .tdop_parser import Parser, MultiLabel
|
||||
from .namespaces import XML_ID, XML_LANG, XPATH_1_DEFAULT_NAMESPACES, \
|
||||
XPATH_FUNCTIONS_NAMESPACE, XSD_NAMESPACE, qname_to_prefixed
|
||||
from .schema_proxy import AbstractSchemaProxy
|
||||
from .xpath_token import XPathToken
|
||||
from .xpath_nodes import AttributeNode, NamespaceNode, TypedAttribute, TypedElement,\
|
||||
is_etree_element, is_xpath_node, is_element_node, is_document_node, is_attribute_node, \
|
||||
|
@ -246,13 +247,11 @@ def select(self, context=None):
|
|||
return
|
||||
|
||||
name = self.value
|
||||
if name[0] != '{' and self.parser.default_namespace:
|
||||
tag = u'{%s}%s' % (self.parser.default_namespace, name)
|
||||
else:
|
||||
tag = name
|
||||
|
||||
if isinstance(context, XPathSchemaContext):
|
||||
# Bind with the XSD type
|
||||
# Bind with the XSD type from a schema
|
||||
if name[0] != '{' and self.parser.default_namespace:
|
||||
name = '{%s}%s' % (self.parser.default_namespace, name)
|
||||
|
||||
for item in context.iter_children_or_self():
|
||||
xsd_type = self.match_xsd_type(item, name)
|
||||
if xsd_type is not None:
|
||||
|
@ -266,14 +265,60 @@ def select(self, context=None):
|
|||
yield TypedAttribute(item, value)
|
||||
else:
|
||||
yield TypedElement(item, value)
|
||||
return
|
||||
|
||||
if name[0] != '{' and self.parser.default_namespace:
|
||||
tag = '{%s}%s' % (self.parser.default_namespace, name)
|
||||
else:
|
||||
tag = name
|
||||
|
||||
# Checks if the token is bound to an XSD type. If not try a match using
|
||||
# the element path. If this match fails the xsd_type attribute is set
|
||||
# with the schema object to prevent other checks until the schema changes.
|
||||
if self.xsd_type is self.parser.schema:
|
||||
|
||||
elif self.xsd_type is None:
|
||||
# Untyped selection
|
||||
for item in context.iter_children_or_self():
|
||||
if is_attribute_node(item, name):
|
||||
yield item
|
||||
elif is_element_node(item, tag):
|
||||
yield item
|
||||
|
||||
elif self.xsd_type is None or isinstance(self.xsd_type, AbstractSchemaProxy):
|
||||
|
||||
# Try to match the type using the path
|
||||
for item in context.iter_children_or_self():
|
||||
try:
|
||||
if is_attribute_node(item, name):
|
||||
path = context.get_path(item)
|
||||
xsd_attribute = self.parser.schema.find(path, self.parser.namespaces)
|
||||
|
||||
if xsd_attribute is not None:
|
||||
self.xsd_type = xsd_attribute.type
|
||||
yield TypedAttribute(item, self.xsd_type.decode(item[1]))
|
||||
else:
|
||||
self.xsd_type = self.parser.schema
|
||||
yield item
|
||||
elif is_element_node(item, tag):
|
||||
path = context.get_path(item)
|
||||
xsd_element = self.parser.schema.find(path, self.parser.namespaces)
|
||||
|
||||
if xsd_element is not None:
|
||||
self.xsd_type = xsd_element.type
|
||||
if isinstance(item, TypedElement):
|
||||
yield item
|
||||
elif self.xsd_type.is_simple() or self.xsd_type.has_simple_content():
|
||||
yield TypedElement(item, self.xsd_type.decode(item.text))
|
||||
else:
|
||||
yield item
|
||||
else:
|
||||
self.xsd_type = self.parser.schema
|
||||
yield item
|
||||
|
||||
except (TypeError, ValueError):
|
||||
msg = "Type {!r} does not match sequence type of {!r}"
|
||||
self.wrong_sequence_type(msg.format(self.xsd_type, item))
|
||||
|
||||
else:
|
||||
# XSD typed selection
|
||||
for item in context.iter_children_or_self():
|
||||
|
@ -369,7 +414,7 @@ def select(self, context=None):
|
|||
if context is not None:
|
||||
for item in context.iter_children_or_self():
|
||||
if is_attribute_node(item, value):
|
||||
yield item[1]
|
||||
yield item
|
||||
elif is_element_node(item, value):
|
||||
yield item
|
||||
|
||||
|
@ -491,12 +536,16 @@ def select(self, context=None):
|
|||
# Logical Operators
|
||||
@method(infix('or', bp=20))
|
||||
def evaluate(self, context=None):
|
||||
return bool(self[0].evaluate(context) or self[1].evaluate(context))
|
||||
if context is None:
|
||||
return bool(self[0].evaluate() or self[1].evaluate())
|
||||
return bool(self[0].evaluate(context.copy()) or self[1].evaluate(context.copy()))
|
||||
|
||||
|
||||
@method(infix('and', bp=25))
|
||||
def evaluate(self, context=None):
|
||||
return bool(self[0].evaluate(context) and self[1].evaluate(context))
|
||||
if context is None:
|
||||
return bool(self[0].evaluate() and self[1].evaluate())
|
||||
return bool(self[0].evaluate(context.copy()) and self[1].evaluate(context.copy()))
|
||||
|
||||
|
||||
@method(infix('=', bp=30))
|
||||
|
@ -748,10 +797,7 @@ def led(self, left):
|
|||
|
||||
@method('[')
|
||||
def select(self, context=None):
|
||||
if isinstance(context, XPathSchemaContext):
|
||||
for item in self[0].select(context):
|
||||
yield item
|
||||
elif context is not None:
|
||||
if context is not None:
|
||||
for position, item in enumerate(self[0].select(context), start=1):
|
||||
predicate = list(self[1].select(context.copy()))
|
||||
if len(predicate) == 1 and isinstance(predicate[0], NumericTypeProxy):
|
||||
|
|
|
@ -105,7 +105,7 @@ class XPathContext(object):
|
|||
|
||||
@lru_cache(maxsize=1024)
|
||||
def get_path(self, item):
|
||||
"""Cached path resolver for elements and attributes."""
|
||||
"""Cached path resolver for elements and attributes. Returns absolute paths."""
|
||||
path = []
|
||||
|
||||
if isinstance(item, (AttributeNode, TypedAttribute)):
|
||||
|
@ -116,9 +116,9 @@ class XPathContext(object):
|
|||
|
||||
while True:
|
||||
parent = self.get_parent(item)
|
||||
if parent is None:
|
||||
return '/'.join(reversed(path))
|
||||
path.append(item.tag)
|
||||
if parent is None:
|
||||
return '/{}'.format('/'.join(reversed(path)))
|
||||
item = parent
|
||||
|
||||
def is_principal_node_kind(self):
|
||||
|
|
|
@ -381,12 +381,9 @@ class XPathToken(Token):
|
|||
schema type an exception is raised.
|
||||
|
||||
:param schema_item: an XPath item related with a schema instance.
|
||||
:param name: a not empty string.
|
||||
:param name: a QName in extended format for matching the item.
|
||||
:returns: the matched XSD type or `None` if there isn't a match.
|
||||
"""
|
||||
if name[0] != '{' and self.parser.default_namespace:
|
||||
name = '{%s}%s' % (self.parser.default_namespace, name)
|
||||
|
||||
if isinstance(schema_item, AttributeNode):
|
||||
if not schema_item[1].is_matching(name):
|
||||
return
|
||||
|
|
|
@ -121,6 +121,17 @@ class XPath2ParserXMLSchemaTest(test_xpath2_parser.XPath2ParserTest):
|
|||
any_simple_type = schema_proxy.get_type('{%s}anySimpleType' % XSD_NAMESPACE)
|
||||
self.assertEqual(schema_proxy.get_primitive_type(any_simple_type), any_simple_type)
|
||||
|
||||
def test_find_api(self):
|
||||
schema_src = """<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
|
||||
<xs:element name="test_element" type="xs:string"/>
|
||||
</xs:schema>"""
|
||||
schema = xmlschema.XMLSchema(schema_src)
|
||||
schema_proxy = XMLSchemaProxy(schema=schema)
|
||||
if xmlschema.__version__ == '1.0.14':
|
||||
self.assertIsNone(schema_proxy.find('/test_element')) # Not implemented!
|
||||
else:
|
||||
self.assertEqual(schema_proxy.find('/test_element'), schema.elements['test_element'])
|
||||
|
||||
def test_is_instance_api(self):
|
||||
self.assertFalse(self.schema_proxy.is_instance(True, '{%s}integer' % XSD_NAMESPACE))
|
||||
self.assertTrue(self.schema_proxy.is_instance(5, '{%s}integer' % XSD_NAMESPACE))
|
||||
|
|
|
@ -661,7 +661,8 @@ class XPath2ParserTest(test_xpath1_parser.XPath1ParserTest):
|
|||
if self.etree is lxml_etree:
|
||||
prefixes = {'p0', 'p1'}
|
||||
else:
|
||||
prefixes = {'p0', 'p2', 'fn', 'xlink', 'err'} | {x for x in self.etree._namespace_map.values()}
|
||||
prefixes = {'p0', 'p2', 'fn', 'xlink', 'err', 'vc', 'xslt', '', 'hfp'}
|
||||
prefixes |= {x for x in self.etree._namespace_map.values()}
|
||||
self.check_selector("fn:in-scope-prefixes(.)", root, prefixes, namespaces={'p0': 'ns0', 'p2': 'ns2'})
|
||||
|
||||
def test_string_constructors(self):
|
||||
|
|
|
@ -53,8 +53,15 @@ class XPathContextTest(unittest.TestCase):
|
|||
|
||||
context = XPathContext(root)
|
||||
|
||||
self.assertEqual(context.get_path(root), '')
|
||||
self.assertEqual(context.get_path(root[0]), 'B1')
|
||||
self.assertEqual(context.get_path(root), '/A')
|
||||
self.assertEqual(context.get_path(root[0]), '/A/B1')
|
||||
self.assertEqual(context.get_path(root[0][0]), '/A/B1/C1')
|
||||
self.assertEqual(context.get_path(root[1]), '/A/B2')
|
||||
self.assertEqual(context.get_path(root[2]), '/A/B3')
|
||||
self.assertEqual(context.get_path(root[2][0]), '/A/B3/C1')
|
||||
self.assertEqual(context.get_path(root[2][1]), '/A/B3/C2')
|
||||
context._elem = root[2][1]
|
||||
self.assertEqual(context.get_path(AttributeNode('max', '10')), '/A/B3/C2/@max')
|
||||
|
||||
def test_iter_attributes(self):
|
||||
root = ElementTree.XML('<A a1="10" a2="20"/>')
|
||||
|
|
Loading…
Reference in New Issue