Stop reading `name` and `url` from file object attrs

These attrs shouldn't be used to reopen the file object as:
- they may not reflect the original file or resource (file objects
  opened from a zipfile will have a name that doesn't correspond to any
  file on disk).
- Depending on how the fid was opened, these attrs could be crafted to
  read arbitrary files from disk. If the creator of a .zip gives a file
  inside the zip file a path of `/etc/passwd` we may end up opening that
  file.

Instead of reopening the file, we keep track of the file object and seek
to the beginning of the file. This means (for most operations) the file
object must be seekable. On Python 2 urlopen returns an unseekable
object for 'file://' paths. One test had to be skipped in Python 2 for
this reason.
This commit is contained in:
Daniel Hillier 2019-07-05 22:23:28 +10:00
parent 4085e8daa5
commit 61e1f609fc
2 changed files with 100 additions and 59 deletions

View File

@ -245,7 +245,7 @@ class XMLResource(object):
if base_url is not None and not isinstance(base_url, string_base_type):
raise XMLSchemaValueError(u"'base_url' argument has to be a string: {!r}".format(base_url))
self._root = self._document = self._url = self._text = None
self._root = self._document = self._url = self._text = self._fid = None
self._base_url = base_url
self.defuse = defuse
self.timeout = timeout
@ -274,7 +274,7 @@ class XMLResource(object):
def __setattr__(self, name, value):
if name == 'source':
self._root, self._document, self._text, self._url = self._fromsource(value)
self._root, self._document, self._text, self._url, self._fid = self._fromsource(value)
elif name == 'defuse' and value not in DEFUSE_MODES:
raise XMLSchemaValueError(u"'defuse' attribute: {!r} is not a defuse mode.".format(value))
elif name == 'timeout' and (not isinstance(value, int) or value <= 0):
@ -287,16 +287,16 @@ class XMLResource(object):
url, lazy = None, self._lazy
if is_etree_element(source):
self._lazy = False
return source, None, None, None # Source is already an Element --> nothing to load
return source, None, None, None, None # Source is already an Element --> nothing to load
elif isinstance(source, string_base_type):
_url, self._url = self._url, None
try:
if lazy:
# check if source is a string containing a valid XML root
for _, root in self.iterparse(StringIO(source), events=('start',)):
return root, None, source, None
return root, None, source, None, None
else:
return self.fromstring(source), None, source, None
return self.fromstring(source), None, source, None, None
except (ElementTree.ParseError, PyElementTree.ParseError, UnicodeEncodeError):
if '\n' in source:
raise
@ -309,33 +309,24 @@ class XMLResource(object):
try:
if lazy:
for _, root in self.iterparse(source, events=('start',)):
return root, None, source.getvalue(), None
return root, None, source.getvalue(), None, None
else:
document = self.parse(source)
return document.getroot(), document, source.getvalue(), None
return document.getroot(), document, source.getvalue(), None, None
finally:
self._url = _url
elif hasattr(source, 'read'):
# source should be a file-like object
_url, self._url = self._url, url
try:
if hasattr(source, 'url'):
url = source.url
if lazy:
for _, root in self.iterparse(source, events=('start',)):
return root, None, None, url, source
else:
url = normalize_url(source.name)
except AttributeError:
pass
else:
_url, self._url = self._url, url
try:
if lazy:
for _, root in self.iterparse(source, events=('start',)):
return root, None, None, url
else:
document = self.parse(source)
return document.getroot(), document, None, url
finally:
self._url = _url
document = self.parse(source)
return document.getroot(), document, None, url, source
finally:
self._url = _url
else:
# Try ElementTree object at last
@ -346,7 +337,7 @@ class XMLResource(object):
else:
if is_etree_element(root):
self._lazy = False
return root, source, None, None
return root, source, None, None, None
if url is None:
raise XMLSchemaTypeError(
@ -359,11 +350,11 @@ class XMLResource(object):
try:
if lazy:
for _, root in self.iterparse(resource, events=('start',)):
return root, None, None, url
return root, None, None, url, None
else:
document = self.parse(resource)
root = document.getroot()
return root, document, None, url
return root, document, None, url, None
finally:
self._url = _url
resource.close()
@ -482,6 +473,10 @@ class XMLResource(object):
def open(self):
"""Returns a opened resource reader object for the instance URL."""
if self._fid is not None:
self._fid.seek(0)
return self._fid
if self._url is None:
raise XMLSchemaValueError("can't open, the resource has no URL associated.")
try:
@ -494,7 +489,7 @@ class XMLResource(object):
Loads the XML text from the data source. If the data source is an Element
the source XML text can't be retrieved.
"""
if self._url is None:
if self._url is None and self._fid is None:
return # Created from Element or text source --> already loaded
resource = self.open()
@ -503,16 +498,25 @@ class XMLResource(object):
except (OSError, IOError) as err:
raise XMLSchemaOSError("cannot load data from %r: %s" % (self._url, err))
finally:
resource.close()
# We don't want to close the file obj if it wasn't originally
# opened by `XMLResource`. That is the concern of the code
# where the file obj came from.
if self._fid is None:
resource.close()
try:
self._text = data.decode('utf-8') if PY3 else data.encode('utf-8')
except UnicodeDecodeError:
if PY3:
self._text = data.decode('iso-8859-1')
else:
with codecs.open(urlsplit(self._url).path, mode='rb', encoding='iso-8859-1') as f:
self._text = f.read().encode('iso-8859-1')
if isinstance(data, bytes):
try:
text = data.decode('utf-8') if PY3 else data.encode('utf-8')
except UnicodeDecodeError:
if PY3:
text = data.decode('iso-8859-1')
else:
with codecs.open(urlsplit(self._url).path, mode='rb', encoding='iso-8859-1') as f:
text = f.read().encode('iso-8859-1')
else:
text = data
self._text = text
def is_lazy(self):
"""Returns `True` if the XML resource is lazy."""
@ -528,6 +532,9 @@ class XMLResource(object):
for elem in self._root.iter(tag):
yield elem
return
elif self._fid is not None:
self._fid.seek(0)
resource = self._fid
elif self._url is not None:
resource = urlopen(self._url, timeout=self.timeout)
else:
@ -539,7 +546,8 @@ class XMLResource(object):
yield elem
elem.clear()
finally:
resource.close()
if self._fid is None:
resource.close()
def iterfind(self, path=None, namespaces=None):
"""XML resource tree iterfind selector."""
@ -550,6 +558,9 @@ class XMLResource(object):
for e in iter_select(self._root, path, namespaces, strict=False):
yield e
return
elif self._fid is not None:
self._fid.seek(0)
resource = self._fid
elif self._url is not None:
resource = urlopen(self._url, timeout=self.timeout)
else:
@ -587,7 +598,8 @@ class XMLResource(object):
elif level == 0:
elem.clear()
finally:
resource.close()
if self._fid is None:
resource.close()
def iter_location_hints(self):
"""Yields schema location hints from the XML tree."""
@ -639,7 +651,7 @@ class XMLResource(object):
local_root = self.root.tag[0] != '{'
nsmap = {}
if self._url is not None:
if self._url is not None or self._fid is not None:
resource = self.open()
try:
for event, node in self.iterparse(resource, events=('start-ns', 'end')):
@ -650,7 +662,11 @@ class XMLResource(object):
except (ElementTree.ParseError, PyElementTree.ParseError, UnicodeEncodeError):
pass
finally:
resource.close()
# We don't want to close the file obj if it wasn't
# originally opened by `XMLResource`. That is the concern
# of the code where the file obj came from.
if self._fid is None:
resource.close()
elif isinstance(self._text, string_base_type):
try:
for event, node in self.iterparse(StringIO(self._text), events=('start-ns', 'end')):

View File

@ -15,7 +15,6 @@ This module runs tests concerning resources.
import unittest
import os
import platform
import zipfile
try:
from pathlib import PureWindowsPath, PurePath
@ -27,7 +26,7 @@ from xmlschema import (
load_xml_resource, XMLResource, XMLSchemaURLError
)
from xmlschema.tests import XMLSchemaTestCase, SKIP_REMOTE_TESTS
from xmlschema.compat import urlopen, urlsplit, uses_relative, StringIO, BytesIO
from xmlschema.compat import urlopen, urlsplit, uses_relative, StringIO
from xmlschema.etree import ElementTree, PyElementTree, lxml_etree, is_etree_element, etree_element, py_etree_element
@ -196,17 +195,22 @@ class TestResources(XMLSchemaTestCase):
resource.load()
self.assertIsNone(resource.text)
@unittest.skipIf(
platform.python_version_tuple()[0] < '3',
"Skip: urlopen on Python 2 can't seek 'file://' paths."
)
def test_xml_resource_from_resource(self):
xml_file = urlopen('file://{}'.format(add_leading_slash(self.vh_xml_file)))
try:
resource = XMLResource(xml_file)
self.assertEqual(resource.source, xml_file)
self.assertEqual(resource.root.tag, '{http://example.com/vehicles}vehicles')
self.check_url(resource.url, self.vh_xml_file)
self.assertIsNone(resource.url)
self.assertIsNone(resource.document)
self.assertIsNone(resource.text)
resource.load()
self.assertTrue(resource.text.startswith('<?xml'))
self.assertFalse(xml_file.closed)
finally:
xml_file.close()
@ -215,21 +219,35 @@ class TestResources(XMLSchemaTestCase):
resource = XMLResource(schema_file)
self.assertEqual(resource.source, schema_file)
self.assertEqual(resource.root.tag, '{http://www.w3.org/2001/XMLSchema}schema')
self.check_url(resource.url, self.vh_xsd_file)
self.assertIsNone(resource.url)
self.assertIsNone(resource.document)
self.assertIsNone(resource.text)
resource.load()
self.assertTrue(resource.text.startswith('<xs:schema'))
self.assertFalse(schema_file.closed)
for _ in resource.iter():
pass
self.assertFalse(schema_file.closed)
for _ in resource.iterfind():
pass
self.assertFalse(schema_file.closed)
with open(self.vh_xsd_file) as schema_file:
resource = XMLResource(schema_file, lazy=False)
self.assertEqual(resource.source, schema_file)
self.assertEqual(resource.root.tag, '{http://www.w3.org/2001/XMLSchema}schema')
self.check_url(resource.url, self.vh_xsd_file)
self.assertIsNone(resource.url)
self.assertIsInstance(resource.document, ElementTree.ElementTree)
self.assertIsNone(resource.text)
resource.load()
self.assertTrue(resource.text.startswith('<xs:schema'))
self.assertFalse(schema_file.closed)
for _ in resource.iter():
pass
self.assertFalse(schema_file.closed)
for _ in resource.iterfind():
pass
self.assertFalse(schema_file.closed)
def test_xml_resource_from_string(self):
with open(self.vh_xsd_file) as schema_file:
@ -345,13 +363,15 @@ class TestResources(XMLSchemaTestCase):
def test_xml_resource_get_namespaces(self):
with open(self.vh_xml_file) as schema_file:
resource = XMLResource(schema_file)
self.assertEqual(resource.url, normalize_url(self.vh_xml_file))
self.assertIsNone(resource.url)
self.assertEqual(set(resource.get_namespaces().keys()), {'vh', 'xsi'})
self.assertFalse(schema_file.closed)
with open(self.vh_xsd_file) as schema_file:
resource = XMLResource(schema_file)
self.assertEqual(resource.url, normalize_url(self.vh_xsd_file))
self.assertIsNone(resource.url)
self.assertEqual(set(resource.get_namespaces().keys()), {'xs', 'vh'})
self.assertFalse(schema_file.closed)
resource = XMLResource(self.col_xml_file)
self.assertEqual(resource.url, normalize_url(self.col_xml_file))
@ -392,18 +412,23 @@ class TestResources(XMLSchemaTestCase):
zip using the zipfile module and with Django files in some
instances.
"""
zipname = "not__on____disk.xml"
bytes_fid = BytesIO()
with zipfile.ZipFile(bytes_fid, 'w') as zf:
with open(self.vh_xml_file) as fid:
zf.writestr(zipname, fid.read())
class FileProxy(object):
def __init__(self, fid, fake_name):
self._fid = fid
self.name = fake_name
bytes_fid.seek(0)
with zipfile.ZipFile(bytes_fid) as zf:
with zf.open(zipname) as fid:
resource = XMLResource(fid)
# This should not cause an error.
resource.load()
def __getattr__(self, attr):
try:
return self.__dict__[attr]
except (KeyError, AttributeError):
return getattr(self.__dict__["_fid"], attr)
fake_name = "not__on____disk.xml"
with open(self.vh_xml_file) as schema_file:
resource = XMLResource(FileProxy(schema_file, fake_name))
self.assertIsNone(resource.url)
self.assertEqual(set(resource.get_namespaces().keys()), {'vh', 'xsi'})
self.assertFalse(schema_file.closed)
if __name__ == '__main__':