Merge pull request #117 from danifus/visitorconv

Add VisitorConverter: uses visitor model for encoding
This commit is contained in:
Davide Brunato 2019-07-02 17:16:23 +02:00 committed by GitHub
commit 40fd9d9137
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 291 additions and 11 deletions

View File

@ -121,6 +121,8 @@ to JSON data <http://wiki.open311.org/JSON_and_XML_Conversion/>`_.
.. automethod:: element_decode
.. automethod:: element_encode
.. autoclass:: xmlschema.UnorderedConverter
.. autoclass:: xmlschema.ParkerConverter
.. autoclass:: xmlschema.BadgerFishConverter

View File

@ -24,9 +24,11 @@ ElementData = namedtuple('ElementData', ['tag', 'text', 'content', 'attributes']
"""
Namedtuple for Element data interchange between decoders and converters.
The field *tag* is a string containing the Element's tag, *text* can be `None`
or a string representing the Element's text, *content* can be `None`, a list
containing the Element's children or a dictionary mapping each element name to
a list of element contents for the Element's children (used for unordered input
data), *attributes* can be `None` or a dictionary containing the Element's
attributes.
"""
@ -366,6 +368,141 @@ class XMLSchemaConverter(NamespaceMapper):
return ElementData(tag, text, content, attributes)
class UnorderedConverter(XMLSchemaConverter):
    """
    Same as :class:`XMLSchemaConverter` but :meth:`element_encode` is
    modified so the order of the elements in the encoded output is based on
    the model visitor pattern rather than the order in which the elements
    were added to the input dictionary. As the order of the input
    dictionary is not preserved, text between sibling elements will raise
    an exception.

    e.g.

    .. code-block:: python

        import xmlschema
        from xmlschema.converters import UnorderedConverter

        xsd = \"\"\"<?xml version="1.0" encoding="UTF-8"?>
        <schema xmlns:ns="ns" xmlns="http://www.w3.org/2001/XMLSchema"
            targetNamespace="ns" elementFormDefault="unqualified" version="1.0">
            <element name="foo">
                <complexType>
                    <sequence minOccurs="1" maxOccurs="2">
                        <element name="A" type="integer" />
                        <element name="B" type="integer" />
                    </sequence>
                </complexType>
            </element>
        </schema>\"\"\"

        schema = xmlschema.XMLSchema(xsd, converter=UnorderedConverter)
        tree = schema.to_etree(
            {"A": [1, 2], "B": [3, 4]},
        )
        # Returns equivalent of:
        # <ns:foo xmlns:ns="ns">
        #     <A>1</A>
        #     <B>3</B>
        #     <A>2</A>
        #     <B>4</B>
        # </ns:foo>

    Schemas which contain repeated sequences (``maxOccurs > 1``) of
    optional elements may be ambiguous using this approach when some of the
    optional elements are not present. In those cases, decoding and then
    encoding may not reproduce the original ordering.
    """

    def element_encode(self, obj, xsd_element, level=0):
        """
        Extracts XML decoded data from a data structure for encoding into an ElementTree.

        :param obj: the decoded object.
        :param xsd_element: the `XsdElement` associated to the decoded data structure.
        :param level: the level related to the encoding process (0 means the root).
        :return: an ElementData instance. When `obj` is a dictionary its *content* \
        field is a dictionary mapping each child name to a list with one item per \
        child element, otherwise *content* (or *text*) carries `obj` unchanged.
        :raises XMLSchemaValueError: if `obj` contains a key that denotes a cdata \
        segment, because unordered content cannot position text between siblings.
        """
        if level != 0:
            tag = xsd_element.name
        else:
            tag = xsd_element.qualified_name
            if self.preserve_root:
                # NOTE(review): the fallback passed to `get` is the element's
                # local name itself, not a lookup by that name -- confirm this
                # is the intended behaviour when the qualified tag is missing.
                try:
                    obj = obj.get(tag, xsd_element.local_name)
                except (KeyError, AttributeError, TypeError):
                    pass

        if not isinstance(obj, (self.dict, dict)):
            if xsd_element.type.is_simple() or xsd_element.type.has_simple_content():
                return ElementData(tag, obj, None, self.dict())
            else:
                return ElementData(tag, None, obj, self.dict())

        unmap_qname = self.unmap_qname
        unmap_attribute_qname = self._unmap_attribute_qname
        text_key = self.text_key
        attr_prefix = self.attr_prefix
        ns_prefix = self.ns_prefix
        cdata_prefix = self.cdata_prefix

        text = None
        # `iter_encode` assumes that the values of this dict will all be lists
        # where each item is the content of a single element. When building
        # content_lu, content which is not a list or lists to be placed into a
        # single element (element has a list content type) must be wrapped in a
        # list to retain that structure.
        content_lu = {}
        attributes = self.dict()

        for name, value in obj.items():
            if text_key and name == text_key:
                text = value
            elif (cdata_prefix and name.startswith(cdata_prefix)) or \
                    (cdata_prefix == '' and name[:1].isdigit()):
                # The digit test is only meaningful for an empty cdata prefix,
                # so it is checked second and with a slice: an empty key must
                # not fail with IndexError here.
                raise XMLSchemaValueError(
                    "cdata segments are not compatible with the '{}' converter".format(
                        self.__class__.__name__
                    )
                )
            elif name == ns_prefix:
                self[''] = value
            elif name.startswith('%s:' % ns_prefix):
                self[name[len(ns_prefix) + 1:]] = value
            elif attr_prefix and name.startswith(attr_prefix):
                name = name[len(attr_prefix):]
                attributes[unmap_attribute_qname(name)] = value
            elif not isinstance(value, (self.list, list)) or not value:
                # A scalar, a dict or an empty list: content of a single element.
                content_lu[unmap_qname(name)] = [value]
            elif isinstance(value[0], (self.dict, dict, self.list, list)):
                # Already a list with one entry per element.
                content_lu[unmap_qname(name)] = value
            else:
                # `value` is a list but not a list of lists or list of
                # dicts: the schema decides whether it is one element with a
                # list type or several elements with atomic values.
                ns_name = unmap_qname(name)
                for xsd_child in xsd_element.type.content_type.iter_elements():
                    matched_element = xsd_child.match(ns_name, self.get(''))
                    if matched_element is not None:
                        if matched_element.type.is_list():
                            content_lu[ns_name] = [value]
                        else:
                            content_lu[ns_name] = value
                        break
                else:
                    # No child element matched. With an empty attribute prefix
                    # the key may still name an attribute of the element.
                    if attr_prefix == '' and ns_name not in attributes:
                        for xsd_attribute in xsd_element.attributes.values():
                            if xsd_attribute.is_matching(ns_name):
                                attributes[ns_name] = value
                                break
                        else:
                            content_lu[ns_name] = [value]
                    else:
                        content_lu[ns_name] = [value]

        return ElementData(tag, text, content_lu, attributes)
class ParkerConverter(XMLSchemaConverter):
"""
XML Schema based converter class for Parker convention.

View File

@ -27,9 +27,11 @@ from xmlschema import (
XMLSchemaEncodeError, XMLSchemaValidationError, ParkerConverter,
BadgerFishConverter, AbderaConverter, JsonMLConverter
)
from xmlschema.converters import UnorderedConverter
from xmlschema.compat import unicode_type, ordered_dict_class
from xmlschema.etree import etree_element, etree_tostring, is_etree_element, ElementTree, \
etree_elements_assert_equal, lxml_etree, lxml_etree_element
from xmlschema.exceptions import XMLSchemaValueError
from xmlschema.validators.exceptions import XMLSchemaChildrenValidationError
from xmlschema.helpers import local_name
from xmlschema.qnames import XSI_TYPE
@ -777,6 +779,14 @@ class TestDecoding(XMLSchemaTestCase):
default_dict_root = self.col_schema.to_dict(self.col_xml_file, preserve_root=True)
self.assertEqual(default_dict_root, {'col:collection': _COLLECTION_DICT})
def test_visitor_converter(self):
    """Decoding with UnorderedConverter gives the default dictionary, with or without the root."""
    plain = self.col_schema.to_dict(self.col_xml_file, converter=UnorderedConverter)
    self.assertEqual(plain, _COLLECTION_DICT)

    # Same document, this time decoded with a converter instance that keeps the root element.
    rooted_converter = UnorderedConverter(preserve_root=True)
    rooted = self.col_schema.to_dict(self.col_xml_file, converter=rooted_converter)
    self.assertEqual(rooted, {'col:collection': _COLLECTION_DICT})
def test_parker_converter(self):
parker_dict = self.col_schema.to_dict(self.col_xml_file, converter=xmlschema.ParkerConverter)
self.assertEqual(parker_dict, _COLLECTION_PARKER)
@ -1259,17 +1269,53 @@ class TestEncoding(XMLSchemaTestCase):
indent=0,
)
self.check_encode(schema.elements['A'], {'B1': 'abc', 'B2': 10, 'B4': False}, XMLSchemaValidationError)
converter_cls = getattr(self.schema_class, "converter", None)
if converter_cls and issubclass(converter_cls, UnorderedConverter):
# UnorderedConverter doesn't use ordered content which makes
# it incompatible with cdata.
self.check_encode(
xsd_component=schema.elements['A'],
data=ordered_dict_class([('B1', 'abc'), ('B2', 10), ('#1', 'hello'), ('B3', True)]),
expected=XMLSchemaValueError,
indent=0, cdata_prefix='#'
)
else:
self.check_encode(
xsd_component=schema.elements['A'],
data=ordered_dict_class([('B1', 'abc'), ('B2', 10), ('#1', 'hello'), ('B3', True)]),
expected=u'<ns:A xmlns:ns="ns">\n<B1>abc</B1>\n<B2>10</B2>\nhello\n<B3>true</B3>\n</ns:A>',
indent=0, cdata_prefix='#'
)
self.check_encode(
xsd_component=schema.elements['A'],
data=ordered_dict_class([('B1', 'abc'), ('B2', 10), ('#1', 'hello')]),
expected=XMLSchemaValidationError, indent=0, cdata_prefix='#'
)
def test_encode_unordered_content(self):
schema = self.get_schema("""
<element name="A" type="ns:A_type" />
<complexType name="A_type">
<sequence>
<element name="B1" type="string"/>
<element name="B2" type="integer"/>
<element name="B3" type="boolean"/>
</sequence>
</complexType>
""")
converter_cls = getattr(self.schema_class, "converter", None)
if converter_cls and issubclass(converter_cls, UnorderedConverter):
expected = u'<ns:A xmlns:ns="ns">\n<B1>abc</B1>\n<B2>10</B2>\n<B3>true</B3>\n</ns:A>'
else:
expected = XMLSchemaChildrenValidationError
self.check_encode(
xsd_component=schema.elements['A'],
data=ordered_dict_class([('B1', 'abc'), ('B2', 10), ('#1', 'hello'), ('B3', True)]),
expected=u'<ns:A xmlns:ns="ns">\n<B1>abc</B1>\n<B2>10</B2>\nhello\n<B3>true</B3>\n</ns:A>',
data=ordered_dict_class([('B2', 10), ('B1', 'abc'), ('B3', True)]),
expected=expected,
indent=0, cdata_prefix='#'
)
self.check_encode(
xsd_component=schema.elements['A'],
data=ordered_dict_class([('B1', 'abc'), ('B2', 10), ('#1', 'hello')]),
expected=XMLSchemaValidationError, indent=0, cdata_prefix='#'
)
def test_encode_datetime(self):
xs = self.get_schema('<element name="dt" type="dateTime"/>')
@ -1356,6 +1402,41 @@ class TestEncoding11(TestEncoding):
schema_class = XMLSchema11
class XMLSchemaUnorderedConverter(xmlschema.XMLSchema):
    """Schema class for the tests whose `converter` attribute is UnorderedConverter."""
    converter = UnorderedConverter
class TestEncodingUnorderedConverter10(TestEncoding):
    """Runs the TestEncoding cases against a schema class that uses UnorderedConverter."""

    schema_class = XMLSchemaUnorderedConverter

    def test_visitor_converter_repeated_sequence_of_elements(self):
        """Children of a repeated sequence are interleaved following the model."""
        schema = self.get_schema("""
            <element name="foo">
                <complexType>
                    <sequence minOccurs="1" maxOccurs="2">
                        <element name="A" minOccurs="0" type="integer" nillable="true" />
                        <element name="B" minOccurs="0" type="integer" nillable="true" />
                    </sequence>
                </complexType>
            </element>
            """)
        tree = schema.to_etree({"A": [1, 2], "B": [3, 4]})

        # Collect the text of each child in document order.
        texts = [child.text for child in tree]
        self.assertEqual(texts, ['1', '3', '2', '4'])
class XMLSchema11UnorderedConverter(XMLSchema11):
    """XMLSchema11 subclass for the tests whose `converter` attribute is UnorderedConverter."""
    converter = UnorderedConverter
class TestEncodingUnorderedConverter11(TestEncoding):
    """Runs the TestEncoding cases with XMLSchema11UnorderedConverter as schema class."""
    schema_class = XMLSchema11UnorderedConverter
# Creates decoding/encoding tests classes from XML files
globals().update(tests_factory(make_validator_test_class, 'xml'))

View File

@ -639,6 +639,61 @@ class XsdGroup(XsdComponent, ModelGroup, ValidationMixin):
yield result_list
def sort_content(self, content):
    """
    Arranges unordered children data in the order suggested by the content model.

    A ModelVisitor built on this group is walked; each time the element
    expected by the model (or one of its substitutes) has pending data,
    the next item for that name is taken. Data that cannot be placed is
    appended at the end of the result: raising or collecting errors for
    those entries is left to the caller.

    :param content: a dictionary of element name to list of element contents.
        The values of this dictionary must be lists where each item of the
        list is the content of a single element.
    :return: a list of (element name, element content) tuples.
    """
    pending = {key: iter(val) for key, val in content.items()}
    ordered = []

    model = ModelVisitor(self)
    while model.element is not None:
        # Prefer the element expected by the model, then its substitutes.
        candidate = model.element.name
        if candidate not in pending:
            candidate = next(
                (sub.name for sub in model.element.iter_substitutes()
                 if sub.name in pending),
                None
            )

        matched = False
        if candidate is not None:
            try:
                item = next(pending[candidate])
            except StopIteration:
                # Nothing left under this name: drop it, report no match.
                del pending[candidate]
            else:
                ordered.append((candidate, item))
                matched = True

        if not pending:
            break

        # The generator returned by advance() has to be exhausted, otherwise
        # the visitor never moves on and this loop does not terminate.
        # Validity checks belong to `iter_encode`.
        for _ in model.advance(matched):
            pass

    # Whatever could not be placed goes at the end; `iter_encode` decides
    # whether its presence is an error (validation="lax", etc.).
    for name, leftovers in pending.items():
        ordered.extend((name, item) for item in leftovers)
    return ordered
def iter_encode(self, element_data, validation='lax', **kwargs):
"""
Creates an iterator for encoding data to a list containing Element data.
@ -670,7 +725,12 @@ class XsdGroup(XsdComponent, ModelGroup, ValidationMixin):
model = ModelVisitor(self)
cdata_index = 0
for index, (name, value) in enumerate(element_data.content):
if isinstance(element_data.content, dict):
content = self.sort_content(element_data.content)
else:
content = element_data.content
for index, (name, value) in enumerate(content):
if isinstance(name, int):
if not children:
text = padding + value if text is None else text + value + padding