Complete the revision of resource module
- normalize_url() now processes file names containing '#' chars - Fix iterfind() of lazy resource - Add more tests for XML resources
This commit is contained in:
parent
8db83477df
commit
c075ff22e5
|
@ -66,14 +66,21 @@ def normalize_url(url, base_url=None, keep_relative=False):
|
|||
conformant to URL format specification.
|
||||
:return: A normalized URL.
|
||||
"""
|
||||
def add_trailing_slash(r):
|
||||
return urlunsplit((r[0], r[1], r[2] + '/' if r[2] and r[2][-1] != '/' else r[2], r[3], r[4]))
|
||||
def add_trailing_slash(x):
|
||||
return urlunsplit((x[0], x[1], x[2] + '/' if x[2] and x[2][-1] != '/' else x[2], x[3], x[4]))
|
||||
|
||||
def filter_url(x):
|
||||
x = x.strip().replace('\\', '/')
|
||||
while x.startswith('//'):
|
||||
x = x.replace('//', '/', 1)
|
||||
if not urlsplit(x).scheme:
|
||||
x = x.replace('#', '%23')
|
||||
return x
|
||||
|
||||
url = filter_url(url)
|
||||
|
||||
if base_url is not None:
|
||||
base_url = base_url.replace('\\', '/')
|
||||
while base_url.startswith('//'):
|
||||
base_url = base_url.replace('//', '/', 1)
|
||||
|
||||
base_url = filter_url(base_url)
|
||||
base_url_parts = urlsplit(base_url)
|
||||
base_url = add_trailing_slash(base_url_parts)
|
||||
if base_url_parts.scheme not in uses_relative:
|
||||
|
@ -102,10 +109,6 @@ def normalize_url(url, base_url=None, keep_relative=False):
|
|||
if base_url_parts.netloc and not url.startswith(base_url_parts.netloc) and url.startswith('//'):
|
||||
url = 'file:' + url
|
||||
|
||||
url = url.replace('\\', '/')
|
||||
while url.startswith('//'):
|
||||
url = url.replace('//', '/', 1)
|
||||
|
||||
url_parts = urlsplit(url, scheme='file')
|
||||
if url_parts.scheme not in uses_relative:
|
||||
return 'file:///{}'.format(url_parts.geturl()) # Eg. k:/Python/lib/....
|
||||
|
@ -622,6 +625,7 @@ class XMLResource(object):
|
|||
else:
|
||||
resource = StringIO(self._text)
|
||||
|
||||
# Note: lazy iteration change the order (top level element is the last)
|
||||
try:
|
||||
for event, elem in self.iterparse(resource, events=('end',)):
|
||||
if tag is None or elem.tag == tag:
|
||||
|
@ -664,8 +668,8 @@ class XMLResource(object):
|
|||
elem.clear()
|
||||
else:
|
||||
selector = Selector(path, namespaces, strict=False, parser=XmlResourceXPathParser)
|
||||
path.replace(' ', '').replace('./', '')
|
||||
path_level = path.count('/') + 1
|
||||
path = path.replace(' ', '').replace('./', '')
|
||||
path_level = path.count('/') + 1 if path != '.' else 0
|
||||
select_all = '*' in path and set(path).issubset({'*', '/'})
|
||||
|
||||
level = 0
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
DUMMY CONTENT
|
|
@ -13,7 +13,6 @@
|
|||
This module runs tests concerning resources.
|
||||
"""
|
||||
import unittest
|
||||
import time
|
||||
import os
|
||||
import platform
|
||||
|
||||
|
@ -123,12 +122,25 @@ class TestResources(unittest.TestCase):
|
|||
self.assertEqual(normalize_url('dir2/schema.xsd', '//root/dir1'), 'file:///root/dir1/dir2/schema.xsd')
|
||||
self.assertEqual(normalize_url('dir2/schema.xsd', '////root/dir1'), 'file:///root/dir1/dir2/schema.xsd')
|
||||
|
||||
self.check_url(normalize_url('issue #000.xml', 'file://host/home/'),
|
||||
'file://host/home/issue %23000.xml')
|
||||
self.check_url(normalize_url('data.xml', 'file://host/home/issue 000'),
|
||||
'file://host/home/issue 000/data.xml')
|
||||
self.check_url(normalize_url('data.xml', '/host/home/issue #000'),
|
||||
'/host/home/issue %23000/data.xml')
|
||||
|
||||
def test_fetch_resource(self):
|
||||
wrong_path = casepath('resources/dummy_file.txt')
|
||||
self.assertRaises(XMLSchemaURLError, fetch_resource, wrong_path)
|
||||
right_path = casepath('resources/dummy file.txt')
|
||||
self.assertTrue(fetch_resource(right_path).endswith('dummy file.txt'))
|
||||
|
||||
ambiguous_path = casepath('resources/dummy file #2.txt')
|
||||
self.assertTrue(fetch_resource(ambiguous_path).endswith('dummy file %232.txt'))
|
||||
|
||||
with urlopen(fetch_resource(ambiguous_path)) as res:
|
||||
self.assertEqual(res.read(), b'DUMMY CONTENT')
|
||||
|
||||
def test_fetch_namespaces(self):
|
||||
self.assertFalse(fetch_namespaces(casepath('resources/malformed.xml')))
|
||||
|
||||
|
@ -436,75 +448,58 @@ class TestResources(unittest.TestCase):
|
|||
resource.open()
|
||||
|
||||
def test_xml_resource_iter(self):
|
||||
for lazy in (False, True):
|
||||
resource = XMLResource(self.schema_class.meta_schema.source.url, lazy=lazy)
|
||||
k = 0
|
||||
for k, _ in enumerate(resource.iter()):
|
||||
pass
|
||||
self.assertEqual(k, 1389)
|
||||
|
||||
k = 0
|
||||
for k, _ in enumerate(resource.iter('{%s}complexType' % XSD_NAMESPACE)):
|
||||
pass
|
||||
self.assertEqual(k, 55)
|
||||
|
||||
def test_xml_resource_iterfind(self):
|
||||
resource = XMLResource(self.schema_class.meta_schema.source.url, lazy=False)
|
||||
self.assertFalse(resource.is_lazy())
|
||||
lazy_resource = XMLResource(self.schema_class.meta_schema.source.url)
|
||||
self.assertTrue(lazy_resource.is_lazy())
|
||||
|
||||
start_time = time.time()
|
||||
for _ in range(10):
|
||||
for _ in resource.iterfind():
|
||||
pass
|
||||
t1 = time.time() - start_time
|
||||
tags = [x.tag for x in resource.iter()]
|
||||
self.assertEqual(len(tags), 1390)
|
||||
self.assertEqual(tags[0], '{%s}schema' % XSD_NAMESPACE)
|
||||
|
||||
start_time = time.time()
|
||||
for _ in range(10):
|
||||
for _ in resource.iterfind(path='.'):
|
||||
pass
|
||||
t2 = time.time() - start_time
|
||||
self.assertLessEqual(t1, t2 / 30.0)
|
||||
self.assertGreaterEqual(t1, t2 / 100.0)
|
||||
lazy_tags = [x.tag for x in lazy_resource.iter()]
|
||||
self.assertEqual(len(lazy_tags), 1390)
|
||||
self.assertEqual(lazy_tags[-1], '{%s}schema' % XSD_NAMESPACE)
|
||||
self.assertNotEqual(tags, lazy_tags)
|
||||
|
||||
start_time = time.time()
|
||||
counter = 0
|
||||
for _ in resource.iterfind(path='*'):
|
||||
counter += 1
|
||||
t3 = time.time() - start_time
|
||||
self.assertGreaterEqual(t2, t3 / counter * 10)
|
||||
tags = [x.tag for x in resource.iter('{%s}complexType' % XSD_NAMESPACE)]
|
||||
self.assertEqual(len(tags), 56)
|
||||
self.assertEqual(tags[0], '{%s}complexType' % XSD_NAMESPACE)
|
||||
self.assertListEqual(tags, [x.tag for x in lazy_resource.iter('{%s}complexType' % XSD_NAMESPACE)])
|
||||
|
||||
resource = XMLResource(self.schema_class.meta_schema.source.url)
|
||||
self.assertTrue(resource.is_lazy())
|
||||
def test_xml_resource_iterfind(self):
|
||||
namespaces = {'xs': XSD_NAMESPACE}
|
||||
resource = XMLResource(self.schema_class.meta_schema.source.url, lazy=False)
|
||||
self.assertFalse(resource.is_lazy())
|
||||
lazy_resource = XMLResource(self.schema_class.meta_schema.source.url)
|
||||
self.assertTrue(lazy_resource.is_lazy())
|
||||
|
||||
start_time = time.time()
|
||||
for _ in range(10):
|
||||
for _ in resource.iterfind():
|
||||
pass
|
||||
tl1 = time.time() - start_time
|
||||
self.assertLessEqual(t1, tl1 / 1000.0)
|
||||
self.assertGreaterEqual(t1, tl1 / 10000.0)
|
||||
# Note: Element change with lazy resource so compare only tags
|
||||
|
||||
start_time = time.time()
|
||||
for _ in range(10):
|
||||
for _ in resource.iterfind(path='.'):
|
||||
pass
|
||||
tl2 = time.time() - start_time
|
||||
tags = [x.tag for x in resource.iterfind()]
|
||||
self.assertEqual(len(tags), 1)
|
||||
self.assertEqual(tags[0], '{%s}schema' % XSD_NAMESPACE)
|
||||
self.assertListEqual(tags, [x.tag for x in lazy_resource.iterfind()])
|
||||
|
||||
self.assertLessEqual(t2, tl2 / 80.0)
|
||||
self.assertGreaterEqual(t2, tl2 / 1000.0)
|
||||
tags = [x.tag for x in resource.iterfind(path='.')]
|
||||
self.assertEqual(len(tags), 1)
|
||||
self.assertEqual(tags[0], '{%s}schema' % XSD_NAMESPACE)
|
||||
self.assertListEqual(tags, [x.tag for x in lazy_resource.iterfind(path='.')])
|
||||
|
||||
start_time = time.time()
|
||||
counter3 = 0
|
||||
for _ in resource.iterfind(path='*'):
|
||||
counter3 += 1
|
||||
tl3 = time.time() - start_time
|
||||
self.assertGreaterEqual(tl2, tl3 / counter3 * 10)
|
||||
tags = [x.tag for x in resource.iterfind(path='*')]
|
||||
self.assertEqual(len(tags), 156)
|
||||
self.assertEqual(tags[0], '{%s}annotation' % XSD_NAMESPACE)
|
||||
self.assertListEqual(tags, [x.tag for x in lazy_resource.iterfind(path='*')])
|
||||
|
||||
start_time = time.time()
|
||||
for _ in resource.iterfind(path='. /. / xs:complexType', namespaces={'xs': XSD_NAMESPACE}):
|
||||
pass
|
||||
tl4 = time.time() - start_time
|
||||
self.assertTrue(0.7 < (tl3 / tl4) < 1)
|
||||
tags = [x.tag for x in resource.iterfind('xs:complexType', namespaces)]
|
||||
self.assertEqual(len(tags), 35)
|
||||
self.assertTrue(all(t == '{%s}complexType' % XSD_NAMESPACE for t in tags))
|
||||
self.assertListEqual(tags, [x.tag for x in lazy_resource.iterfind('xs:complexType', namespaces)])
|
||||
|
||||
tags = [x.tag for x in resource.iterfind('. /. / xs:complexType', namespaces)]
|
||||
self.assertEqual(len(tags), 35)
|
||||
self.assertTrue(all(t == '{%s}complexType' % XSD_NAMESPACE for t in tags))
|
||||
self.assertListEqual(tags, [x.tag for x in lazy_resource.iterfind('. /. / xs:complexType', namespaces)])
|
||||
|
||||
def test_xml_resource_get_namespaces(self):
|
||||
with open(self.vh_xml_file) as schema_file:
|
||||
|
|
|
@ -1,32 +0,0 @@
|
|||
<?xml version="1.0" encoding="utf-8" ?>
|
||||
<schema xmlns="http://www.w3.org/2001/XMLSchema">
|
||||
<annotation>
|
||||
<documentation>
|
||||
A schema with puppet types for creating substitute elements.
|
||||
</documentation>
|
||||
</annotation>
|
||||
|
||||
<simpleType name="simple_puppet">
|
||||
<union>
|
||||
<simpleType>
|
||||
<list itemType="float"/>
|
||||
</simpleType>
|
||||
<simpleType>
|
||||
<list itemType="integer"/>
|
||||
</simpleType>
|
||||
<simpleType>
|
||||
<restriction base="float"/>
|
||||
</simpleType>
|
||||
<simpleType>
|
||||
<restriction base="int"/>
|
||||
</simpleType>
|
||||
<simpleType>
|
||||
<restriction base="string"/>
|
||||
</simpleType>
|
||||
</union>
|
||||
</simpleType>
|
||||
|
||||
<complexType name="complex_puppet">
|
||||
<
|
||||
</complexType>
|
||||
</schema>
|
Loading…
Reference in New Issue