Merge branch 'master' of github.com:sissaschool/xmlschema into develop

This commit is contained in:
Davide Brunato 2019-10-17 10:41:27 +02:00
commit 9443adf396
2 changed files with 125 additions and 46 deletions

View File

@ -250,7 +250,7 @@ class XMLResource(object):
if base_url is not None and not isinstance(base_url, string_base_type):
raise XMLSchemaValueError(u"'base_url' argument has to be a string: {!r}".format(base_url))
self._root = self._document = self._url = self._text = None
self._root = self._document = self._url = self._text = self._fid = None
self._base_url = base_url
self.defuse = defuse
self.timeout = timeout
@ -279,7 +279,7 @@ class XMLResource(object):
def __setattr__(self, name, value):
if name == 'source':
self._root, self._document, self._text, self._url = self._fromsource(value)
self._root, self._document, self._text, self._url, self._fid = self._fromsource(value)
elif name == 'defuse' and value not in DEFUSE_MODES:
raise XMLSchemaValueError(u"'defuse' attribute: {!r} is not a defuse mode.".format(value))
elif name == 'timeout' and (not isinstance(value, int) or value <= 0):
@ -292,16 +292,16 @@ class XMLResource(object):
url, lazy = None, self._lazy
if hasattr(source, 'tag'):
self._lazy = False
return source, None, None, None # Source is already an Element --> nothing to load
return source, None, None, None, None # Source is already an Element --> nothing to load
elif isinstance(source, string_base_type):
_url, self._url = self._url, None
try:
if lazy:
# check if source is a string containing a valid XML root
for _, root in self.iterparse(StringIO(source), events=('start',)):
return root, None, source, None
return root, None, source, None, None
else:
return self.fromstring(source), None, source, None
return self.fromstring(source), None, source, None, None
except (ElementTree.ParseError, PyElementTree.ParseError, UnicodeEncodeError):
if '\n' in source:
raise
@ -314,33 +314,24 @@ class XMLResource(object):
try:
if lazy:
for _, root in self.iterparse(source, events=('start',)):
return root, None, source.getvalue(), None
return root, None, source.getvalue(), None, None
else:
document = self.parse(source)
return document.getroot(), document, source.getvalue(), None
return document.getroot(), document, source.getvalue(), None, None
finally:
self._url = _url
elif hasattr(source, 'read'):
# source should be a file-like object
_url, self._url = self._url, url
try:
if hasattr(source, 'url'):
url = source.url
if lazy:
for _, root in self.iterparse(source, events=('start',)):
return root, None, None, url, source
else:
url = normalize_url(source.name)
except AttributeError:
pass
else:
_url, self._url = self._url, url
try:
if lazy:
for _, root in self.iterparse(source, events=('start',)):
return root, None, None, url
else:
document = self.parse(source)
return document.getroot(), document, None, url
finally:
self._url = _url
document = self.parse(source)
return document.getroot(), document, None, url, source
finally:
self._url = _url
else:
# Try ElementTree object at last
@ -351,7 +342,7 @@ class XMLResource(object):
else:
if hasattr(root, 'tag'):
self._lazy = False
return root, source, None, None
return root, source, None, None, None
if url is None:
raise XMLSchemaTypeError(
@ -364,11 +355,11 @@ class XMLResource(object):
try:
if lazy:
for _, root in self.iterparse(resource, events=('start',)):
return root, None, None, url
return root, None, None, url, None
else:
document = self.parse(resource)
root = document.getroot()
return root, document, None, url
return root, document, None, url, None
finally:
self._url = _url
resource.close()
@ -487,6 +478,10 @@ class XMLResource(object):
def open(self):
"""Returns a opened resource reader object for the instance URL."""
if self._fid is not None:
self._fid.seek(0)
return self._fid
if self._url is None:
raise XMLSchemaValueError("can't open, the resource has no URL associated.")
try:
@ -499,7 +494,7 @@ class XMLResource(object):
Loads the XML text from the data source. If the data source is an Element
the source XML text can't be retrieved.
"""
if self._url is None:
if self._url is None and self._fid is None:
return # Created from Element or text source --> already loaded
resource = self.open()
@ -508,16 +503,25 @@ class XMLResource(object):
except (OSError, IOError) as err:
raise XMLSchemaOSError("cannot load data from %r: %s" % (self._url, err))
finally:
resource.close()
# We don't want to close the file obj if it wasn't originally
# opened by `XMLResource`. That is the concern of the code
# where the file obj came from.
if self._fid is None:
resource.close()
try:
self._text = data.decode('utf-8') if PY3 else data.encode('utf-8')
except UnicodeDecodeError:
if PY3:
self._text = data.decode('iso-8859-1')
else:
with codecs.open(urlsplit(self._url).path, mode='rb', encoding='iso-8859-1') as f:
self._text = f.read().encode('iso-8859-1')
if isinstance(data, bytes):
try:
text = data.decode('utf-8') if PY3 else data.encode('utf-8')
except UnicodeDecodeError:
if PY3:
text = data.decode('iso-8859-1')
else:
with codecs.open(urlsplit(self._url).path, mode='rb', encoding='iso-8859-1') as f:
text = f.read().encode('iso-8859-1')
else:
text = data
self._text = text
def is_lazy(self):
"""Returns `True` if the XML resource is lazy."""
@ -533,6 +537,9 @@ class XMLResource(object):
for elem in self._root.iter(tag):
yield elem
return
elif self._fid is not None:
self._fid.seek(0)
resource = self._fid
elif self._url is not None:
resource = urlopen(self._url, timeout=self.timeout)
else:
@ -544,7 +551,8 @@ class XMLResource(object):
yield elem
elem.clear()
finally:
resource.close()
if self._fid is None:
resource.close()
def iterfind(self, path=None, namespaces=None):
"""XML resource tree iterfind selector."""
@ -555,6 +563,9 @@ class XMLResource(object):
for e in iter_select(self._root, path, namespaces, strict=False):
yield e
return
elif self._fid is not None:
self._fid.seek(0)
resource = self._fid
elif self._url is not None:
resource = urlopen(self._url, timeout=self.timeout)
else:
@ -592,7 +603,8 @@ class XMLResource(object):
elif level == 0:
elem.clear()
finally:
resource.close()
if self._fid is None:
resource.close()
def iter_location_hints(self):
"""Yields schema location hints from the XML tree."""
@ -644,7 +656,7 @@ class XMLResource(object):
local_root = self.root.tag[0] != '{'
nsmap = {}
if self._url is not None:
if self._url is not None or self._fid is not None:
resource = self.open()
try:
for event, node in self.iterparse(resource, events=('start-ns', 'end')):
@ -655,7 +667,11 @@ class XMLResource(object):
except (ElementTree.ParseError, PyElementTree.ParseError, UnicodeEncodeError):
pass
finally:
resource.close()
# We don't want to close the file obj if it wasn't
# originally opened by `XMLResource`. That is the concern
# of the code where the file obj came from.
if self._fid is None:
resource.close()
elif isinstance(self._text, string_base_type):
try:
for event, node in self.iterparse(StringIO(self._text), events=('start-ns', 'end')):

View File

@ -206,17 +206,22 @@ class TestResources(unittest.TestCase):
resource.load()
self.assertIsNone(resource.text)
@unittest.skipIf(
platform.python_version_tuple()[0] < '3',
"Skip: urlopen on Python 2 can't seek 'file://' paths."
)
def test_xml_resource_from_resource(self):
xml_file = urlopen('file://{}'.format(add_leading_slash(self.vh_xml_file)))
try:
resource = XMLResource(xml_file)
self.assertEqual(resource.source, xml_file)
self.assertEqual(resource.root.tag, '{http://example.com/vehicles}vehicles')
self.check_url(resource.url, self.vh_xml_file)
self.assertIsNone(resource.url)
self.assertIsNone(resource.document)
self.assertIsNone(resource.text)
resource.load()
self.assertTrue(resource.text.startswith('<?xml'))
self.assertFalse(xml_file.closed)
finally:
xml_file.close()
@ -225,21 +230,35 @@ class TestResources(unittest.TestCase):
resource = XMLResource(schema_file)
self.assertEqual(resource.source, schema_file)
self.assertEqual(resource.root.tag, '{http://www.w3.org/2001/XMLSchema}schema')
self.check_url(resource.url, self.vh_xsd_file)
self.assertIsNone(resource.url)
self.assertIsNone(resource.document)
self.assertIsNone(resource.text)
resource.load()
self.assertTrue(resource.text.startswith('<xs:schema'))
self.assertFalse(schema_file.closed)
for _ in resource.iter():
pass
self.assertFalse(schema_file.closed)
for _ in resource.iterfind():
pass
self.assertFalse(schema_file.closed)
with open(self.vh_xsd_file) as schema_file:
resource = XMLResource(schema_file, lazy=False)
self.assertEqual(resource.source, schema_file)
self.assertEqual(resource.root.tag, '{http://www.w3.org/2001/XMLSchema}schema')
self.check_url(resource.url, self.vh_xsd_file)
self.assertIsNone(resource.url)
self.assertIsInstance(resource.document, ElementTree.ElementTree)
self.assertIsNone(resource.text)
resource.load()
self.assertTrue(resource.text.startswith('<xs:schema'))
self.assertFalse(schema_file.closed)
for _ in resource.iter():
pass
self.assertFalse(schema_file.closed)
for _ in resource.iterfind():
pass
self.assertFalse(schema_file.closed)
def test_xml_resource_from_string(self):
with open(self.vh_xsd_file) as schema_file:
@ -355,13 +374,15 @@ class TestResources(unittest.TestCase):
def test_xml_resource_get_namespaces(self):
with open(self.vh_xml_file) as schema_file:
resource = XMLResource(schema_file)
self.assertEqual(resource.url, normalize_url(self.vh_xml_file))
self.assertIsNone(resource.url)
self.assertEqual(set(resource.get_namespaces().keys()), {'vh', 'xsi'})
self.assertFalse(schema_file.closed)
with open(self.vh_xsd_file) as schema_file:
resource = XMLResource(schema_file)
self.assertEqual(resource.url, normalize_url(self.vh_xsd_file))
self.assertIsNone(resource.url)
self.assertEqual(set(resource.get_namespaces().keys()), {'xs', 'vh'})
self.assertFalse(schema_file.closed)
resource = XMLResource(self.col_xml_file)
self.assertEqual(resource.url, normalize_url(self.col_xml_file))
@ -378,7 +399,49 @@ class TestResources(unittest.TestCase):
self.assertEqual(len(locations), 2)
self.check_url(locations[0][1], os.path.join(self.col_dir, 'other.xsd'))
@unittest.skipIf(SKIP_REMOTE_TESTS or platform.system() == 'Windows',
"Remote networks are not accessible or avoid SSL verification error on Windows.")
def test_remote_schemas_loading(self):
col_schema = self.schema_class("https://raw.githubusercontent.com/brunato/xmlschema/master/"
"xmlschema/tests/test_cases/examples/collection/collection.xsd")
self.assertTrue(isinstance(col_schema, self.schema_class))
vh_schema = self.schema_class("https://raw.githubusercontent.com/brunato/xmlschema/master/"
"xmlschema/tests/test_cases/examples/vehicles/vehicles.xsd")
self.assertTrue(isinstance(vh_schema, self.schema_class))
def test_schema_defuse(self):
vh_schema = self.schema_class(self.vh_xsd_file, defuse='always')
self.assertIsInstance(vh_schema.root, etree_element)
for schema in vh_schema.maps.iter_schemas():
self.assertIsInstance(schema.root, etree_element)
def test_fid_with_name_attr(self):
"""XMLResource gets correct data when passed a file like object
with a name attribute that isn't on disk.
These file descriptors appear when working with the contents from a
zip using the zipfile module and with Django files in some
instances.
"""
class FileProxy(object):
def __init__(self, fid, fake_name):
self._fid = fid
self.name = fake_name
def __getattr__(self, attr):
try:
return self.__dict__[attr]
except (KeyError, AttributeError):
return getattr(self.__dict__["_fid"], attr)
fake_name = "not__on____disk.xml"
with open(self.vh_xml_file) as schema_file:
resource = XMLResource(FileProxy(schema_file, fake_name))
self.assertIsNone(resource.url)
self.assertEqual(set(resource.get_namespaces().keys()), {'vh', 'xsi'})
self.assertFalse(schema_file.closed)
if __name__ == '__main__':
from xmlschema.tests import print_test_header