Add schema matching by path

This commit is contained in:
Davide Brunato 2019-10-05 20:47:32 +02:00
parent 04f63f6b4d
commit 526e9baa9f
8 changed files with 96 additions and 106 deletions

View File

@ -28,4 +28,4 @@ from .xpath_token import XPathToken
from .xpath1_parser import XPath1Parser
from .xpath2_constructors import XPath2Parser
from .xpath_selectors import select, iter_select, Selector
from .schema_proxy import AbstractSchemaProxy, XMLSchemaProxy
from .schema_proxy import AbstractSchemaProxy

View File

@ -10,8 +10,7 @@
#
from abc import ABCMeta, abstractmethod
from .compat import add_metaclass
from .exceptions import ElementPathTypeError, ElementPathValueError
from .namespaces import XSD_NAMESPACE
from .exceptions import ElementPathTypeError
from .xpath_nodes import is_etree_element
from .xpath_context import XPathSchemaContext
@ -186,6 +185,12 @@ class AbstractSchemaProxy(object):
:returns: an object that represents an XSD element or `None`.
"""
# TODO: this can be made an @abstractmethod from release v1.3.1
def find(self, path, namespaces=None):
    """
    Find the schema component using an XPath expression.

    :param path: the XPath expression that identifies the schema component.
    :param namespaces: optional namespace information used to resolve the
    expression; presumably a prefix-to-URI mapping (TODO confirm).
    :returns: `None` in this base implementation, which has no body.
    NOTE(review): concrete proxies are presumably expected to override this
    and return the matched schema component; confirm against subclasses.
    """
@abstractmethod
def get_substitution_group(self, qname):
"""
@ -234,82 +239,5 @@ class AbstractSchemaProxy(object):
"""
class XMLSchemaProxy(AbstractSchemaProxy):
    """
    Schema proxy for the *xmlschema* library. It is going to be removed soon
    because xmlschema v1.0.14 will include its own version of the schema
    proxy, one that uses a custom context implementation that recognizes
    circular references.

    :param schema: the schema instance to wrap. When omitted, the \
    `meta_schema` of `xmlschema.XMLSchema` is used.
    :param base_element: an optional element of the schema. Its `schema` \
    attribute must be the wrapped schema instance.
    """
    def __init__(self, schema=None, base_element=None):
        if schema is None:
            # Imported lazily: xmlschema is needed only for the default schema.
            from xmlschema import XMLSchema
            schema = XMLSchema.meta_schema
        super(XMLSchemaProxy, self).__init__(schema, base_element)

        if base_element is None:
            return

        # An object without a `schema` attribute is reported as a type error,
        # an element that belongs to another schema as a value error.
        try:
            if base_element.schema is not schema:
                raise ElementPathValueError("%r is not an element of %r" % (base_element, schema))
        except AttributeError:
            raise ElementPathTypeError("%r is not an XsdElement" % base_element)

    def get_type(self, qname):
        """Return the global XSD type registered as *qname*, `None` if missing."""
        try:
            xsd_type = self._schema.maps.types[qname]
        except KeyError:
            xsd_type = None
        return xsd_type

    def get_attribute(self, qname):
        """Return the global XSD attribute registered as *qname*, `None` if missing."""
        try:
            xsd_attribute = self._schema.maps.attributes[qname]
        except KeyError:
            xsd_attribute = None
        return xsd_attribute

    def get_element(self, qname):
        """Return the global XSD element registered as *qname*, `None` if missing."""
        try:
            xsd_element = self._schema.maps.elements[qname]
        except KeyError:
            xsd_element = None
        return xsd_element

    def get_substitution_group(self, qname):
        """Return the substitution group registered as *qname*, `None` if missing."""
        try:
            group = self._schema.maps.substitution_groups[qname]
        except KeyError:
            group = None
        return group

    def is_instance(self, obj, type_qname):
        """
        Tell whether *obj* can be encoded by the XSD type named *type_qname*.

        :returns: `False` if encoding raises `ValueError`, `True` otherwise.
        :raises KeyError: if *type_qname* is not in the schema's types map.
        """
        xsd_type = self._schema.maps.types[type_qname]
        try:
            xsd_type.encode(obj)
        except ValueError:
            return False
        return True

    def cast_as(self, obj, type_qname):
        """
        Decode *obj* with the XSD type named *type_qname* and return the result.

        :raises KeyError: if *type_qname* is not in the schema's types map.
        """
        return self._schema.maps.types[type_qname].decode(obj)

    def iter_atomic_types(self):
        """
        Yield the types of the schema's types map that are outside the XSD
        namespace and that have a `primitive_type` attribute.
        """
        for candidate in self._schema.maps.types.values():
            if candidate.target_namespace != XSD_NAMESPACE:
                if hasattr(candidate, 'primitive_type'):
                    yield candidate

    def get_primitive_type(self, xsd_type):
        """
        Resolve the primitive type related to *xsd_type*.

        Types that are not simple and have no simple content resolve to the
        `anyType` entry of the schema's types map. Types with simple content
        are resolved through their `content_type`. After that the
        `primitive_type` (or, when that attribute is missing, the `base_type`)
        links are followed recursively until a type that is its own primitive
        type, or that has no base type, is reached.
        """
        if not xsd_type.is_simple():
            if not xsd_type.has_simple_content():
                return self._schema.maps.types['{%s}anyType' % XSD_NAMESPACE]
            xsd_type = xsd_type.content_type

        if hasattr(xsd_type, 'primitive_type'):
            primitive = xsd_type.primitive_type
            if primitive is xsd_type:
                return xsd_type
            return self.get_primitive_type(primitive)

        # No primitive type information: climb the derivation chain instead.
        if xsd_type.base_type is None:
            return xsd_type
        return self.get_primitive_type(xsd_type.base_type)
__all__ = ['AbstractXsdComponent', 'AbstractEtreeElement', 'AbstractXsdType', 'AbstractXsdAttribute',
'AbstractXsdElement', 'AbstractSchemaProxy', 'XMLSchemaProxy']
__all__ = ['AbstractXsdComponent', 'AbstractEtreeElement', 'AbstractXsdType',
'AbstractXsdAttribute', 'AbstractXsdElement', 'AbstractSchemaProxy']

View File

@ -20,6 +20,7 @@ from .xpath_context import XPathSchemaContext
from .tdop_parser import Parser, MultiLabel
from .namespaces import XML_ID, XML_LANG, XPATH_1_DEFAULT_NAMESPACES, \
XPATH_FUNCTIONS_NAMESPACE, XSD_NAMESPACE, qname_to_prefixed
from .schema_proxy import AbstractSchemaProxy
from .xpath_token import XPathToken
from .xpath_nodes import AttributeNode, NamespaceNode, TypedAttribute, TypedElement,\
is_etree_element, is_xpath_node, is_element_node, is_document_node, is_attribute_node, \
@ -246,13 +247,11 @@ def select(self, context=None):
return
name = self.value
if name[0] != '{' and self.parser.default_namespace:
tag = u'{%s}%s' % (self.parser.default_namespace, name)
else:
tag = name
if isinstance(context, XPathSchemaContext):
# Bind with the XSD type
# Bind with the XSD type from a schema
if name[0] != '{' and self.parser.default_namespace:
name = '{%s}%s' % (self.parser.default_namespace, name)
for item in context.iter_children_or_self():
xsd_type = self.match_xsd_type(item, name)
if xsd_type is not None:
@ -266,14 +265,60 @@ def select(self, context=None):
yield TypedAttribute(item, value)
else:
yield TypedElement(item, value)
return
if name[0] != '{' and self.parser.default_namespace:
tag = '{%s}%s' % (self.parser.default_namespace, name)
else:
tag = name
# Checks if the token is bound to an XSD type. If not, try a match using
# the element path. If this match fails, the xsd_type attribute is set
# to the schema object to prevent further checks until the schema changes.
if self.xsd_type is self.parser.schema:
elif self.xsd_type is None:
# Untyped selection
for item in context.iter_children_or_self():
if is_attribute_node(item, name):
yield item
elif is_element_node(item, tag):
yield item
elif self.xsd_type is None or isinstance(self.xsd_type, AbstractSchemaProxy):
# Try to match the type using the path
for item in context.iter_children_or_self():
try:
if is_attribute_node(item, name):
path = context.get_path(item)
xsd_attribute = self.parser.schema.find(path, self.parser.namespaces)
if xsd_attribute is not None:
self.xsd_type = xsd_attribute.type
yield TypedAttribute(item, self.xsd_type.decode(item[1]))
else:
self.xsd_type = self.parser.schema
yield item
elif is_element_node(item, tag):
path = context.get_path(item)
xsd_element = self.parser.schema.find(path, self.parser.namespaces)
if xsd_element is not None:
self.xsd_type = xsd_element.type
if isinstance(item, TypedElement):
yield item
elif self.xsd_type.is_simple() or self.xsd_type.has_simple_content():
yield TypedElement(item, self.xsd_type.decode(item.text))
else:
yield item
else:
self.xsd_type = self.parser.schema
yield item
except (TypeError, ValueError):
msg = "Type {!r} does not match sequence type of {!r}"
self.wrong_sequence_type(msg.format(self.xsd_type, item))
else:
# XSD typed selection
for item in context.iter_children_or_self():
@ -369,7 +414,7 @@ def select(self, context=None):
if context is not None:
for item in context.iter_children_or_self():
if is_attribute_node(item, value):
yield item[1]
yield item
elif is_element_node(item, value):
yield item
@ -491,12 +536,16 @@ def select(self, context=None):
# Logical Operators
@method(infix('or', bp=20))
def evaluate(self, context=None):
return bool(self[0].evaluate(context) or self[1].evaluate(context))
if context is None:
return bool(self[0].evaluate() or self[1].evaluate())
return bool(self[0].evaluate(context.copy()) or self[1].evaluate(context.copy()))
@method(infix('and', bp=25))
def evaluate(self, context=None):
return bool(self[0].evaluate(context) and self[1].evaluate(context))
if context is None:
return bool(self[0].evaluate() and self[1].evaluate())
return bool(self[0].evaluate(context.copy()) and self[1].evaluate(context.copy()))
@method(infix('=', bp=30))
@ -748,10 +797,7 @@ def led(self, left):
@method('[')
def select(self, context=None):
if isinstance(context, XPathSchemaContext):
for item in self[0].select(context):
yield item
elif context is not None:
if context is not None:
for position, item in enumerate(self[0].select(context), start=1):
predicate = list(self[1].select(context.copy()))
if len(predicate) == 1 and isinstance(predicate[0], NumericTypeProxy):

View File

@ -105,7 +105,7 @@ class XPathContext(object):
@lru_cache(maxsize=1024)
def get_path(self, item):
"""Cached path resolver for elements and attributes."""
"""Cached path resolver for elements and attributes. Returns absolute paths."""
path = []
if isinstance(item, (AttributeNode, TypedAttribute)):
@ -116,9 +116,9 @@ class XPathContext(object):
while True:
parent = self.get_parent(item)
if parent is None:
return '/'.join(reversed(path))
path.append(item.tag)
if parent is None:
return '/{}'.format('/'.join(reversed(path)))
item = parent
def is_principal_node_kind(self):

View File

@ -381,12 +381,9 @@ class XPathToken(Token):
schema type an exception is raised.
:param schema_item: an XPath item related with a schema instance.
:param name: a not empty string.
:param name: a QName in extended format for matching the item.
:returns: the matched XSD type or `None` if there isn't a match.
"""
if name[0] != '{' and self.parser.default_namespace:
name = '{%s}%s' % (self.parser.default_namespace, name)
if isinstance(schema_item, AttributeNode):
if not schema_item[1].is_matching(name):
return

View File

@ -121,6 +121,17 @@ class XPath2ParserXMLSchemaTest(test_xpath2_parser.XPath2ParserTest):
any_simple_type = schema_proxy.get_type('{%s}anySimpleType' % XSD_NAMESPACE)
self.assertEqual(schema_proxy.get_primitive_type(any_simple_type), any_simple_type)
def test_find_api(self):
    """
    Check the `find` API of the schema proxy on a schema that declares a
    single global element. With xmlschema 1.0.14 the lookup is expected to
    return `None`; with any other version it must return the element.
    """
    schema = xmlschema.XMLSchema("""<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="test_element" type="xs:string"/>
</xs:schema>""")
    proxy = XMLSchemaProxy(schema=schema)

    if xmlschema.__version__ != '1.0.14':
        self.assertEqual(proxy.find('/test_element'), schema.elements['test_element'])
    else:
        self.assertIsNone(proxy.find('/test_element'))  # Not implemented!
def test_is_instance_api(self):
self.assertFalse(self.schema_proxy.is_instance(True, '{%s}integer' % XSD_NAMESPACE))
self.assertTrue(self.schema_proxy.is_instance(5, '{%s}integer' % XSD_NAMESPACE))

View File

@ -661,7 +661,8 @@ class XPath2ParserTest(test_xpath1_parser.XPath1ParserTest):
if self.etree is lxml_etree:
prefixes = {'p0', 'p1'}
else:
prefixes = {'p0', 'p2', 'fn', 'xlink', 'err'} | {x for x in self.etree._namespace_map.values()}
prefixes = {'p0', 'p2', 'fn', 'xlink', 'err', 'vc', 'xslt', '', 'hfp'}
prefixes |= {x for x in self.etree._namespace_map.values()}
self.check_selector("fn:in-scope-prefixes(.)", root, prefixes, namespaces={'p0': 'ns0', 'p2': 'ns2'})
def test_string_constructors(self):

View File

@ -53,8 +53,15 @@ class XPathContextTest(unittest.TestCase):
context = XPathContext(root)
self.assertEqual(context.get_path(root), '')
self.assertEqual(context.get_path(root[0]), 'B1')
self.assertEqual(context.get_path(root), '/A')
self.assertEqual(context.get_path(root[0]), '/A/B1')
self.assertEqual(context.get_path(root[0][0]), '/A/B1/C1')
self.assertEqual(context.get_path(root[1]), '/A/B2')
self.assertEqual(context.get_path(root[2]), '/A/B3')
self.assertEqual(context.get_path(root[2][0]), '/A/B3/C1')
self.assertEqual(context.get_path(root[2][1]), '/A/B3/C2')
context._elem = root[2][1]
self.assertEqual(context.get_path(AttributeNode('max', '10')), '/A/B3/C2/@max')
def test_iter_attributes(self):
root = ElementTree.XML('<A a1="10" a2="20"/>')