diff --git a/pylint.rc b/pylint.rc
index d51fc65e1..7efd90d67 100644
--- a/pylint.rc
+++ b/pylint.rc
@@ -12,7 +12,6 @@ disable=
broad-except,
consider-using-dict-comprehension,
consider-using-set-comprehension,
- consider-using-with,
cyclic-import,
duplicate-code,
fixme,
diff --git a/tests/admin_pages/test_all.py b/tests/admin_pages/test_all.py
index bc8d1dde0..f24dc490c 100644
--- a/tests/admin_pages/test_all.py
+++ b/tests/admin_pages/test_all.py
@@ -115,15 +115,13 @@ def test_admin_for_all(pub):
resp = get_app(pub).get('/backoffice/settings/', status=200)
# check it doesn't work with a non-empty ADMIN_FOR_ALL file
- fd = open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w')
- fd.write('x.x.x.x')
- fd.close()
+ with open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w') as fd:
+ fd.write('x.x.x.x')
resp = get_app(pub).get('/backoffice/settings/', status=302)
# check it works if the file contains the user IP address
- fd = open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w')
- fd.write('127.0.0.1')
- fd.close()
+ with open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w') as fd:
+ fd.write('127.0.0.1')
resp = get_app(pub).get('/backoffice/settings/', status=200)
# check it's also ok if the user is logged in but doesn't have the
diff --git a/tests/admin_pages/test_datasource.py b/tests/admin_pages/test_datasource.py
index 5611f0c35..f8d3e50b2 100644
--- a/tests/admin_pages/test_datasource.py
+++ b/tests/admin_pages/test_datasource.py
@@ -251,9 +251,8 @@ def test_data_sources_view(pub):
# check json
json_file_path = os.path.join(pub.app_dir, 'test.json')
- json_file = open(json_file_path, 'w')
- json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
- json_file.close()
+ with open(json_file_path, 'w') as json_file:
+ json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
data_source.data_source = {'type': 'json', 'value': 'file://%s' % json_file_path}
data_source.store()
@@ -263,9 +262,8 @@ def test_data_sources_view(pub):
assert 'foo' in resp.text
# with other attributes
- json_file = open(json_file_path, 'w')
- json.dump({'results': [{'pk': '1', 'label': 'foo'}, {'pk': '2'}]}, json_file)
- json_file.close()
+ with open(json_file_path, 'w') as json_file:
+ json.dump({'results': [{'pk': '1', 'label': 'foo'}, {'pk': '2'}]}, json_file)
data_source.data_attribute = 'results'
data_source.id_attribute = 'pk'
@@ -288,17 +286,16 @@ def test_data_sources_view(pub):
# check geojson
geojson_file_path = os.path.join(pub.app_dir, 'test.geojson')
- geojson_file = open(geojson_file_path, 'w')
- json.dump(
- {
- 'features': [
- {'properties': {'id': '1', 'text': 'foo', 'label': 'foo'}},
- {'properties': {'id': '2', 'text': 'bar', 'label': 'bar'}},
- ]
- },
- geojson_file,
- )
- geojson_file.close()
+ with open(geojson_file_path, 'w') as geojson_file:
+ json.dump(
+ {
+ 'features': [
+ {'properties': {'id': '1', 'text': 'foo', 'label': 'foo'}},
+ {'properties': {'id': '2', 'text': 'bar', 'label': 'bar'}},
+ ]
+ },
+ geojson_file,
+ )
data_source.data_source = {'type': 'geojson', 'value': 'file://%s' % geojson_file_path}
data_source.store()
with HttpRequestsMocking():
diff --git a/tests/admin_pages/test_form.py b/tests/admin_pages/test_form.py
index 46bb17685..94dfe0cdc 100644
--- a/tests/admin_pages/test_form.py
+++ b/tests/admin_pages/test_form.py
@@ -2439,18 +2439,18 @@ def test_form_archive(pub):
resp = resp.click(href='archive')
resp = resp.form.submit('submit')
assert resp.content_type == 'application/x-wcs-archive'
- tf = tarfile.open(fileobj=io.BytesIO(resp.body))
- assert 'formdef' in [x.name for x in tf.getmembers()]
- assert len(tf.getmembers()) == 8 # 7 formdata + 1 formdef
+ with tarfile.open(fileobj=io.BytesIO(resp.body)) as tf:
+ assert 'formdef' in [x.name for x in tf.getmembers()]
+ assert len(tf.getmembers()) == 8 # 7 formdata + 1 formdef
# second archive, it shouldn't get anything (but the formdef)
resp = app.get('/backoffice/forms/1/')
resp = resp.click(href='archive')
resp = resp.form.submit('submit')
assert resp.content_type == 'application/x-wcs-archive'
- tf = tarfile.open(fileobj=io.BytesIO(resp.body))
- assert 'formdef' in [x.name for x in tf.getmembers()]
- assert len(tf.getmembers()) == 1 # 0 formdata + 1 formdef
+ with tarfile.open(fileobj=io.BytesIO(resp.body)) as tf:
+ assert 'formdef' in [x.name for x in tf.getmembers()]
+ assert len(tf.getmembers()) == 1 # 0 formdata + 1 formdef
def test_form_overwrite(pub):
diff --git a/tests/admin_pages/test_settings.py b/tests/admin_pages/test_settings.py
index 587c70770..67becb112 100644
--- a/tests/admin_pages/test_settings.py
+++ b/tests/admin_pages/test_settings.py
@@ -115,8 +115,8 @@ def test_settings_export_import(pub):
assert 'completed' in resp.text
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
- zipf = zipfile.ZipFile(zip_content, 'a')
- filelist = zipf.namelist()
+ with zipfile.ZipFile(zip_content, 'a') as zipf:
+ filelist = zipf.namelist()
assert len(filelist) == 0
# check afterjob ajax call
@@ -166,8 +166,8 @@ def test_settings_export_import(pub):
resp = resp.follow()
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
- zipf = zipfile.ZipFile(zip_content, 'a')
- filelist = zipf.namelist()
+ with zipfile.ZipFile(zip_content, 'a') as zipf:
+ filelist = zipf.namelist()
assert 'formdefs/1' not in filelist
assert 'formdefs_xml/1' in filelist
assert 'carddefs/1' not in filelist
@@ -249,8 +249,8 @@ def test_settings_export_import(pub):
resp = resp.follow()
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
- zipf = zipfile.ZipFile(zip_content, 'a')
- filelist = zipf.namelist()
+ with zipfile.ZipFile(zip_content, 'a') as zipf:
+ filelist = zipf.namelist()
assert 'formdefs_xml/%s' % formdef.id in filelist
assert 'workflows_xml/%s' % workflow.id in filelist
assert 'roles_xml/%s' % role.id not in filelist
@@ -283,8 +283,8 @@ def test_settings_export_import(pub):
resp = resp.follow()
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
- zipf = zipfile.ZipFile(zip_content, 'a')
- filelist = zipf.namelist()
+ with zipfile.ZipFile(zip_content, 'a') as zipf:
+ filelist = zipf.namelist()
assert len([x for x in filelist if 'roles_xml/' in x]) == 0
# check an error is displayed if such an import is then used and roles are
@@ -305,11 +305,13 @@ def test_settings_themes(pub):
# create mock theme
os.mkdir(os.path.join(pub.app_dir, 'themes'))
os.mkdir(os.path.join(pub.app_dir, 'themes', 'test'))
- fd = open(os.path.join(pub.app_dir, 'themes', 'test', 'desc.xml'), 'w')
- fd.write(
- '' '' ' ' ''
- )
- fd.close()
+ with open(os.path.join(pub.app_dir, 'themes', 'test', 'desc.xml'), 'w') as fd:
+ fd.write(
+ ''
+ ''
+ ' '
+ ''
+ )
resp = app.get('/backoffice/settings/themes')
assert 'biglist themes' in resp.text
@@ -831,17 +833,17 @@ def test_settings_theme_download_upload(pub):
resp = app.get('/backoffice/settings/themes')
resp = resp.click('download', index=0)
assert resp.headers['content-type'] == 'application/zip'
- zip_content = io.BytesIO(resp.body)
- zipf = zipfile.ZipFile(zip_content, 'a')
- filelist = zipf.namelist()
- assert 'alto/icon.png' in filelist
- assert 'alto/desc.xml' in filelist
- assert 'alto/template.ezt' in filelist
- assert 'alto/wcs.css' in filelist
- # modify it
- zipf.writestr('alto/foobar.txt', 'XXX')
- zipf.close()
+ zip_content = io.BytesIO(resp.body)
+ with zipfile.ZipFile(zip_content, 'a') as zipf:
+ filelist = zipf.namelist()
+ assert 'alto/icon.png' in filelist
+ assert 'alto/desc.xml' in filelist
+ assert 'alto/template.ezt' in filelist
+ assert 'alto/wcs.css' in filelist
+
+ # modify it
+ zipf.writestr('alto/foobar.txt', 'XXX')
# upload it
resp = app.get('/backoffice/settings/themes')
diff --git a/tests/api/test_custom_view.py b/tests/api/test_custom_view.py
index 1544fed64..94a848d61 100644
--- a/tests/api/test_custom_view.py
+++ b/tests/api/test_custom_view.py
@@ -244,8 +244,8 @@ def test_api_ods_formdata_custom_view(pub, local_user):
# check it now gets the data
resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
- zipf = zipfile.ZipFile(io.BytesIO(resp.body))
- ods_sheet = ET.parse(zipf.open('content.xml'))
+ with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
+ ods_sheet = ET.parse(zipf.open('content.xml'))
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 11
pub.custom_view_class.wipe()
@@ -258,8 +258,8 @@ def test_api_ods_formdata_custom_view(pub, local_user):
custom_view.store()
resp = get_app(pub).get(sign_uri('/api/forms/test/ods/custom-view', user=local_user))
- zipf = zipfile.ZipFile(io.BytesIO(resp.body))
- ods_sheet = ET.parse(zipf.open('content.xml'))
+ with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
+ ods_sheet = ET.parse(zipf.open('content.xml'))
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 21
diff --git a/tests/api/test_formdata.py b/tests/api/test_formdata.py
index f882c1a28..96264b880 100644
--- a/tests/api/test_formdata.py
+++ b/tests/api/test_formdata.py
@@ -45,12 +45,13 @@ def pub(request, emails):
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
- open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
- '''\
+ with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+ fd.write(
+ '''\
[api-secrets]
coucou = 1234
'''
- )
+ )
return pub
@@ -965,8 +966,8 @@ def test_api_ods_formdata(pub, local_user):
resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
assert resp.content_type == 'application/vnd.oasis.opendocument.spreadsheet'
- zipf = zipfile.ZipFile(io.BytesIO(resp.body))
- ods_sheet = ET.parse(zipf.open('content.xml'))
+ with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
+ ods_sheet = ET.parse(zipf.open('content.xml'))
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 311
diff --git a/tests/backoffice_pages/test_all.py b/tests/backoffice_pages/test_all.py
index cfa063fb2..03ce3c815 100644
--- a/tests/backoffice_pages/test_all.py
+++ b/tests/backoffice_pages/test_all.py
@@ -66,14 +66,13 @@ def pub(request, emails):
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
- fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
- fd.write(
- '''
-[api-secrets]
-coucou = 1234
-'''
- )
- fd.close()
+ with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+ fd.write(
+ '''
+ [api-secrets]
+ coucou = 1234
+ '''
+ )
return pub
@@ -528,9 +527,8 @@ def test_backoffice_listing_order(pub):
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'default-sort-order', '-last_update_time')
- fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
- pub.site_options.write(fd)
- fd.close()
+ with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+ pub.site_options.write(fd)
resp = app.get('/backoffice/management/form-title/')
assert resp.text.count('data-link') == 17
@@ -605,9 +603,8 @@ def test_backoffice_channel_column(pub):
if not pub.site_options.has_section('variables'):
pub.site_options.add_section('variables')
pub.site_options.set('variables', 'welco_url', 'xxx')
- fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
- pub.site_options.write(fd)
- fd.close()
+ with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+ pub.site_options.write(fd)
create_superuser(pub)
create_environment(pub)
@@ -1885,9 +1882,8 @@ def test_backoffice_map(pub):
pub.site_options.add_section('options')
pub.site_options.set('options', 'map-tile-urltemplate', 'https://{s}.tile.example.net/{z}/{x}/{y}.png')
- fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
- pub.site_options.write(fd)
- fd.close()
+ with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+ pub.site_options.write(fd)
resp = app.get('/backoffice/management/form-title/')
resp = resp.click('Plot on a Map')
@@ -2085,17 +2081,17 @@ def test_backoffice_download_as_zip(pub):
resp = app.get('/backoffice/management/form-title/%s/' % number31.id)
resp = resp.click('Download all files as .zip')
zip_content = io.BytesIO(resp.body)
- zipf = zipfile.ZipFile(zip_content, 'a')
- filelist = zipf.namelist()
- assert set(filelist) == {'1_bar', '2_bar'}
- for zipinfo in zipf.infolist():
- content = zipf.read(zipinfo)
- if zipinfo.filename == '1_bar':
- assert content == b'hello world'
- elif zipinfo.filename == '2_bar':
- assert content == b'hello world2'
- else:
- assert False # unknown zip part
+ with zipfile.ZipFile(zip_content, 'a') as zipf:
+ filelist = zipf.namelist()
+ assert set(filelist) == {'1_bar', '2_bar'}
+ for zipinfo in zipf.infolist():
+ content = zipf.read(zipinfo)
+ if zipinfo.filename == '1_bar':
+ assert content == b'hello world'
+ elif zipinfo.filename == '2_bar':
+ assert content == b'hello world2'
+ else:
+ assert False # unknown zip part
def test_backoffice_sidebar_user_template(pub):
@@ -3011,9 +3007,8 @@ def test_global_listing(pub):
if not pub.site_options.has_section('variables'):
pub.site_options.add_section('variables')
pub.site_options.set('variables', 'welco_url', 'xxx')
- fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
- pub.site_options.write(fd)
- fd.close()
+ with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+ pub.site_options.write(fd)
resp = app.get('/backoffice/management/listing?limit=500')
formdata = formdef.data_class().select(lambda x: x.status == 'wf-new')[0]
@@ -3611,9 +3606,8 @@ def test_per_user_view(pub):
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'per-user-view', 'true')
- fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
- pub.site_options.write(fd)
- fd.close()
+ with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+ pub.site_options.write(fd)
resp = app.get('/backoffice/management/').follow()
assert 'Per User View' in resp.text
diff --git a/tests/backoffice_pages/test_carddata.py b/tests/backoffice_pages/test_carddata.py
index f0eaaade4..6b4042a5d 100644
--- a/tests/backoffice_pages/test_carddata.py
+++ b/tests/backoffice_pages/test_carddata.py
@@ -35,14 +35,13 @@ def pub(request, emails):
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
- fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
- fd.write(
- '''
-[api-secrets]
-coucou = 1234
-'''
- )
- fd.close()
+ with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+ fd.write(
+ '''
+ [api-secrets]
+ coucou = 1234
+ '''
+ )
return pub
diff --git a/tests/backoffice_pages/test_custom_view.py b/tests/backoffice_pages/test_custom_view.py
index 133be3305..0f8784e3a 100644
--- a/tests/backoffice_pages/test_custom_view.py
+++ b/tests/backoffice_pages/test_custom_view.py
@@ -31,14 +31,13 @@ def pub(request, emails):
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
- fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
- fd.write(
- '''
-[api-secrets]
-coucou = 1234
-'''
- )
- fd.close()
+ with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+ fd.write(
+ '''
+ [api-secrets]
+ coucou = 1234
+ '''
+ )
return pub
diff --git a/tests/backoffice_pages/test_export.py b/tests/backoffice_pages/test_export.py
index ef627e0f3..cd603cf1f 100644
--- a/tests/backoffice_pages/test_export.py
+++ b/tests/backoffice_pages/test_export.py
@@ -37,14 +37,13 @@ def pub(request, emails):
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
- fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
- fd.write(
- '''
-[api-secrets]
-coucou = 1234
-'''
- )
- fd.close()
+ with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+ fd.write(
+ '''
+ [api-secrets]
+ coucou = 1234
+ '''
+ )
return pub
@@ -236,9 +235,8 @@ def test_backoffice_csv_export_channel(pub):
if not pub.site_options.has_section('variables'):
pub.site_options.add_section('variables')
pub.site_options.set('variables', 'welco_url', 'xxx')
- fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
- pub.site_options.write(fd)
- fd.close()
+ with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+ pub.site_options.write(fd)
create_superuser(pub)
@@ -272,9 +270,8 @@ def test_backoffice_csv_export_channel(pub):
def test_backoffice_csv_export_anonymised(pub):
- fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
- pub.site_options.write(fd)
- fd.close()
+ with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+ pub.site_options.write(fd)
create_superuser(pub)
@@ -483,8 +480,8 @@ def test_backoffice_ods(pub):
assert 'filename=form-title.ods' in resp.headers['content-disposition']
assert resp.body[:2] == b'PK' # ods has a zip container
- zipf = zipfile.ZipFile(io.BytesIO(resp.body))
- ods_sheet = ET.parse(zipf.open('content.xml'))
+ with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
+ ods_sheet = ET.parse(zipf.open('content.xml'))
# check the ods contains a link to the document
elem = ods_sheet.findall('.//{%s}a' % ods.NS['text'])[0]
assert (
diff --git a/tests/backoffice_pages/test_submission.py b/tests/backoffice_pages/test_submission.py
index 74cf60e0a..250131346 100644
--- a/tests/backoffice_pages/test_submission.py
+++ b/tests/backoffice_pages/test_submission.py
@@ -37,14 +37,13 @@ def pub(request, emails):
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
- fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
- fd.write(
- '''
-[api-secrets]
-coucou = 1234
-'''
- )
- fd.close()
+ with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+ fd.write(
+ '''
+ [api-secrets]
+ coucou = 1234
+ '''
+ )
return pub
diff --git a/tests/form_pages/test_all.py b/tests/form_pages/test_all.py
index 9321a80a7..7b87c58c7 100644
--- a/tests/form_pages/test_all.py
+++ b/tests/form_pages/test_all.py
@@ -56,22 +56,21 @@ from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, logi
def assert_equal_zip(stream1, stream2):
- z1 = zipfile.ZipFile(stream1)
- z2 = zipfile.ZipFile(stream2)
- assert set(z1.namelist()) == set(z2.namelist())
- for name in z1.namelist():
- if name == 'styles.xml':
- continue
- if name in ['content.xml', 'meta.xml']:
- t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name)))
- try:
- # >= python 3.8: tostring preserves attribute order; use canonicalize to sort them
- t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2)
- except AttributeError:
- pass
- else:
- t1, t2 = z1.read(name), z2.read(name)
- assert t1 == t2, 'file "%s" differs' % name
+ with zipfile.ZipFile(stream1) as z1, zipfile.ZipFile(stream2) as z2:
+ assert set(z1.namelist()) == set(z2.namelist())
+ for name in z1.namelist():
+ if name == 'styles.xml':
+ continue
+ if name in ['content.xml', 'meta.xml']:
+ t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name)))
+ try:
+ # >= python 3.8: tostring preserves attribute order; use canonicalize to sort them
+ t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2)
+ except AttributeError:
+ pass
+ else:
+ t1, t2 = z1.read(name), z2.read(name)
+ assert t1 == t2, 'file "%s" differs' % name
def pytest_generate_tests(metafunc):
diff --git a/tests/form_pages/test_formdata.py b/tests/form_pages/test_formdata.py
index aca721107..d5f6f6ea6 100644
--- a/tests/form_pages/test_formdata.py
+++ b/tests/form_pages/test_formdata.py
@@ -60,22 +60,21 @@ def teardown_module(module):
def assert_equal_zip(stream1, stream2):
- z1 = zipfile.ZipFile(stream1)
- z2 = zipfile.ZipFile(stream2)
- assert set(z1.namelist()) == set(z2.namelist())
- for name in z1.namelist():
- if name == 'styles.xml':
- continue
- if name in ['content.xml', 'meta.xml']:
- t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name)))
- try:
- # >= python 3.8: tostring preserves attribute order; use canonicalize to sort them
- t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2)
- except AttributeError:
- pass
- else:
- t1, t2 = z1.read(name), z2.read(name)
- assert t1 == t2, 'file "%s" differs' % name
+ with zipfile.ZipFile(stream1) as z1, zipfile.ZipFile(stream2) as z2:
+ assert set(z1.namelist()) == set(z2.namelist())
+ for name in z1.namelist():
+ if name == 'styles.xml':
+ continue
+ if name in ['content.xml', 'meta.xml']:
+ t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name)))
+ try:
+ # >= python 3.8: tostring preserves attribute order; use canonicalize to sort them
+ t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2)
+ except AttributeError:
+ pass
+ else:
+ t1, t2 = z1.read(name), z2.read(name)
+ assert t1 == t2, 'file "%s" differs' % name
def test_formdata_attachment_download(pub):
diff --git a/tests/test_categories.py b/tests/test_categories.py
index 78e8be3b2..8ad635280 100644
--- a/tests/test_categories.py
+++ b/tests/test_categories.py
@@ -123,9 +123,8 @@ def test_load_old_pickle():
test.description = 'Hello world'
os.mkdir(os.path.join(pub.app_dir, 'categories'))
- fd = open(os.path.join(pub.app_dir, 'categories', '1'), 'wb')
- pickle.dump(test, fd)
- fd.close()
+ with open(os.path.join(pub.app_dir, 'categories', '1'), 'wb') as fd:
+ pickle.dump(test, fd)
test2 = Category.get(1)
assert test.id == test2.id
diff --git a/tests/test_datasource.py b/tests/test_datasource.py
index 64329e2e3..e40162c0c 100644
--- a/tests/test_datasource.py
+++ b/tests/test_datasource.py
@@ -29,12 +29,13 @@ def pub(request):
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
pub.set_app_dir(req)
- open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
- '''
+ with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+ fd.write(
+ '''
[wscall-secrets]
api.example.com = 1234
'''
- )
+ )
pub.load_site_options()
@@ -192,44 +193,38 @@ def test_json_datasource(pub, requests_pub, http_requests):
# invalid json file
get_request().datasources_cache = {}
- json_file = open(json_file_path, 'wb')
- json_file.write(codecs.encode(b'foobar', 'zlib_codec'))
- json_file.close()
+ with open(json_file_path, 'wb') as json_file:
+ json_file.write(codecs.encode(b'foobar', 'zlib_codec'))
assert data_sources.get_items(datasource) == []
# empty json file
get_request().datasources_cache = {}
- json_file = open(json_file_path, 'w')
- json.dump({}, json_file)
- json_file.close()
+ with open(json_file_path, 'w') as json_file:
+ json.dump({}, json_file)
assert data_sources.get_items(datasource) == []
# unrelated json file
get_request().datasources_cache = {}
- json_file = open(json_file_path, 'w')
- json.dump('foobar', json_file)
- json_file.close()
+ with open(json_file_path, 'w') as json_file:
+ json.dump('foobar', json_file)
assert data_sources.get_items(datasource) == []
# another unrelated json file
get_request().datasources_cache = {}
- json_file = open(json_file_path, 'w')
- json.dump({'data': 'foobar'}, json_file)
- json_file.close()
+ with open(json_file_path, 'w') as json_file:
+ json.dump({'data': 'foobar'}, json_file)
assert data_sources.get_items(datasource) == []
# json file not using dictionaries
get_request().datasources_cache = {}
- json_file = open(json_file_path, 'w')
- json.dump({'data': [['1', 'foo'], ['2', 'bar']]}, json_file)
- json_file.close()
+ with open(json_file_path, 'w') as json_file:
+ json.dump({'data': [['1', 'foo'], ['2', 'bar']]}, json_file)
assert data_sources.get_items(datasource) == []
# a good json file
get_request().datasources_cache = {}
- json_file = open(json_file_path, 'w')
- json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
- json_file.close()
+ with open(json_file_path, 'w') as json_file:
+ json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo'}),
('2', 'bar', '2', {'id': '2', 'text': 'bar'}),
@@ -241,12 +236,11 @@ def test_json_datasource(pub, requests_pub, http_requests):
# a json file with additional keys
get_request().datasources_cache = {}
- json_file = open(json_file_path, 'w')
- json.dump(
- {'data': [{'id': '1', 'text': 'foo', 'more': 'xxx'}, {'id': '2', 'text': 'bar', 'more': 'yyy'}]},
- json_file,
- )
- json_file.close()
+ with open(json_file_path, 'w') as json_file:
+ json.dump(
+ {'data': [{'id': '1', 'text': 'foo', 'more': 'xxx'}, {'id': '2', 'text': 'bar', 'more': 'yyy'}]},
+ json_file,
+ )
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo', 'more': 'xxx'}),
('2', 'bar', '2', {'id': '2', 'text': 'bar', 'more': 'yyy'}),
@@ -299,9 +293,8 @@ def test_json_datasource(pub, requests_pub, http_requests):
# a json file with integer as 'id'
get_request().datasources_cache = {}
- json_file = open(json_file_path, 'w')
- json.dump({'data': [{'id': 1, 'text': 'foo'}, {'id': 2, 'text': 'bar'}]}, json_file)
- json_file.close()
+ with open(json_file_path, 'w') as json_file:
+ json.dump({'data': [{'id': 1, 'text': 'foo'}, {'id': 2, 'text': 'bar'}]}, json_file)
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': 1, 'text': 'foo'}),
('2', 'bar', '2', {'id': 2, 'text': 'bar'}),
@@ -313,9 +306,8 @@ def test_json_datasource(pub, requests_pub, http_requests):
# a json file with empty or no text values
get_request().datasources_cache = {}
- json_file = open(json_file_path, 'w')
- json.dump({'data': [{'id': '1', 'text': ''}, {'id': '2'}]}, json_file)
- json_file.close()
+ with open(json_file_path, 'w') as json_file:
+ json.dump({'data': [{'id': '1', 'text': ''}, {'id': '2'}]}, json_file)
assert data_sources.get_items(datasource) == [
('1', '', '1', {'id': '1', 'text': ''}),
('2', '2', '2', {'id': '2', 'text': '2'}),
@@ -327,18 +319,16 @@ def test_json_datasource(pub, requests_pub, http_requests):
# a json file with empty or no id
get_request().datasources_cache = {}
- json_file = open(json_file_path, 'w')
- json.dump({'data': [{'id': '', 'text': 'foo'}, {'text': 'bar'}, {'id': None}]}, json_file)
- json_file.close()
+ with open(json_file_path, 'w') as json_file:
+ json.dump({'data': [{'id': '', 'text': 'foo'}, {'text': 'bar'}, {'id': None}]}, json_file)
assert data_sources.get_items(datasource) == []
assert data_sources.get_structured_items(datasource) == []
# specify data_attribute
datasource = {'type': 'json', 'value': ' {{ json_url }}', 'data_attribute': 'results'}
get_request().datasources_cache = {}
- json_file = open(json_file_path, 'w')
- json.dump({'results': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
- json_file.close()
+ with open(json_file_path, 'w') as json_file:
+ json.dump({'results': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo'},
{'id': '2', 'text': 'bar'},
@@ -351,9 +341,8 @@ def test_json_datasource(pub, requests_pub, http_requests):
# specify id_attribute
datasource = {'type': 'json', 'value': ' {{ json_url }}', 'id_attribute': 'pk'}
get_request().datasources_cache = {}
- json_file = open(json_file_path, 'w')
- json.dump({'data': [{'pk': '1', 'text': 'foo'}, {'pk': '2', 'text': 'bar'}]}, json_file)
- json_file.close()
+ with open(json_file_path, 'w') as json_file:
+ json.dump({'data': [{'pk': '1', 'text': 'foo'}, {'pk': '2', 'text': 'bar'}]}, json_file)
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo', 'pk': '1'},
{'id': '2', 'text': 'bar', 'pk': '2'},
@@ -366,9 +355,8 @@ def test_json_datasource(pub, requests_pub, http_requests):
# specify text_attribute
datasource = {'type': 'json', 'value': ' {{ json_url }}', 'text_attribute': 'label'}
get_request().datasources_cache = {}
- json_file = open(json_file_path, 'w')
- json.dump({'data': [{'id': '1', 'label': 'foo'}, {'id': '2', 'label': 'bar'}]}, json_file)
- json_file.close()
+ with open(json_file_path, 'w') as json_file:
+ json.dump({'data': [{'id': '1', 'label': 'foo'}, {'id': '2', 'label': 'bar'}]}, json_file)
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo', 'label': 'foo'},
{'id': '2', 'text': 'bar', 'label': 'bar'},
@@ -508,45 +496,40 @@ def test_geojson_datasource(pub, requests_pub, http_requests):
# invalid geojson file
get_request().datasources_cache = {}
- geojson_file = open(geojson_file_path, 'wb')
- geojson_file.write(codecs.encode(b'foobar', 'zlib_codec'))
- geojson_file.close()
+ with open(geojson_file_path, 'wb') as geojson_file:
+ geojson_file.write(codecs.encode(b'foobar', 'zlib_codec'))
assert data_sources.get_items(datasource) == []
# empty geojson file
get_request().datasources_cache = {}
- geojson_file = open(geojson_file_path, 'w')
- json.dump({}, geojson_file)
- geojson_file.close()
+ with open(geojson_file_path, 'w') as geojson_file:
+ json.dump({}, geojson_file)
assert data_sources.get_items(datasource) == []
# unrelated geojson file
get_request().datasources_cache = {}
- geojson_file = open(geojson_file_path, 'w')
- json.dump('foobar', geojson_file)
- geojson_file.close()
+ with open(geojson_file_path, 'w') as geojson_file:
+ json.dump('foobar', geojson_file)
assert data_sources.get_items(datasource) == []
# another unrelated geojson file
get_request().datasources_cache = {}
- geojson_file = open(geojson_file_path, 'w')
- json.dump({'features': 'foobar'}, geojson_file)
- geojson_file.close()
+ with open(geojson_file_path, 'w') as geojson_file:
+ json.dump({'features': 'foobar'}, geojson_file)
assert data_sources.get_items(datasource) == []
# a good geojson file
get_request().datasources_cache = {}
- geojson_file = open(geojson_file_path, 'w')
- json.dump(
- {
- 'features': [
- {'properties': {'id': '1', 'text': 'foo'}},
- {'properties': {'id': '2', 'text': 'bar'}},
- ]
- },
- geojson_file,
- )
- geojson_file.close()
+ with open(geojson_file_path, 'w') as geojson_file:
+ json.dump(
+ {
+ 'features': [
+ {'properties': {'id': '1', 'text': 'foo'}},
+ {'properties': {'id': '2', 'text': 'bar'}},
+ ]
+ },
+ geojson_file,
+ )
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo', 'properties': {'id': '1', 'text': 'foo'}}),
('2', 'bar', '2', {'id': '2', 'text': 'bar', 'properties': {'id': '2', 'text': 'bar'}}),
@@ -558,17 +541,16 @@ def test_geojson_datasource(pub, requests_pub, http_requests):
# a geojson file with additional keys
get_request().datasources_cache = {}
- geojson_file = open(geojson_file_path, 'w')
- json.dump(
- {
- 'features': [
- {'properties': {'id': '1', 'text': 'foo', 'more': 'xxx'}},
- {'properties': {'id': '2', 'text': 'bar', 'more': 'yyy'}},
- ]
- },
- geojson_file,
- )
- geojson_file.close()
+ with open(geojson_file_path, 'w') as geojson_file:
+ json.dump(
+ {
+ 'features': [
+ {'properties': {'id': '1', 'text': 'foo', 'more': 'xxx'}},
+ {'properties': {'id': '2', 'text': 'bar', 'more': 'yyy'}},
+ ]
+ },
+ geojson_file,
+ )
assert data_sources.get_items(datasource) == [
(
'1',
@@ -671,12 +653,16 @@ def test_geojson_datasource(pub, requests_pub, http_requests):
# a geojson file with integer as 'id'
get_request().datasources_cache = {}
- geojson_file = open(geojson_file_path, 'w')
- json.dump(
- {'features': [{'properties': {'id': 1, 'text': 'foo'}}, {'properties': {'id': 2, 'text': 'bar'}}]},
- geojson_file,
- )
- geojson_file.close()
+ with open(geojson_file_path, 'w') as geojson_file:
+ json.dump(
+ {
+ 'features': [
+ {'properties': {'id': 1, 'text': 'foo'}},
+ {'properties': {'id': 2, 'text': 'bar'}},
+ ]
+ },
+ geojson_file,
+ )
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': 1, 'text': 'foo', 'properties': {'id': 1, 'text': 'foo'}}),
('2', 'bar', '2', {'id': 2, 'text': 'bar', 'properties': {'id': 2, 'text': 'bar'}}),
@@ -688,11 +674,10 @@ def test_geojson_datasource(pub, requests_pub, http_requests):
# a geojson file with empty or no text values
get_request().datasources_cache = {}
- geojson_file = open(geojson_file_path, 'w')
- json.dump(
- {'features': [{'properties': {'id': '1', 'text': ''}}, {'properties': {'id': '2'}}]}, geojson_file
- )
- geojson_file.close()
+ with open(geojson_file_path, 'w') as geojson_file:
+ json.dump(
+ {'features': [{'properties': {'id': '1', 'text': ''}}, {'properties': {'id': '2'}}]}, geojson_file
+ )
assert data_sources.get_items(datasource) == [
('1', '1', '1', {'id': '1', 'text': '1', 'properties': {'id': '1', 'text': ''}}),
('2', '2', '2', {'id': '2', 'text': '2', 'properties': {'id': '2'}}),
@@ -704,35 +689,33 @@ def test_geojson_datasource(pub, requests_pub, http_requests):
# a geojson file with empty or no id
get_request().datasources_cache = {}
- geojson_file = open(geojson_file_path, 'w')
- json.dump(
- {
- 'features': [
- {'properties': {'id': '', 'text': 'foo'}},
- {'properties': {'text': 'bar'}},
- {'properties': {'id': None}},
- ]
- },
- geojson_file,
- )
- geojson_file.close()
+ with open(geojson_file_path, 'w') as geojson_file:
+ json.dump(
+ {
+ 'features': [
+ {'properties': {'id': '', 'text': 'foo'}},
+ {'properties': {'text': 'bar'}},
+ {'properties': {'id': None}},
+ ]
+ },
+ geojson_file,
+ )
assert data_sources.get_items(datasource) == []
assert data_sources.get_structured_items(datasource) == []
# specify id_property
datasource = {'type': 'geojson', 'value': ' {{ geojson_url }}', 'id_property': 'gid'}
get_request().datasources_cache = {}
- geojson_file = open(geojson_file_path, 'w')
- json.dump(
- {
- 'features': [
- {'properties': {'gid': '1', 'text': 'foo'}},
- {'properties': {'gid': '2', 'text': 'bar'}},
- ]
- },
- geojson_file,
- )
- geojson_file.close()
+ with open(geojson_file_path, 'w') as geojson_file:
+ json.dump(
+ {
+ 'features': [
+ {'properties': {'gid': '1', 'text': 'foo'}},
+ {'properties': {'gid': '2', 'text': 'bar'}},
+ ]
+ },
+ geojson_file,
+ )
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo', 'properties': {'gid': '1', 'text': 'foo'}},
{'id': '2', 'text': 'bar', 'properties': {'gid': '2', 'text': 'bar'}},
@@ -745,17 +728,16 @@ def test_geojson_datasource(pub, requests_pub, http_requests):
# check with feature IDs
get_request().datasources_cache = {}
- geojson_file = open(geojson_file_path, 'w')
- json.dump(
- {
- 'features': [
- {'id': '1', 'properties': {'text': 'foo'}},
- {'id': '2', 'properties': {'text': 'bar'}},
- ]
- },
- geojson_file,
- )
- geojson_file.close()
+ with open(geojson_file_path, 'w') as geojson_file:
+ json.dump(
+ {
+ 'features': [
+ {'id': '1', 'properties': {'text': 'foo'}},
+ {'id': '2', 'properties': {'text': 'bar'}},
+ ]
+ },
+ geojson_file,
+ )
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo', 'properties': {'text': 'foo'}},
{'id': '2', 'text': 'bar', 'properties': {'text': 'bar'}},
@@ -768,17 +750,16 @@ def test_geojson_datasource(pub, requests_pub, http_requests):
'label_template_property': '{{ id }}: {{ text }}',
}
get_request().datasources_cache = {}
- geojson_file = open(geojson_file_path, 'w')
- json.dump(
- {
- 'features': [
- {'properties': {'id': '1', 'text': 'foo'}},
- {'properties': {'id': '2', 'text': 'bar'}},
- ]
- },
- geojson_file,
- )
- geojson_file.close()
+ with open(geojson_file_path, 'w') as geojson_file:
+ json.dump(
+ {
+ 'features': [
+ {'properties': {'id': '1', 'text': 'foo'}},
+ {'properties': {'id': '2', 'text': 'bar'}},
+ ]
+ },
+ geojson_file,
+ )
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': '1: foo', 'properties': {'id': '1', 'text': 'foo'}},
{'id': '2', 'text': '2: bar', 'properties': {'id': '2', 'text': 'bar'}},
@@ -801,17 +782,16 @@ def test_geojson_datasource(pub, requests_pub, http_requests):
# unknown property or empty value
datasource = {'type': 'geojson', 'value': ' {{ geojson_url }}', 'label_template_property': '{{ label }}'}
get_request().datasources_cache = {}
- geojson_file = open(geojson_file_path, 'w')
- json.dump(
- {
- 'features': [
- {'properties': {'id': '1', 'text': 'foo', 'label': ''}},
- {'properties': {'id': '2', 'text': 'bar'}},
- ]
- },
- geojson_file,
- )
- geojson_file.close()
+ with open(geojson_file_path, 'w') as geojson_file:
+ json.dump(
+ {
+ 'features': [
+ {'properties': {'id': '1', 'text': 'foo', 'label': ''}},
+ {'properties': {'id': '2', 'text': 'bar'}},
+ ]
+ },
+ geojson_file,
+ )
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': '1', 'properties': {'id': '1', 'text': 'foo', 'label': ''}},
{'id': '2', 'text': '2', 'properties': {'id': '2', 'text': 'bar'}},
diff --git a/tests/test_ezt.py b/tests/test_ezt.py
index 1cf2b288d..074709333 100644
--- a/tests/test_ezt.py
+++ b/tests/test_ezt.py
@@ -217,9 +217,8 @@ def test_dict_index():
def test_ezt_script(pub):
os.mkdir(os.path.join(pub.app_dir, 'scripts'))
- fd = open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w')
- fd.write('''result = "Hello %s" % ("world" if not args else args[0])''')
- fd.close()
+ with open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w') as fd:
+ fd.write('''result = "Hello %s" % ("world" if not args else args[0])''')
vars = {'script': ScriptsSubstitutionProxy()}
template = Template()
diff --git a/tests/test_hobo.py b/tests/test_hobo.py
index 93711c235..1fbe606d2 100644
--- a/tests/test_hobo.py
+++ b/tests/test_hobo.py
@@ -375,11 +375,10 @@ def test_configure_authentication_methods(http_requests):
def test_deploy():
cleanup()
WcsPublisher.APP_DIR = alt_tempdir
- fd = open(os.path.join(alt_tempdir, 'hobo.json'), 'w')
- hobo_json = copy.deepcopy(HOBO_JSON)
- del hobo_json['services'][1] # authentic
- fd.write(json.dumps(HOBO_JSON))
- fd.close()
+ with open(os.path.join(alt_tempdir, 'hobo.json'), 'w') as fd:
+ hobo_json = copy.deepcopy(HOBO_JSON)
+ del hobo_json['services'][1] # authentic
+ fd.write(json.dumps(HOBO_JSON))
hobo_cmd = CmdCheckHobos()
base_options = {}
sub_options_class = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
@@ -406,11 +405,10 @@ def test_deploy():
def test_configure_postgresql():
cleanup()
WcsPublisher.APP_DIR = alt_tempdir
- fd = open(os.path.join(alt_tempdir, 'hobo.json'), 'w')
- hobo_json = copy.deepcopy(HOBO_JSON)
- del hobo_json['services'][1] # authentic
- fd.write(json.dumps(HOBO_JSON))
- fd.close()
+ with open(os.path.join(alt_tempdir, 'hobo.json'), 'w') as fd:
+ hobo_json = copy.deepcopy(HOBO_JSON)
+ del hobo_json['services'][1] # authentic
+ fd.write(json.dumps(HOBO_JSON))
service = [x for x in HOBO_JSON.get('services', []) if x.get('service-id') == 'wcs'][0]
@@ -424,10 +422,9 @@ def test_configure_postgresql():
)
assert os.path.exists(os.path.join(alt_tempdir, 'wcs.example.net'))
- fd = open(os.path.join(alt_tempdir, 'wcs.example.net', 'site-options.cfg'), 'w')
- fd.write('[options]\n')
- fd.write('postgresql = true\n')
- fd.close()
+ with open(os.path.join(alt_tempdir, 'wcs.example.net', 'site-options.cfg'), 'w') as fd:
+ fd.write('[options]\n')
+ fd.write('postgresql = true\n')
cleanup()
diff --git a/tests/test_misc.py b/tests/test_misc.py
index 3e41a5934..5b9850332 100644
--- a/tests/test_misc.py
+++ b/tests/test_misc.py
@@ -209,25 +209,22 @@ def test_script_substitution_variable():
assert variables['script'].hello_world()
os.mkdir(os.path.join(pub.app_dir, 'scripts'))
- fd = open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w')
- fd.write('"""docstring"""\nresult = "hello world"')
- fd.close()
+ with open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w') as fd:
+ fd.write('"""docstring"""\nresult = "hello world"')
assert variables['script'].hello_world() == 'hello world'
assert Script('hello_world').__doc__ == 'docstring'
os.mkdir(os.path.join(pub.APP_DIR, 'scripts'))
- fd = open(os.path.join(pub.APP_DIR, 'scripts', 'hello_world.py'), 'w')
- fd.write('result = "hello global world"')
- fd.close()
+ with open(os.path.join(pub.APP_DIR, 'scripts', 'hello_world.py'), 'w') as fd:
+ fd.write('result = "hello global world"')
assert variables['script'].hello_world() == 'hello world'
os.unlink(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'))
assert variables['script'].hello_world() == 'hello global world'
- fd = open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w')
- fd.write('result = site_url')
- fd.close()
+ with open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w') as fd:
+ fd.write('result = site_url')
assert variables['script'].hello_world() == 'http://example.net'
@@ -404,9 +401,8 @@ def test_pagination():
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'default-page-size', '500')
- fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
- pub.site_options.write(fd)
- fd.close()
+ with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+ pub.site_options.write(fd)
assert '(1-101/1000)' in get_texts(pagination_links(0, 101, 1000))
assert '(1-500/1000)' in get_texts(pagination_links(0, 500, 1000))
assert '(1-500/1000)' in get_texts(pagination_links(0, 501, 1000)) # 500 is the max
@@ -705,6 +701,24 @@ def test_related_field_repr():
def test_find_vc_version():
import wcs.qommon.admin.menu
+ def mocked_popen(*args, **kwargs):
+ class Process:
+ returncode = 0
+
+ def communicate(self):
+ return (
+ b'''Desired=Unknown/Install/Remove/Purge/Hold
+| Status=Not/Inst/Conf-files/Unpacked/halF-conf/Half-inst/trig-aWait/Trig-pend
+|/ Err?=(none)/Reinst-required (Status,Err: uppercase=bad)
+||/ Name Version Architecture Description
++++-==============-===============-============-=================================================
+ii wcs 5.71-1~eob100+1 all web application to design and set up online forms
+''',
+ '',
+ )
+
+ return Process()
+
with mock.patch('os.path.exists') as os_path_exists, mock.patch('subprocess.Popen') as popen:
def mocked_os_path_exists(path):
@@ -712,25 +726,9 @@ def test_find_vc_version():
os_path_exists.side_effect = mocked_os_path_exists
- def mocked_popen(*args, **kwargs):
- class Process:
- returncode = 0
-
- def communicate(self):
- return (
- b'''Desired=Unknown/Install/Remove/Purge/Hold
-| Status=Not/Inst/Conf-files/Unpacked/halF-conf/Half-inst/trig-aWait/Trig-pend
-|/ Err?=(none)/Reinst-required (Status,Err: uppercase=bad)
-||/ Name Version Architecture Description
-+++-==============-===============-============-=================================================
-ii wcs 5.71-1~eob100+1 all web application to design and set up online forms
-''',
- '',
- )
-
- return Process()
-
- popen.side_effect = mocked_popen
+ handle = mock.MagicMock()
+ handle.__enter__.side_effect = mocked_popen
+ popen.return_value = handle
version = wcs.qommon.admin.menu._find_vc_version()
assert version == 'wcs 5.71-1~eob100+1 (Debian)'
diff --git a/tests/test_publisher.py b/tests/test_publisher.py
index c9659b2da..b7a81b380 100644
--- a/tests/test_publisher.py
+++ b/tests/test_publisher.py
@@ -177,9 +177,8 @@ def test_import_config_zip():
pub.write_cfg()
c = io.BytesIO()
- z = zipfile.ZipFile(c, 'w')
- z.writestr('config.pck', pickle.dumps({'language': {'language': 'fr'}, 'whatever': ['a', 'b', 'c']}))
- z.close()
+ with zipfile.ZipFile(c, 'w') as z:
+ z.writestr('config.pck', pickle.dumps({'language': {'language': 'fr'}, 'whatever': ['a', 'b', 'c']}))
c.seek(0)
pub.import_zip(c)
@@ -188,11 +187,10 @@ def test_import_config_zip():
assert pub.cfg['sp'] == {'what': 'ever'}
c = io.BytesIO()
- z = zipfile.ZipFile(c, 'w')
- z.writestr(
- 'config.json', json.dumps({'language': {'language': 'en'}, 'whatever2': ['a', 'b', {'c': 'd'}]})
- )
- z.close()
+ with zipfile.ZipFile(c, 'w') as z:
+ z.writestr(
+ 'config.json', json.dumps({'language': {'language': 'en'}, 'whatever2': ['a', 'b', {'c': 'd'}]})
+ )
c.seek(0)
pub.import_zip(c)
diff --git a/tests/test_saml_auth.py b/tests/test_saml_auth.py
index 6de1f8271..fa49df00c 100644
--- a/tests/test_saml_auth.py
+++ b/tests/test_saml_auth.py
@@ -81,19 +81,16 @@ def setup_idps(pub, idp_number=1):
'role': lasso.PROVIDER_ROLE_IDP,
}
filename = pub.cfg['idp'][base_id]['metadata']
- fd = open(os.path.join(pub.app_dir, filename), 'w')
- fd.write(metadata)
- fd.close()
+ with open(os.path.join(pub.app_dir, filename), 'w') as fd:
+ fd.write(metadata)
filename = pub.cfg['idp'][base_id]['publickey']
- fd = open(os.path.join(pub.app_dir, filename), 'w')
- fd.write(idp_publickey)
- fd.close()
+ with open(os.path.join(pub.app_dir, filename), 'w') as fd:
+ fd.write(idp_publickey)
filename = pub.cfg['idp'][base_id]['publickey'].replace('public', 'private')
- fd = open(os.path.join(pub.app_dir, filename), 'w')
- fd.write(idp_privatekey)
- fd.close()
+ with open(os.path.join(pub.app_dir, filename), 'w') as fd:
+ fd.write(idp_privatekey)
pub.write_cfg()
diff --git a/tests/test_workflows.py b/tests/test_workflows.py
index 65f9163a7..9a9469cea 100644
--- a/tests/test_workflows.py
+++ b/tests/test_workflows.py
@@ -3506,8 +3506,8 @@ def test_export_to_model_image(pub):
item.perform(formdata)
assert formdata.evolution[-1].parts[-1].base_filename == 'template.odt'
- zfile = zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r')
- zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
+ with zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r') as zfile:
+ zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
# check the image has been replaced by the one from the formdata
assert zinfo.file_size == len(image_data)
@@ -3521,8 +3521,8 @@ def test_export_to_model_image(pub):
item.perform(formdata)
- zfile = zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r')
- zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
+ with zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r') as zfile:
+ zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
# check the original image has been left
assert zinfo.file_size == 580
@@ -3574,8 +3574,8 @@ def test_export_to_model_backoffice_field(pub):
fbo1 = formdata.data['bo1']
assert fbo1.base_filename == 'template.odt'
assert fbo1.content_type == 'application/octet-stream'
- zfile = zipfile.ZipFile(fbo1.get_file())
- assert b'foo-export-to-bofile' in zfile.read('content.xml')
+ with zipfile.ZipFile(fbo1.get_file()) as zfile:
+ assert b'foo-export-to-bofile' in zfile.read('content.xml')
# no more 'bo1' backoffice field: do nothing
formdata = formdef.data_class()()
diff --git a/tests/utilities.py b/tests/utilities.py
index f1dcb949c..00fd23cfa 100644
--- a/tests/utilities.py
+++ b/tests/utilities.py
@@ -108,19 +108,18 @@ def create_temporary_pub(sql_mode=False, templates_mode=False, lazy_mode=False):
created = True
# always reset site-options.cfg
- fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
- fd.write('[wscall-secrets]\n')
- fd.write('idp.example.net = BAR\n')
- fd.write('\n')
- fd.write('[options]\n')
- fd.write('formdef-captcha-option = true\n')
- fd.write('formdef-appearance-keywords = true\n')
- fd.write('workflow-resubmit-action = true\n')
- if lazy_mode:
- fd.write('force-lazy-mode = true\n')
- if sql_mode:
- fd.write('postgresql = true\n')
- fd.close()
+ with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+ fd.write('[wscall-secrets]\n')
+ fd.write('idp.example.net = BAR\n')
+ fd.write('\n')
+ fd.write('[options]\n')
+ fd.write('formdef-captcha-option = true\n')
+ fd.write('formdef-appearance-keywords = true\n')
+ fd.write('workflow-resubmit-action = true\n')
+ if lazy_mode:
+ fd.write('force-lazy-mode = true\n')
+ if sql_mode:
+ fd.write('postgresql = true\n')
# make sure site options are not cached
pub.site_options = None
diff --git a/wcs/admin/forms.py b/wcs/admin/forms.py
index 3802d6884..293608ee4 100644
--- a/wcs/admin/forms.py
+++ b/wcs/admin/forms.py
@@ -1519,11 +1519,13 @@ class FormDefPage(Directory):
all_forms = [x for x in all_forms if x.last_update_time < date]
self.fd = io.BytesIO()
- t = tarfile.open('wcs.tar.gz', 'w:gz', fileobj=self.fd)
- t.add(self.formdef.get_object_filename(), 'formdef')
- for formdata in all_forms:
- t.add(formdata.get_object_filename(), '%s/%s' % (self.formdef.url_name, str(formdata.id)))
- t.close()
+ with tarfile.open('wcs.tar.gz', 'w:gz', fileobj=self.fd) as t:
+ t.add(self.formdef.get_object_filename(), 'formdef')
+ for formdata in all_forms:
+ t.add(
+ formdata.get_object_filename(),
+ '%s/%s' % (self.formdef.url_name, str(formdata.id)),
+ )
if form.get_widget('keep').parse() is False:
for f in all_forms:
diff --git a/wcs/admin/settings.py b/wcs/admin/settings.py
index 61520a4a3..5396f620f 100644
--- a/wcs/admin/settings.py
+++ b/wcs/admin/settings.py
@@ -848,12 +848,11 @@ class SettingsDirectory(QommonSettingsDirectory):
parent_theme_directory = os.path.dirname(theme_directory)
c = io.BytesIO()
- z = zipfile.ZipFile(c, 'w')
- for base, dummy, filenames in os.walk(theme_directory):
- basetheme = base[len(parent_theme_directory) + 1 :]
- for filename in filenames:
- z.write(os.path.join(base, filename), os.path.join(basetheme, filename))
- z.close()
+ with zipfile.ZipFile(c, 'w') as z:
+ for base, dummy, filenames in os.walk(theme_directory):
+ basetheme = base[len(parent_theme_directory) + 1 :]
+ for filename in filenames:
+ z.write(os.path.join(base, filename), os.path.join(basetheme, filename))
response = get_response()
response.set_content_type('application/zip')
@@ -896,36 +895,36 @@ class SettingsDirectory(QommonSettingsDirectory):
def install_theme_from_file(self, fp):
try:
- z = zipfile.ZipFile(fp, 'r')
+ with zipfile.ZipFile(fp, 'r') as z:
+ theme_dir = os.path.join(get_publisher().app_dir, 'themes')
+ filename_list = [x for x in z.namelist() if x[0] != '/' and x[-1] != '/']
+ if len(filename_list) == 0:
+ get_session().message = ('error', _('Empty theme file.'))
+ return redirect('themes')
+ theme_name = filename_list[0].split('/')[0]
+ if ('%s/desc.xml' % theme_name) not in filename_list:
+ get_session().message = ('error', _('Theme is missing a desc.xml file.'))
+ return redirect('themes')
+ desc_xml = z.read('%s/desc.xml' % theme_name)
+ theme_dict = template.get_theme_dict(io.StringIO(force_text(desc_xml)))
+ if theme_dict.get('name') != theme_name:
+ get_session().message = ('error', _('desc.xml is missing a name attribute.'))
+ return redirect('themes')
+ if os.path.exists(os.path.join(theme_dir, theme_name)):
+ shutil.rmtree(os.path.join(theme_dir, theme_name))
+ for f in z.namelist():
+ if f[-1] == '/':
+ continue
+ path = os.path.join(theme_dir, f)
+ data = z.read(f)
+ if not os.path.exists(os.path.dirname(path)):
+ os.makedirs(os.path.dirname(path))
+ with open(path, 'wb') as _f:
+ _f.write(data)
+ return redirect('themes')
except Exception as e:
get_session().message = ('error', _('Failed to read theme file. (%s)') % str(e))
return redirect('themes')
- theme_dir = os.path.join(get_publisher().app_dir, 'themes')
- filename_list = [x for x in z.namelist() if x[0] != '/' and x[-1] != '/']
- if len(filename_list) == 0:
- get_session().message = ('error', _('Empty theme file.'))
- return redirect('themes')
- theme_name = filename_list[0].split('/')[0]
- if ('%s/desc.xml' % theme_name) not in filename_list:
- get_session().message = ('error', _('Theme is missing a desc.xml file.'))
- return redirect('themes')
- desc_xml = z.read('%s/desc.xml' % theme_name)
- theme_dict = template.get_theme_dict(io.StringIO(force_text(desc_xml)))
- if theme_dict.get('name') != theme_name:
- get_session().message = ('error', _('desc.xml is missing a name attribute.'))
- return redirect('themes')
- if os.path.exists(os.path.join(theme_dir, theme_name)):
- shutil.rmtree(os.path.join(theme_dir, theme_name))
- for f in z.namelist():
- if f[-1] == '/':
- continue
- path = os.path.join(theme_dir, f)
- data = z.read(f)
- if not os.path.exists(os.path.dirname(path)):
- os.makedirs(os.path.dirname(path))
- open(path, 'wb').write(data)
- z.close()
- return redirect('themes')
def install_theme_from_url(self, url):
try:
@@ -1028,83 +1027,82 @@ class SettingsDirectory(QommonSettingsDirectory):
def export(self, job):
c = io.BytesIO()
- z = zipfile.ZipFile(c, 'w')
- for d in self.dirs:
- if d not in (
- 'categories',
- 'carddef_categories',
- 'wscalls',
- 'mail-templates',
- 'apiaccess',
- ):
- continue
- path = os.path.join(self.app_dir, d)
- if not os.path.exists(path):
- continue
- for f in os.listdir(path):
- if f in ('.indexes', '.max_id'):
+ with zipfile.ZipFile(c, 'w') as z:
+ for d in self.dirs:
+ if d not in (
+ 'categories',
+ 'carddef_categories',
+ 'wscalls',
+ 'mail-templates',
+ 'apiaccess',
+ ):
continue
- z.write(os.path.join(path, f), os.path.join(d, f))
- if 'datasources' in self.dirs:
- for ds in NamedDataSource.select():
- if ds.external == 'agenda':
+ path = os.path.join(self.app_dir, d)
+ if not os.path.exists(path):
continue
- node = ds.export_to_xml(include_id=True)
- misc.indent_xml(node)
- z.writestr(
- os.path.join('datasources', str(ds.id)),
- ET.tostring(node),
- )
- if 'formdefs' in self.dirs:
- for formdef in FormDef.select():
- node = formdef.export_to_xml(include_id=True)
- misc.indent_xml(node)
- z.writestr(
- os.path.join('formdefs_xml', str(formdef.id)),
- b'\n' + ET.tostring(node),
- )
- if 'carddefs' in self.dirs:
- for formdef in CardDef.select():
- node = formdef.export_to_xml(include_id=True)
- misc.indent_xml(node)
- z.writestr(
- os.path.join('carddefs_xml', str(formdef.id)),
- b'\n' + ET.tostring(node),
- )
- if 'workflows' in self.dirs:
- for workflow in Workflow.select():
- node = workflow.export_to_xml(include_id=True)
- misc.indent_xml(node)
- z.writestr(
- os.path.join('workflows_xml', str(workflow.id)),
- b'\n' + ET.tostring(node),
- )
- if 'blockdefs' in self.dirs:
- for blockdef in BlockDef.select():
- node = blockdef.export_to_xml(include_id=True)
- misc.indent_xml(node)
- z.writestr(
- os.path.join('blockdefs_xml', str(blockdef.id)),
- b'\n' + ET.tostring(node),
- )
- if 'roles' in self.dirs:
- for role in get_publisher().role_class.select():
- node = role.export_to_xml(include_id=True)
- misc.indent_xml(node)
- z.writestr(
- os.path.join('roles_xml', str(role.id)),
- b'\n' + ET.tostring(node),
- )
+ for f in os.listdir(path):
+ if f in ('.indexes', '.max_id'):
+ continue
+ z.write(os.path.join(path, f), os.path.join(d, f))
+ if 'datasources' in self.dirs:
+ for ds in NamedDataSource.select():
+ if ds.external == 'agenda':
+ continue
+ node = ds.export_to_xml(include_id=True)
+ misc.indent_xml(node)
+ z.writestr(
+ os.path.join('datasources', str(ds.id)),
+ ET.tostring(node),
+ )
+ if 'formdefs' in self.dirs:
+ for formdef in FormDef.select():
+ node = formdef.export_to_xml(include_id=True)
+ misc.indent_xml(node)
+ z.writestr(
+ os.path.join('formdefs_xml', str(formdef.id)),
+ b'\n' + ET.tostring(node),
+ )
+ if 'carddefs' in self.dirs:
+ for formdef in CardDef.select():
+ node = formdef.export_to_xml(include_id=True)
+ misc.indent_xml(node)
+ z.writestr(
+ os.path.join('carddefs_xml', str(formdef.id)),
+ b'\n' + ET.tostring(node),
+ )
+ if 'workflows' in self.dirs:
+ for workflow in Workflow.select():
+ node = workflow.export_to_xml(include_id=True)
+ misc.indent_xml(node)
+ z.writestr(
+ os.path.join('workflows_xml', str(workflow.id)),
+ b'\n' + ET.tostring(node),
+ )
+ if 'blockdefs' in self.dirs:
+ for blockdef in BlockDef.select():
+ node = blockdef.export_to_xml(include_id=True)
+ misc.indent_xml(node)
+ z.writestr(
+ os.path.join('blockdefs_xml', str(blockdef.id)),
+ b'\n' + ET.tostring(node),
+ )
+ if 'roles' in self.dirs:
+ for role in get_publisher().role_class.select():
+ node = role.export_to_xml(include_id=True)
+ misc.indent_xml(node)
+ z.writestr(
+ os.path.join('roles_xml', str(role.id)),
+ b'\n' + ET.tostring(node),
+ )
- if self.settings:
- z.write(os.path.join(self.app_dir, 'config.pck'), 'config.pck')
- for f in os.listdir(self.app_dir):
- if f.startswith('idp-') and os.path.splitext(f)[-1] in ('.pem', '.xml'):
- z.write(os.path.join(self.app_dir, f), f)
- if os.path.exists(os.path.join(self.app_dir, 'config')):
- for f in os.listdir(os.path.join(self.app_dir, 'config')):
- z.write(os.path.join(self.app_dir, 'config', f), os.path.join('config', f))
- z.close()
+ if self.settings:
+ z.write(os.path.join(self.app_dir, 'config.pck'), 'config.pck')
+ for f in os.listdir(self.app_dir):
+ if f.startswith('idp-') and os.path.splitext(f)[-1] in ('.pem', '.xml'):
+ z.write(os.path.join(self.app_dir, f), f)
+ if os.path.exists(os.path.join(self.app_dir, 'config')):
+ for f in os.listdir(os.path.join(self.app_dir, 'config')):
+ z.write(os.path.join(self.app_dir, 'config', f), os.path.join('config', f))
job.file_content = c.getvalue()
job.store()
diff --git a/wcs/admin/workflows.py b/wcs/admin/workflows.py
index 197fc1fad..f6b11d3b6 100644
--- a/wcs/admin/workflows.py
+++ b/wcs/admin/workflows.py
@@ -275,10 +275,10 @@ def graphviz(workflow, url_prefix='', select=None, svg=True, include=False):
out = out.getvalue()
if svg:
try:
- process = Popen(['dot', '-Tsvg'], stdin=PIPE, stdout=PIPE)
- out = process.communicate(force_bytes(out))[0]
- if process.returncode != 0:
- return ''
+ with Popen(['dot', '-Tsvg'], stdin=PIPE, stdout=PIPE) as process:
+ out = process.communicate(force_bytes(out))[0]
+ if process.returncode != 0:
+ return ''
except OSError:
return ''
out = graphviz_post_treatment(out, revert_colours, include=include)
diff --git a/wcs/backoffice/management.py b/wcs/backoffice/management.py
index 75635397c..ca1085043 100644
--- a/wcs/backoffice/management.py
+++ b/wcs/backoffice/management.py
@@ -2997,24 +2997,22 @@ class FormBackOfficeStatusPage(FormStatusPage):
def download_as_zip(self):
formdata = self.filled
zip_content = io.BytesIO()
- zip_file = zipfile.ZipFile(zip_content, 'w')
counter = {'value': 0}
- def add_zip_file(upload):
+ def add_zip_file(upload, zip_file):
counter['value'] += 1
filename = '%s_%s' % (counter['value'], upload.base_filename)
zip_file.writestr(filename, upload.get_content())
- for value in formdata.data.values():
- if isinstance(value, PicklableUpload):
- add_zip_file(value)
- if isinstance(value, dict) and isinstance(value.get('data'), list):
- for subvalue in value.get('data'):
- for subvalue_elem in subvalue.values():
- if isinstance(subvalue_elem, PicklableUpload):
- add_zip_file(subvalue_elem)
-
- zip_file.close()
+ with zipfile.ZipFile(zip_content, 'w') as zip_file:
+ for value in formdata.data.values():
+ if isinstance(value, PicklableUpload):
+ add_zip_file(value, zip_file)
+ if isinstance(value, dict) and isinstance(value.get('data'), list):
+ for subvalue in value.get('data'):
+ for subvalue_elem in subvalue.values():
+ if isinstance(subvalue_elem, PicklableUpload):
+ add_zip_file(subvalue_elem, zip_file)
response = get_response()
response.set_content_type('application/zip')
diff --git a/wcs/ctl/backup.py b/wcs/ctl/backup.py
index c8d29bfb8..a19beabb7 100644
--- a/wcs/ctl/backup.py
+++ b/wcs/ctl/backup.py
@@ -48,17 +48,15 @@ class CmdBackup(Command):
os.mkdir(backup_dir)
backup_filepath = os.path.join(backup_dir, 'backup-%s%s%s-%s%s%s.tar.gz' % time.localtime()[:6])
- backup = tarfile.open(backup_filepath, mode='w:gz')
- for basename, dirnames, filenames in os.walk(pub.app_dir):
- if 'backups' in dirnames: # do not recurse in backup directory
- idx = dirnames.index('backups')
- dirnames[idx : idx + 1] = []
- for filename in filenames:
- backup.add(
- os.path.join(basename, filename), os.path.join(basename, filename)[len(pub.app_dir) :]
- )
-
- backup.close()
+ with tarfile.open(backup_filepath, mode='w:gz') as backup:
+ for basename, dirnames, filenames in os.walk(pub.app_dir):
+ if 'backups' in dirnames: # do not recurse in backup directory
+ idx = dirnames.index('backups')
+ dirnames[idx : idx + 1] = []
+ for filename in filenames:
+ backup.add(
+ os.path.join(basename, filename), os.path.join(basename, filename)[len(pub.app_dir) :]
+ )
CmdBackup.register()
diff --git a/wcs/ctl/management/commands/convert_to_sql.py b/wcs/ctl/management/commands/convert_to_sql.py
index f1b639381..a37543d0e 100644
--- a/wcs/ctl/management/commands/convert_to_sql.py
+++ b/wcs/ctl/management/commands/convert_to_sql.py
@@ -100,13 +100,12 @@ class Command(BaseCommand):
sql.SqlUser.fix_sequences()
if errors:
- error_log = open('error_user.log', 'w')
- for user, trace in errors:
- error_log.write('user_id %s\n' % user.id)
- error_log.write(trace)
- error_log.write('-' * 80)
- error_log.write('\n\n')
- error_log.close()
+ with open('error_user.log', 'w') as error_log:
+ for user, trace in errors:
+ error_log.write('user_id %s\n' % user.id)
+ error_log.write(trace)
+ error_log.write('-' * 80)
+ error_log.write('\n\n')
print('There were some errors, see error_user.log for details.')
def store_forms(self):
@@ -138,15 +137,14 @@ class Command(BaseCommand):
sql_data_class.fix_sequences()
if errors:
- error_log = open('error_formdata.log', 'w')
- for formdata, trace in errors:
- error_log.write(
- '%s %s - %s\n' % (formdata.formdef, formdata.id, localstrftime(formdata.receipt_time))
- )
- error_log.write(trace)
- error_log.write('-' * 80)
- error_log.write('\n\n')
- error_log.close()
+ with open('error_formdata.log', 'w') as error_log:
+ for formdata, trace in errors:
+ error_log.write(
+ '%s %s - %s\n' % (formdata.formdef, formdata.id, localstrftime(formdata.receipt_time))
+ )
+ error_log.write(trace)
+ error_log.write('-' * 80)
+ error_log.write('\n\n')
print('There were some errors, see error_formdata.log.')
def update_progress(self, progress, num_columns=120):
diff --git a/wcs/ctl/restore.py b/wcs/ctl/restore.py
index 94d923819..bc3bcc333 100644
--- a/wcs/ctl/restore.py
+++ b/wcs/ctl/restore.py
@@ -50,12 +50,11 @@ class CmdRestore(Command):
return 1
backup_filepath = sub_options.filename
- backup = tarfile.open(backup_filepath, mode='r:*')
- for tarinfo in backup:
- if os.path.normpath(tarinfo.name).startswith('..'):
- continue
- backup.extract(tarinfo, pub.app_dir)
- backup.close()
+ with tarfile.open(backup_filepath, mode='r:*') as backup:
+ for tarinfo in backup:
+ if os.path.normpath(tarinfo.name).startswith('..'):
+ continue
+ backup.extract(tarinfo, pub.app_dir)
CmdRestore.register()
diff --git a/wcs/publisher.py b/wcs/publisher.py
index 7072535d7..71a0f44a8 100644
--- a/wcs/publisher.py
+++ b/wcs/publisher.py
@@ -162,7 +162,6 @@ class WcsPublisher(QommonPublisher):
self.set_session_manager(self.session_manager_class(session_class=self.session_class))
def import_zip(self, fd):
- z = zipfile.ZipFile(fd)
results = {
'formdefs': 0,
'carddefs': 0,
@@ -203,129 +202,129 @@ class WcsPublisher(QommonPublisher):
rv[key] = value
return rv
- for f in z.namelist():
- if f in ('.indexes', '.max_id'):
- continue
- if os.path.dirname(f) in (
- 'formdefs_xml',
- 'carddefs_xml',
- 'workflows_xml',
- 'blockdefs_xml',
- 'roles_xml',
- ):
- continue
- path = os.path.join(self.app_dir, f)
- if not os.path.exists(os.path.dirname(path)):
- os.mkdir(os.path.dirname(path))
- if not os.path.basename(f):
- # skip directories
- continue
- data = z.read(f)
- if f in ('config.pck', 'config.json'):
- results['settings'] = 1
- if f == 'config.pck':
- d = pickle.loads(data)
- else:
- d = json.loads(force_text(data), object_hook=_decode_dict)
- if 'sp' in self.cfg:
- current_sp = self.cfg['sp']
- else:
- current_sp = None
- self.cfg = d
- if current_sp:
- self.cfg['sp'] = current_sp
- elif 'sp' in self.cfg:
- del self.cfg['sp']
- self.write_cfg()
- continue
- open(path, 'wb').write(data)
- if os.path.split(f)[0] in results:
- results[os.path.split(f)[0]] += 1
+ with zipfile.ZipFile(fd) as z:
+ for f in z.namelist():
+ if f in ('.indexes', '.max_id'):
+ continue
+ if os.path.dirname(f) in (
+ 'formdefs_xml',
+ 'carddefs_xml',
+ 'workflows_xml',
+ 'blockdefs_xml',
+ 'roles_xml',
+ ):
+ continue
+ path = os.path.join(self.app_dir, f)
+ if not os.path.exists(os.path.dirname(path)):
+ os.mkdir(os.path.dirname(path))
+ if not os.path.basename(f):
+ # skip directories
+ continue
+ data = z.read(f)
+ if f in ('config.pck', 'config.json'):
+ results['settings'] = 1
+ if f == 'config.pck':
+ d = pickle.loads(data)
+ else:
+ d = json.loads(force_text(data), object_hook=_decode_dict)
+ if 'sp' in self.cfg:
+ current_sp = self.cfg['sp']
+ else:
+ current_sp = None
+ self.cfg = d
+ if current_sp:
+ self.cfg['sp'] = current_sp
+ elif 'sp' in self.cfg:
+ del self.cfg['sp']
+ self.write_cfg()
+ continue
+ open(path, 'wb').write(data)
+ if os.path.split(f)[0] in results:
+ results[os.path.split(f)[0]] += 1
- # second pass, fields blocks
- from wcs.blocks import BlockDef
+ # second pass, fields blocks
+ from wcs.blocks import BlockDef
- for f in z.namelist():
- if os.path.dirname(f) == 'blockdefs_xml' and os.path.basename(f):
- blockdef = BlockDef.import_from_xml(z.open(f), include_id=True)
- blockdef.store()
- results['blockdefs'] += 1
+ for f in z.namelist():
+ if os.path.dirname(f) == 'blockdefs_xml' and os.path.basename(f):
+ blockdef = BlockDef.import_from_xml(z.open(f), include_id=True)
+ blockdef.store()
+ results['blockdefs'] += 1
- # third pass, workflows
- from wcs.workflows import Workflow
+ # third pass, workflows
+ from wcs.workflows import Workflow
- for f in z.namelist():
- if os.path.dirname(f) == 'workflows_xml' and os.path.basename(f):
- workflow = Workflow.import_from_xml(z.open(f), include_id=True, check_datasources=False)
- workflow.store()
- results['workflows'] += 1
+ for f in z.namelist():
+ if os.path.dirname(f) == 'workflows_xml' and os.path.basename(f):
+ workflow = Workflow.import_from_xml(z.open(f), include_id=True, check_datasources=False)
+ workflow.store()
+ results['workflows'] += 1
- # fourth pass, forms and cards
- from wcs.carddef import CardDef
- from wcs.formdef import FormDef
+ # fourth pass, forms and cards
+ from wcs.carddef import CardDef
+ from wcs.formdef import FormDef
- formdefs = []
- carddefs = []
- for f in z.namelist():
- if os.path.dirname(f) == 'formdefs_xml' and os.path.basename(f):
- formdef = FormDef.import_from_xml(z.open(f), include_id=True, check_datasources=False)
- formdef.store()
- formdefs.append(formdef)
- results['formdefs'] += 1
- if os.path.dirname(f) == 'carddefs_xml' and os.path.basename(f):
- carddef = CardDef.import_from_xml(z.open(f), include_id=True, check_datasources=False)
- carddef.store()
- carddefs.append(carddef)
- results['carddefs'] += 1
-
- # sixth pass, roles
- roles = []
- for f in z.namelist():
- if os.path.dirname(f) == 'roles_xml' and os.path.basename(f):
- role = self.role_class.import_from_xml(z.open(f), include_id=True)
- role.store()
- roles.append(role)
- results['roles'] += 1
-
- # rebuild indexes for imported objects
- for k, v in results.items():
- if k == 'settings':
- continue
- if v == 0:
- continue
- klass = None
- if k == 'formdefs':
- from .formdef import FormDef
-
- klass = FormDef
- elif k == 'carddefs':
- from .carddef import CardDef
-
- klass = CardDef
- elif k == 'blockdefs':
- klass = BlockDef
- elif k == 'categories':
- from .categories import Category
-
- klass = Category
- elif k == 'roles':
- klass = self.role_class
- elif k == 'workflows':
- klass = Workflow
- if klass:
- klass.rebuild_indexes()
-
- if k == 'formdefs':
- # in case of formdefs, we store them anew in case SQL changes
- # are required.
- for formdef in formdefs or FormDef.select():
+ formdefs = []
+ carddefs = []
+ for f in z.namelist():
+ if os.path.dirname(f) == 'formdefs_xml' and os.path.basename(f):
+ formdef = FormDef.import_from_xml(z.open(f), include_id=True, check_datasources=False)
formdef.store()
- elif k == 'carddefs':
- # ditto for cards
- for carddef in carddefs or CardDef.select():
+ formdefs.append(formdef)
+ results['formdefs'] += 1
+ if os.path.dirname(f) == 'carddefs_xml' and os.path.basename(f):
+ carddef = CardDef.import_from_xml(z.open(f), include_id=True, check_datasources=False)
carddef.store()
+ carddefs.append(carddef)
+ results['carddefs'] += 1
+
+ # sixth pass, roles
+ roles = []
+ for f in z.namelist():
+ if os.path.dirname(f) == 'roles_xml' and os.path.basename(f):
+ role = self.role_class.import_from_xml(z.open(f), include_id=True)
+ role.store()
+ roles.append(role)
+ results['roles'] += 1
+
+ # rebuild indexes for imported objects
+ for k, v in results.items():
+ if k == 'settings':
+ continue
+ if v == 0:
+ continue
+ klass = None
+ if k == 'formdefs':
+ from .formdef import FormDef
+
+ klass = FormDef
+ elif k == 'carddefs':
+ from .carddef import CardDef
+
+ klass = CardDef
+ elif k == 'blockdefs':
+ klass = BlockDef
+ elif k == 'categories':
+ from .categories import Category
+
+ klass = Category
+ elif k == 'roles':
+ klass = self.role_class
+ elif k == 'workflows':
+ klass = Workflow
+ if klass:
+ klass.rebuild_indexes()
+
+ if k == 'formdefs':
+ # in case of formdefs, we store them anew in case SQL changes
+ # are required.
+ for formdef in formdefs or FormDef.select():
+ formdef.store()
+ elif k == 'carddefs':
+ # ditto for cards
+ for carddef in carddefs or CardDef.select():
+ carddef.store()
- z.close()
return results
def initialize_sql(self):
diff --git a/wcs/qommon/admin/menu.py b/wcs/qommon/admin/menu.py
index 7898b0605..90b16a7d3 100644
--- a/wcs/qommon/admin/menu.py
+++ b/wcs/qommon/admin/menu.py
@@ -45,12 +45,12 @@ def _find_vc_version():
if os.path.exists('/etc/debian_version'):
# debian case
try:
- process = subprocess.Popen(
+ with subprocess.Popen(
['dpkg', '-l', package], stdout=subprocess.PIPE, stderr=subprocess.STDOUT
- )
- version = process.communicate()[0].splitlines()[-1].split()[2]
- if process.returncode == 0:
- return "%s %s (Debian)" % (package, version.decode())
+ ) as process:
+ version = process.communicate()[0].splitlines()[-1].split()[2]
+ if process.returncode == 0:
+ return "%s %s (Debian)" % (package, version.decode())
except Exception:
pass
return None
@@ -66,13 +66,13 @@ def _find_vc_version():
if os.path.exists(os.path.join(srcdir, '.git')):
try:
- process = subprocess.Popen(
+ with subprocess.Popen(
['git', 'log', '--pretty=oneline', '-1'], stdout=subprocess.PIPE, cwd=srcdir
- )
- output = process.communicate()[0]
+ ) as process:
+ output = process.communicate()[0]
rev = str(output.split()[0].decode('ascii'))
- process = subprocess.Popen(['git', 'branch'], stdout=subprocess.PIPE, cwd=srcdir)
- output = process.communicate()[0]
+ with subprocess.Popen(['git', 'branch'], stdout=subprocess.PIPE, cwd=srcdir) as process:
+ output = process.communicate()[0]
starred_line = [x for x in output.splitlines() if x.startswith(b'*')][0]
branch = str(starred_line.split()[1].decode('ascii'))
url = "https://repos.entrouvert.org/%s.git/commit/?id=%s" % (package, rev)
diff --git a/wcs/qommon/form.py b/wcs/qommon/form.py
index e231e304f..5ec673c0c 100644
--- a/wcs/qommon/form.py
+++ b/wcs/qommon/form.py
@@ -660,7 +660,8 @@ class UploadedFile:
file_path = self.build_file_path()
if not os.path.exists(self.dir_path()):
os.mkdir(self.dir_path())
- open(file_path, 'wb').write(content)
+ with open(file_path, 'wb') as f:
+ f.write(content)
def dir_path(self):
return os.path.join(get_publisher().app_dir, self.directory)
@@ -669,7 +670,7 @@ class UploadedFile:
return os.path.join(get_publisher().app_dir, self.directory, self.filename)
def get_file(self):
- return open(self.build_file_path(), 'rb')
+ return open(self.build_file_path(), 'rb') # pylint: disable=consider-using-with
def get_content(self):
return self.get_file().read()
diff --git a/wcs/qommon/misc.py b/wcs/qommon/misc.py
index 28212940a..127b240cb 100644
--- a/wcs/qommon/misc.py
+++ b/wcs/qommon/misc.py
@@ -679,8 +679,7 @@ def get_thumbnail(filepath, content_type=None):
except subprocess.CalledProcessError:
raise ThumbnailError()
else:
- fp = open(filepath, 'rb')
-
+ fp = open(filepath, 'rb') # pylint: disable=consider-using-with
try:
image = Image.open(fp)
try:
diff --git a/wcs/qommon/ods.py b/wcs/qommon/ods.py
index 4c18efe94..614703a63 100644
--- a/wcs/qommon/ods.py
+++ b/wcs/qommon/ods.py
@@ -157,22 +157,21 @@ class Workbook:
return ET.tostring(self.get_content_node(), 'utf-8')
def save(self, output):
- z = zipfile.ZipFile(output, 'w')
- z.writestr('content.xml', self.get_content())
- z.writestr('styles.xml', self.get_styles())
- z.writestr('mimetype', 'application/vnd.oasis.opendocument.spreadsheet')
- z.writestr(
- 'META-INF/manifest.xml',
- '''
-
-
-
-
-
-
-''',
- )
- z.close()
+ with zipfile.ZipFile(output, 'w') as z:
+ z.writestr('content.xml', self.get_content())
+ z.writestr('styles.xml', self.get_styles())
+ z.writestr('mimetype', 'application/vnd.oasis.opendocument.spreadsheet')
+ z.writestr(
+ 'META-INF/manifest.xml',
+ '''
+
+
+
+
+
+
+ ''',
+ )
class WorkSheet:
diff --git a/wcs/qommon/storage.py b/wcs/qommon/storage.py
index 2a732c39d..6892d0216 100644
--- a/wcs/qommon/storage.py
+++ b/wcs/qommon/storage.py
@@ -501,7 +501,7 @@ class StorableObject:
def get_filename(cls, filename, ignore_errors=False, ignore_migration=False, **kwargs):
fd = None
try:
- fd = open(force_bytes(filename, 'utf-8'), 'rb')
+ fd = open(force_bytes(filename, 'utf-8'), 'rb') # pylint: disable=consider-using-with
o = cls.storage_load(fd, **kwargs)
except IOError:
if ignore_errors:
diff --git a/wcs/qommon/upload_storage.py b/wcs/qommon/upload_storage.py
index 0eeba3924..fdb1955c7 100644
--- a/wcs/qommon/upload_storage.py
+++ b/wcs/qommon/upload_storage.py
@@ -42,7 +42,7 @@ class PicklableUpload(Upload):
return self.__dict__.get('fp')
elif getattr(self, 'qfilename', None):
basedir = os.path.join(get_publisher().app_dir, 'uploads')
- self.fp = open(os.path.join(basedir, self.qfilename), 'rb')
+ self.fp = open(os.path.join(basedir, self.qfilename), 'rb') # pylint: disable=consider-using-with
return self.fp
return None
@@ -125,11 +125,10 @@ class UploadStorage:
upload.__class__ = PicklableUpload
dirname = os.path.join(get_publisher().app_dir, 'tempfiles')
filename = os.path.join(dirname, upload.token)
- fd = open(filename, 'wb')
- upload.get_file_pointer().seek(0)
- fd.write(upload.get_file_pointer().read())
- upload.size = fd.tell()
- fd.close()
+ with open(filename, 'wb') as fd:
+ upload.get_file_pointer().seek(0)
+ fd.write(upload.get_file_pointer().read())
+ upload.size = fd.tell()
def get_tempfile(self, temp_data):
value = PicklableUpload(temp_data['orig_filename'], temp_data['content_type'], temp_data['charset'])
@@ -138,7 +137,7 @@ class UploadStorage:
filename = os.path.join(dirname, temp_data['unsigned_token'])
value.token = temp_data['token']
value.file_size = os.path.getsize(filename)
- value.fp = open(filename, 'rb')
+ value.fp = open(filename, 'rb') # pylint: disable=consider-using-with
return value
def save(self, upload):
diff --git a/wcs/qommon/vendor/locket.py b/wcs/qommon/vendor/locket.py
index 9d5aac452..60e800c6c 100644
--- a/wcs/qommon/vendor/locket.py
+++ b/wcs/qommon/vendor/locket.py
@@ -65,15 +65,12 @@ _locks = weakref.WeakValueDictionary()
def lock_file(path, **kwargs):
- _locks_lock.acquire()
- try:
+ with _locks_lock:
lock = _locks.get(path)
if lock is None:
lock = _create_lock_file(path, **kwargs)
_locks[path] = lock
return lock
- finally:
- _locks_lock.release()
def _create_lock_file(path, **kwargs):
@@ -139,10 +136,10 @@ class _ThreadLock:
def acquire(self):
if self._timeout is None:
- self._lock.acquire()
+ self._lock.acquire() # pylint: disable=consider-using-with
else:
_acquire_non_blocking(
- acquire=lambda: self._lock.acquire(False),
+ acquire=lambda: self._lock.acquire(False), # pylint: disable=consider-using-with
timeout=self._timeout,
retry_period=self._retry_period,
path=self._path,
@@ -162,7 +159,7 @@ class _LockFile:
def acquire(self):
if self._file is None:
- self._file = open(self._path, "w")
+ self._file = open(self._path, "w") # pylint: disable=consider-using-with
if self._timeout is None:
_lock_file_blocking(self._file)
else:
diff --git a/wcs/qommon/x509utils.py b/wcs/qommon/x509utils.py
index 274a45335..2b42ca5be 100644
--- a/wcs/qommon/x509utils.py
+++ b/wcs/qommon/x509utils.py
@@ -45,9 +45,11 @@ def _call_openssl(args):
Return a tuple made of the return code and the stdout output
"""
try:
- process = subprocess.Popen(args=[_openssl] + args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
- output = process.communicate()[0]
- return process.returncode, output
+ with subprocess.Popen(
+ args=[_openssl] + args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
+ ) as process:
+ output = process.communicate()[0]
+ return process.returncode, output
except OSError:
return 1, None
diff --git a/wcs/wf/export_to_model.py b/wcs/wf/export_to_model.py
index fa3d5ce6e..18f4cb1d1 100644
--- a/wcs/wf/export_to_model.py
+++ b/wcs/wf/export_to_model.py
@@ -135,30 +135,28 @@ def transform_opendocument(instream, outstream, process):
format, parse context.xml with element tree and apply process to its root
node.
"""
- zin = zipfile.ZipFile(instream, mode='r')
- zout = zipfile.ZipFile(outstream, mode='w')
- new_images = {}
- assert 'content.xml' in zin.namelist()
- for filename in zin.namelist():
- # first pass to process meta.xml, content.xml and styles.xml
- if filename not in ('meta.xml', 'content.xml', 'styles.xml'):
- continue
- content = zin.read(filename)
- root = ET.fromstring(content)
- process(root, new_images)
- content = ET.tostring(root)
- zout.writestr(filename, content)
-
- for filename in zin.namelist():
- # second pass to copy/replace other files
- if filename in ('meta.xml', 'content.xml', 'styles.xml'):
- continue
- if filename in new_images:
- content = new_images[filename].get_content()
- else:
+ with zipfile.ZipFile(instream, mode='r') as zin, zipfile.ZipFile(outstream, mode='w') as zout:
+ new_images = {}
+ assert 'content.xml' in zin.namelist()
+ for filename in zin.namelist():
+ # first pass to process meta.xml, content.xml and styles.xml
+ if filename not in ('meta.xml', 'content.xml', 'styles.xml'):
+ continue
content = zin.read(filename)
- zout.writestr(filename, content)
- zout.close()
+ root = ET.fromstring(content)
+ process(root, new_images)
+ content = ET.tostring(root)
+ zout.writestr(filename, content)
+
+ for filename in zin.namelist():
+ # second pass to copy/replace other files
+ if filename in ('meta.xml', 'content.xml', 'styles.xml'):
+ continue
+ if filename in new_images:
+ content = new_images[filename].get_content()
+ else:
+ content = zin.read(filename)
+ zout.writestr(filename, content)
def is_opendocument(stream):
diff --git a/wcs/workflows.py b/wcs/workflows.py
index edd102be3..4b6416c0d 100644
--- a/wcs/workflows.py
+++ b/wcs/workflows.py
@@ -229,7 +229,7 @@ class AttachmentEvolutionPart:
def get_file_pointer(self):
if self.filename.startswith('uuid-'):
return None
- return open(self.filename, 'rb')
+ return open(self.filename, 'rb') # pylint: disable=consider-using-with
def __getstate__(self):
odict = self.__dict__.copy()