Stop reading `name` and `url` from file object attrs
These attrs shouldn't be used to reopen the file object as: - they may not reflect the original file or resource (file objects opened from a zipfile will have a name that doesn't correspond to any file on disk). - Depending on how the fid was opened, these attrs could be crafted to read arbitrary files from disk. If the creator of a .zip gives a file inside the zip file a path of `/etc/passwd` we may end up opening that file. Instead of reopening the file, we keep track of the file object and seek to the beginning of the file. This means (for most operations) the file object must be seekable. On Python 2 urlopen returns an unseekable object for 'file://' paths. One test had to be skipped in Python 2 for this reason.
This commit is contained in:
parent
4085e8daa5
commit
61e1f609fc
|
@ -245,7 +245,7 @@ class XMLResource(object):
|
|||
if base_url is not None and not isinstance(base_url, string_base_type):
|
||||
raise XMLSchemaValueError(u"'base_url' argument has to be a string: {!r}".format(base_url))
|
||||
|
||||
self._root = self._document = self._url = self._text = None
|
||||
self._root = self._document = self._url = self._text = self._fid = None
|
||||
self._base_url = base_url
|
||||
self.defuse = defuse
|
||||
self.timeout = timeout
|
||||
|
@ -274,7 +274,7 @@ class XMLResource(object):
|
|||
|
||||
def __setattr__(self, name, value):
|
||||
if name == 'source':
|
||||
self._root, self._document, self._text, self._url = self._fromsource(value)
|
||||
self._root, self._document, self._text, self._url, self._fid = self._fromsource(value)
|
||||
elif name == 'defuse' and value not in DEFUSE_MODES:
|
||||
raise XMLSchemaValueError(u"'defuse' attribute: {!r} is not a defuse mode.".format(value))
|
||||
elif name == 'timeout' and (not isinstance(value, int) or value <= 0):
|
||||
|
@ -287,16 +287,16 @@ class XMLResource(object):
|
|||
url, lazy = None, self._lazy
|
||||
if is_etree_element(source):
|
||||
self._lazy = False
|
||||
return source, None, None, None # Source is already an Element --> nothing to load
|
||||
return source, None, None, None, None # Source is already an Element --> nothing to load
|
||||
elif isinstance(source, string_base_type):
|
||||
_url, self._url = self._url, None
|
||||
try:
|
||||
if lazy:
|
||||
# check if source is a string containing a valid XML root
|
||||
for _, root in self.iterparse(StringIO(source), events=('start',)):
|
||||
return root, None, source, None
|
||||
return root, None, source, None, None
|
||||
else:
|
||||
return self.fromstring(source), None, source, None
|
||||
return self.fromstring(source), None, source, None, None
|
||||
except (ElementTree.ParseError, PyElementTree.ParseError, UnicodeEncodeError):
|
||||
if '\n' in source:
|
||||
raise
|
||||
|
@ -309,33 +309,24 @@ class XMLResource(object):
|
|||
try:
|
||||
if lazy:
|
||||
for _, root in self.iterparse(source, events=('start',)):
|
||||
return root, None, source.getvalue(), None
|
||||
return root, None, source.getvalue(), None, None
|
||||
else:
|
||||
document = self.parse(source)
|
||||
return document.getroot(), document, source.getvalue(), None
|
||||
return document.getroot(), document, source.getvalue(), None, None
|
||||
finally:
|
||||
self._url = _url
|
||||
|
||||
elif hasattr(source, 'read'):
|
||||
# source should be a file-like object
|
||||
_url, self._url = self._url, url
|
||||
try:
|
||||
if hasattr(source, 'url'):
|
||||
url = source.url
|
||||
if lazy:
|
||||
for _, root in self.iterparse(source, events=('start',)):
|
||||
return root, None, None, url, source
|
||||
else:
|
||||
url = normalize_url(source.name)
|
||||
except AttributeError:
|
||||
pass
|
||||
else:
|
||||
_url, self._url = self._url, url
|
||||
try:
|
||||
if lazy:
|
||||
for _, root in self.iterparse(source, events=('start',)):
|
||||
return root, None, None, url
|
||||
else:
|
||||
document = self.parse(source)
|
||||
return document.getroot(), document, None, url
|
||||
finally:
|
||||
self._url = _url
|
||||
document = self.parse(source)
|
||||
return document.getroot(), document, None, url, source
|
||||
finally:
|
||||
self._url = _url
|
||||
|
||||
else:
|
||||
# Try ElementTree object at last
|
||||
|
@ -346,7 +337,7 @@ class XMLResource(object):
|
|||
else:
|
||||
if is_etree_element(root):
|
||||
self._lazy = False
|
||||
return root, source, None, None
|
||||
return root, source, None, None, None
|
||||
|
||||
if url is None:
|
||||
raise XMLSchemaTypeError(
|
||||
|
@ -359,11 +350,11 @@ class XMLResource(object):
|
|||
try:
|
||||
if lazy:
|
||||
for _, root in self.iterparse(resource, events=('start',)):
|
||||
return root, None, None, url
|
||||
return root, None, None, url, None
|
||||
else:
|
||||
document = self.parse(resource)
|
||||
root = document.getroot()
|
||||
return root, document, None, url
|
||||
return root, document, None, url, None
|
||||
finally:
|
||||
self._url = _url
|
||||
resource.close()
|
||||
|
@ -482,6 +473,10 @@ class XMLResource(object):
|
|||
|
||||
def open(self):
|
||||
"""Returns a opened resource reader object for the instance URL."""
|
||||
if self._fid is not None:
|
||||
self._fid.seek(0)
|
||||
return self._fid
|
||||
|
||||
if self._url is None:
|
||||
raise XMLSchemaValueError("can't open, the resource has no URL associated.")
|
||||
try:
|
||||
|
@ -494,7 +489,7 @@ class XMLResource(object):
|
|||
Loads the XML text from the data source. If the data source is an Element
|
||||
the source XML text can't be retrieved.
|
||||
"""
|
||||
if self._url is None:
|
||||
if self._url is None and self._fid is None:
|
||||
return # Created from Element or text source --> already loaded
|
||||
|
||||
resource = self.open()
|
||||
|
@ -503,16 +498,25 @@ class XMLResource(object):
|
|||
except (OSError, IOError) as err:
|
||||
raise XMLSchemaOSError("cannot load data from %r: %s" % (self._url, err))
|
||||
finally:
|
||||
resource.close()
|
||||
# We don't want to close the file obj if it wasn't originally
|
||||
# opened by `XMLResource`. That is the concern of the code
|
||||
# where the file obj came from.
|
||||
if self._fid is None:
|
||||
resource.close()
|
||||
|
||||
try:
|
||||
self._text = data.decode('utf-8') if PY3 else data.encode('utf-8')
|
||||
except UnicodeDecodeError:
|
||||
if PY3:
|
||||
self._text = data.decode('iso-8859-1')
|
||||
else:
|
||||
with codecs.open(urlsplit(self._url).path, mode='rb', encoding='iso-8859-1') as f:
|
||||
self._text = f.read().encode('iso-8859-1')
|
||||
if isinstance(data, bytes):
|
||||
try:
|
||||
text = data.decode('utf-8') if PY3 else data.encode('utf-8')
|
||||
except UnicodeDecodeError:
|
||||
if PY3:
|
||||
text = data.decode('iso-8859-1')
|
||||
else:
|
||||
with codecs.open(urlsplit(self._url).path, mode='rb', encoding='iso-8859-1') as f:
|
||||
text = f.read().encode('iso-8859-1')
|
||||
else:
|
||||
text = data
|
||||
|
||||
self._text = text
|
||||
|
||||
def is_lazy(self):
|
||||
"""Returns `True` if the XML resource is lazy."""
|
||||
|
@ -528,6 +532,9 @@ class XMLResource(object):
|
|||
for elem in self._root.iter(tag):
|
||||
yield elem
|
||||
return
|
||||
elif self._fid is not None:
|
||||
self._fid.seek(0)
|
||||
resource = self._fid
|
||||
elif self._url is not None:
|
||||
resource = urlopen(self._url, timeout=self.timeout)
|
||||
else:
|
||||
|
@ -539,7 +546,8 @@ class XMLResource(object):
|
|||
yield elem
|
||||
elem.clear()
|
||||
finally:
|
||||
resource.close()
|
||||
if self._fid is None:
|
||||
resource.close()
|
||||
|
||||
def iterfind(self, path=None, namespaces=None):
|
||||
"""XML resource tree iterfind selector."""
|
||||
|
@ -550,6 +558,9 @@ class XMLResource(object):
|
|||
for e in iter_select(self._root, path, namespaces, strict=False):
|
||||
yield e
|
||||
return
|
||||
elif self._fid is not None:
|
||||
self._fid.seek(0)
|
||||
resource = self._fid
|
||||
elif self._url is not None:
|
||||
resource = urlopen(self._url, timeout=self.timeout)
|
||||
else:
|
||||
|
@ -587,7 +598,8 @@ class XMLResource(object):
|
|||
elif level == 0:
|
||||
elem.clear()
|
||||
finally:
|
||||
resource.close()
|
||||
if self._fid is None:
|
||||
resource.close()
|
||||
|
||||
def iter_location_hints(self):
|
||||
"""Yields schema location hints from the XML tree."""
|
||||
|
@ -639,7 +651,7 @@ class XMLResource(object):
|
|||
local_root = self.root.tag[0] != '{'
|
||||
nsmap = {}
|
||||
|
||||
if self._url is not None:
|
||||
if self._url is not None or self._fid is not None:
|
||||
resource = self.open()
|
||||
try:
|
||||
for event, node in self.iterparse(resource, events=('start-ns', 'end')):
|
||||
|
@ -650,7 +662,11 @@ class XMLResource(object):
|
|||
except (ElementTree.ParseError, PyElementTree.ParseError, UnicodeEncodeError):
|
||||
pass
|
||||
finally:
|
||||
resource.close()
|
||||
# We don't want to close the file obj if it wasn't
|
||||
# originally opened by `XMLResource`. That is the concern
|
||||
# of the code where the file obj came from.
|
||||
if self._fid is None:
|
||||
resource.close()
|
||||
elif isinstance(self._text, string_base_type):
|
||||
try:
|
||||
for event, node in self.iterparse(StringIO(self._text), events=('start-ns', 'end')):
|
||||
|
|
|
@ -15,7 +15,6 @@ This module runs tests concerning resources.
|
|||
import unittest
|
||||
import os
|
||||
import platform
|
||||
import zipfile
|
||||
|
||||
try:
|
||||
from pathlib import PureWindowsPath, PurePath
|
||||
|
@ -27,7 +26,7 @@ from xmlschema import (
|
|||
load_xml_resource, XMLResource, XMLSchemaURLError
|
||||
)
|
||||
from xmlschema.tests import XMLSchemaTestCase, SKIP_REMOTE_TESTS
|
||||
from xmlschema.compat import urlopen, urlsplit, uses_relative, StringIO, BytesIO
|
||||
from xmlschema.compat import urlopen, urlsplit, uses_relative, StringIO
|
||||
from xmlschema.etree import ElementTree, PyElementTree, lxml_etree, is_etree_element, etree_element, py_etree_element
|
||||
|
||||
|
||||
|
@ -196,17 +195,22 @@ class TestResources(XMLSchemaTestCase):
|
|||
resource.load()
|
||||
self.assertIsNone(resource.text)
|
||||
|
||||
@unittest.skipIf(
|
||||
platform.python_version_tuple()[0] < '3',
|
||||
"Skip: urlopen on Python 2 can't seek 'file://' paths."
|
||||
)
|
||||
def test_xml_resource_from_resource(self):
|
||||
xml_file = urlopen('file://{}'.format(add_leading_slash(self.vh_xml_file)))
|
||||
try:
|
||||
resource = XMLResource(xml_file)
|
||||
self.assertEqual(resource.source, xml_file)
|
||||
self.assertEqual(resource.root.tag, '{http://example.com/vehicles}vehicles')
|
||||
self.check_url(resource.url, self.vh_xml_file)
|
||||
self.assertIsNone(resource.url)
|
||||
self.assertIsNone(resource.document)
|
||||
self.assertIsNone(resource.text)
|
||||
resource.load()
|
||||
self.assertTrue(resource.text.startswith('<?xml'))
|
||||
self.assertFalse(xml_file.closed)
|
||||
finally:
|
||||
xml_file.close()
|
||||
|
||||
|
@ -215,21 +219,35 @@ class TestResources(XMLSchemaTestCase):
|
|||
resource = XMLResource(schema_file)
|
||||
self.assertEqual(resource.source, schema_file)
|
||||
self.assertEqual(resource.root.tag, '{http://www.w3.org/2001/XMLSchema}schema')
|
||||
self.check_url(resource.url, self.vh_xsd_file)
|
||||
self.assertIsNone(resource.url)
|
||||
self.assertIsNone(resource.document)
|
||||
self.assertIsNone(resource.text)
|
||||
resource.load()
|
||||
self.assertTrue(resource.text.startswith('<xs:schema'))
|
||||
self.assertFalse(schema_file.closed)
|
||||
for _ in resource.iter():
|
||||
pass
|
||||
self.assertFalse(schema_file.closed)
|
||||
for _ in resource.iterfind():
|
||||
pass
|
||||
self.assertFalse(schema_file.closed)
|
||||
|
||||
with open(self.vh_xsd_file) as schema_file:
|
||||
resource = XMLResource(schema_file, lazy=False)
|
||||
self.assertEqual(resource.source, schema_file)
|
||||
self.assertEqual(resource.root.tag, '{http://www.w3.org/2001/XMLSchema}schema')
|
||||
self.check_url(resource.url, self.vh_xsd_file)
|
||||
self.assertIsNone(resource.url)
|
||||
self.assertIsInstance(resource.document, ElementTree.ElementTree)
|
||||
self.assertIsNone(resource.text)
|
||||
resource.load()
|
||||
self.assertTrue(resource.text.startswith('<xs:schema'))
|
||||
self.assertFalse(schema_file.closed)
|
||||
for _ in resource.iter():
|
||||
pass
|
||||
self.assertFalse(schema_file.closed)
|
||||
for _ in resource.iterfind():
|
||||
pass
|
||||
self.assertFalse(schema_file.closed)
|
||||
|
||||
def test_xml_resource_from_string(self):
|
||||
with open(self.vh_xsd_file) as schema_file:
|
||||
|
@ -345,13 +363,15 @@ class TestResources(XMLSchemaTestCase):
|
|||
def test_xml_resource_get_namespaces(self):
|
||||
with open(self.vh_xml_file) as schema_file:
|
||||
resource = XMLResource(schema_file)
|
||||
self.assertEqual(resource.url, normalize_url(self.vh_xml_file))
|
||||
self.assertIsNone(resource.url)
|
||||
self.assertEqual(set(resource.get_namespaces().keys()), {'vh', 'xsi'})
|
||||
self.assertFalse(schema_file.closed)
|
||||
|
||||
with open(self.vh_xsd_file) as schema_file:
|
||||
resource = XMLResource(schema_file)
|
||||
self.assertEqual(resource.url, normalize_url(self.vh_xsd_file))
|
||||
self.assertIsNone(resource.url)
|
||||
self.assertEqual(set(resource.get_namespaces().keys()), {'xs', 'vh'})
|
||||
self.assertFalse(schema_file.closed)
|
||||
|
||||
resource = XMLResource(self.col_xml_file)
|
||||
self.assertEqual(resource.url, normalize_url(self.col_xml_file))
|
||||
|
@ -392,18 +412,23 @@ class TestResources(XMLSchemaTestCase):
|
|||
zip using the zipfile module and with Django files in some
|
||||
instances.
|
||||
"""
|
||||
zipname = "not__on____disk.xml"
|
||||
bytes_fid = BytesIO()
|
||||
with zipfile.ZipFile(bytes_fid, 'w') as zf:
|
||||
with open(self.vh_xml_file) as fid:
|
||||
zf.writestr(zipname, fid.read())
|
||||
class FileProxy(object):
|
||||
def __init__(self, fid, fake_name):
|
||||
self._fid = fid
|
||||
self.name = fake_name
|
||||
|
||||
bytes_fid.seek(0)
|
||||
with zipfile.ZipFile(bytes_fid) as zf:
|
||||
with zf.open(zipname) as fid:
|
||||
resource = XMLResource(fid)
|
||||
# This should not cause an error.
|
||||
resource.load()
|
||||
def __getattr__(self, attr):
|
||||
try:
|
||||
return self.__dict__[attr]
|
||||
except (KeyError, AttributeError):
|
||||
return getattr(self.__dict__["_fid"], attr)
|
||||
|
||||
fake_name = "not__on____disk.xml"
|
||||
with open(self.vh_xml_file) as schema_file:
|
||||
resource = XMLResource(FileProxy(schema_file, fake_name))
|
||||
self.assertIsNone(resource.url)
|
||||
self.assertEqual(set(resource.get_namespaces().keys()), {'vh', 'xsi'})
|
||||
self.assertFalse(schema_file.closed)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
Loading…
Reference in New Issue