misc: fix pylint consider-using-with (#53406)

Lauréline Guérin 2021-04-29 18:11:24 +02:00
parent 0af0024e32
commit b6e849811a
41 changed files with 686 additions and 749 deletions
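
All 41 files follow the same mechanical change: pylint's `consider-using-with` check flags resources that are acquired by hand (`open()`, `zipfile.ZipFile()`, `tarfile.open()`, `subprocess.Popen()`, ...) and then released with an explicit `close()`, or never released at all; the fix is to move the acquisition into a `with` statement so the resource is freed even when the block raises. The first hunk removes the check from the disable list in the pylint configuration so it stays enforced. A minimal sketch of the before/after shape (the file name is illustrative):

```python
import json

# Before: flagged by consider-using-with; the descriptor leaks if
# json.dump() raises before fd.close() runs.
fd = open('data.json', 'w')
json.dump({'id': '1', 'text': 'foo'}, fd)
fd.close()

# After: the context manager closes the file on success and on error.
with open('data.json', 'w') as fd:
    json.dump({'id': '1', 'text': 'foo'}, fd)
```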


@@ -12,7 +12,6 @@ disable=
broad-except,
consider-using-dict-comprehension,
consider-using-set-comprehension,
consider-using-with,
cyclic-import,
duplicate-code,
fixme,


@@ -115,15 +115,13 @@ def test_admin_for_all(pub):
resp = get_app(pub).get('/backoffice/settings/', status=200)
# check it doesn't work with a non-empty ADMIN_FOR_ALL file
fd = open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w')
fd.write('x.x.x.x')
fd.close()
with open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w') as fd:
fd.write('x.x.x.x')
resp = get_app(pub).get('/backoffice/settings/', status=302)
# check it works if the file contains the user IP address
fd = open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w')
fd.write('127.0.0.1')
fd.close()
with open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w') as fd:
fd.write('127.0.0.1')
resp = get_app(pub).get('/backoffice/settings/', status=200)
# check it's also ok if the user is logged in but doesn't have the


@@ -251,9 +251,8 @@ def test_data_sources_view(pub):
# check json
json_file_path = os.path.join(pub.app_dir, 'test.json')
json_file = open(json_file_path, 'w')
json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
json_file.close()
with open(json_file_path, 'w') as json_file:
json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
data_source.data_source = {'type': 'json', 'value': 'file://%s' % json_file_path}
data_source.store()
@@ -263,9 +262,8 @@ def test_data_sources_view(pub):
assert 'foo' in resp.text
# with other attributes
json_file = open(json_file_path, 'w')
json.dump({'results': [{'pk': '1', 'label': 'foo'}, {'pk': '2'}]}, json_file)
json_file.close()
with open(json_file_path, 'w') as json_file:
json.dump({'results': [{'pk': '1', 'label': 'foo'}, {'pk': '2'}]}, json_file)
data_source.data_attribute = 'results'
data_source.id_attribute = 'pk'
@@ -288,17 +286,16 @@ def test_data_sources_view(pub):
# check geojson
geojson_file_path = os.path.join(pub.app_dir, 'test.geojson')
geojson_file = open(geojson_file_path, 'w')
json.dump(
{
'features': [
{'properties': {'id': '1', 'text': 'foo', 'label': 'foo'}},
{'properties': {'id': '2', 'text': 'bar', 'label': 'bar'}},
]
},
geojson_file,
)
geojson_file.close()
with open(geojson_file_path, 'w') as geojson_file:
json.dump(
{
'features': [
{'properties': {'id': '1', 'text': 'foo', 'label': 'foo'}},
{'properties': {'id': '2', 'text': 'bar', 'label': 'bar'}},
]
},
geojson_file,
)
data_source.data_source = {'type': 'geojson', 'value': 'file://%s' % geojson_file_path}
data_source.store()
with HttpRequestsMocking():


@@ -2439,18 +2439,18 @@ def test_form_archive(pub):
resp = resp.click(href='archive')
resp = resp.form.submit('submit')
assert resp.content_type == 'application/x-wcs-archive'
tf = tarfile.open(fileobj=io.BytesIO(resp.body))
assert 'formdef' in [x.name for x in tf.getmembers()]
assert len(tf.getmembers()) == 8 # 7 formdata + 1 formdef
with tarfile.open(fileobj=io.BytesIO(resp.body)) as tf:
assert 'formdef' in [x.name for x in tf.getmembers()]
assert len(tf.getmembers()) == 8 # 7 formdata + 1 formdef
# second archive, it shouldn't get anything (but the formdef)
resp = app.get('/backoffice/forms/1/')
resp = resp.click(href='archive')
resp = resp.form.submit('submit')
assert resp.content_type == 'application/x-wcs-archive'
tf = tarfile.open(fileobj=io.BytesIO(resp.body))
assert 'formdef' in [x.name for x in tf.getmembers()]
assert len(tf.getmembers()) == 1 # 0 formdata + 1 formdef
with tarfile.open(fileobj=io.BytesIO(resp.body)) as tf:
assert 'formdef' in [x.name for x in tf.getmembers()]
assert len(tf.getmembers()) == 1 # 0 formdata + 1 formdef
def test_form_overwrite(pub):
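
`tarfile.open()` returns a `TarFile` that supports the context-manager protocol, so the archive handle in the test above can be scoped to the `with` block. A self-contained sketch of the same read pattern (the archive content here is made up):

```python
import io
import tarfile

# Build a one-member tar archive in memory ...
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode='w') as tar:
    payload = b'<formdef/>'
    member = tarfile.TarInfo(name='formdef')
    member.size = len(payload)
    tar.addfile(member, io.BytesIO(payload))
buf.seek(0)

# ... and read it back the way the test does, scoped to a with block.
with tarfile.open(fileobj=buf) as tf:
    assert 'formdef' in [x.name for x in tf.getmembers()]
    assert len(tf.getmembers()) == 1
```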


@@ -115,8 +115,8 @@ def test_settings_export_import(pub):
assert 'completed' in resp.text
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
zipf = zipfile.ZipFile(zip_content, 'a')
filelist = zipf.namelist()
with zipfile.ZipFile(zip_content, 'a') as zipf:
filelist = zipf.namelist()
assert len(filelist) == 0
# check afterjob ajax call
@@ -166,8 +166,8 @@ def test_settings_export_import(pub):
resp = resp.follow()
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
zipf = zipfile.ZipFile(zip_content, 'a')
filelist = zipf.namelist()
with zipfile.ZipFile(zip_content, 'a') as zipf:
filelist = zipf.namelist()
assert 'formdefs/1' not in filelist
assert 'formdefs_xml/1' in filelist
assert 'carddefs/1' not in filelist
@@ -249,8 +249,8 @@ def test_settings_export_import(pub):
resp = resp.follow()
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
zipf = zipfile.ZipFile(zip_content, 'a')
filelist = zipf.namelist()
with zipfile.ZipFile(zip_content, 'a') as zipf:
filelist = zipf.namelist()
assert 'formdefs_xml/%s' % formdef.id in filelist
assert 'workflows_xml/%s' % workflow.id in filelist
assert 'roles_xml/%s' % role.id not in filelist
@@ -283,8 +283,8 @@ def test_settings_export_import(pub):
resp = resp.follow()
resp = resp.click('Download Export')
zip_content = io.BytesIO(resp.body)
zipf = zipfile.ZipFile(zip_content, 'a')
filelist = zipf.namelist()
with zipfile.ZipFile(zip_content, 'a') as zipf:
filelist = zipf.namelist()
assert len([x for x in filelist if 'roles_xml/' in x]) == 0
# check an error is displayed if such an import is then used and roles are
@@ -305,11 +305,13 @@ def test_settings_themes(pub):
# create mock theme
os.mkdir(os.path.join(pub.app_dir, 'themes'))
os.mkdir(os.path.join(pub.app_dir, 'themes', 'test'))
fd = open(os.path.join(pub.app_dir, 'themes', 'test', 'desc.xml'), 'w')
fd.write(
'<?xml version="1.0"?>' '<theme name="test" version="1.0">' ' <label>Test Theme</label>' '</theme>'
)
fd.close()
with open(os.path.join(pub.app_dir, 'themes', 'test', 'desc.xml'), 'w') as fd:
fd.write(
'<?xml version="1.0"?>'
'<theme name="test" version="1.0">'
' <label>Test Theme</label>'
'</theme>'
)
resp = app.get('/backoffice/settings/themes')
assert 'biglist themes' in resp.text
@@ -831,17 +833,17 @@ def test_settings_theme_download_upload(pub):
resp = app.get('/backoffice/settings/themes')
resp = resp.click('download', index=0)
assert resp.headers['content-type'] == 'application/zip'
zip_content = io.BytesIO(resp.body)
zipf = zipfile.ZipFile(zip_content, 'a')
filelist = zipf.namelist()
assert 'alto/icon.png' in filelist
assert 'alto/desc.xml' in filelist
assert 'alto/template.ezt' in filelist
assert 'alto/wcs.css' in filelist
# modify it
zipf.writestr('alto/foobar.txt', 'XXX')
zipf.close()
zip_content = io.BytesIO(resp.body)
with zipfile.ZipFile(zip_content, 'a') as zipf:
filelist = zipf.namelist()
assert 'alto/icon.png' in filelist
assert 'alto/desc.xml' in filelist
assert 'alto/template.ezt' in filelist
assert 'alto/wcs.css' in filelist
# modify it
zipf.writestr('alto/foobar.txt', 'XXX')
# upload it
resp = app.get('/backoffice/settings/themes')
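
For `zipfile.ZipFile` the `with` block is more than a style point: leaving it (like an explicit `close()`) is what writes the archive's central directory, so an entry appended with `writestr()` above only becomes part of a readable zip once the block exits. A hedged sketch:

```python
import io
import zipfile

zip_content = io.BytesIO()
with zipfile.ZipFile(zip_content, 'a') as zipf:
    zipf.writestr('alto/foobar.txt', 'XXX')
# The central directory is written on exit; only now does the buffer
# hold a complete archive.
with zipfile.ZipFile(zip_content) as zipf:
    assert zipf.namelist() == ['alto/foobar.txt']
```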


@@ -244,8 +244,8 @@ def test_api_ods_formdata_custom_view(pub, local_user):
# check it now gets the data
resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
zipf = zipfile.ZipFile(io.BytesIO(resp.body))
ods_sheet = ET.parse(zipf.open('content.xml'))
with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
ods_sheet = ET.parse(zipf.open('content.xml'))
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 11
pub.custom_view_class.wipe()
@@ -258,8 +258,8 @@ def test_api_ods_formdata_custom_view(pub, local_user):
custom_view.store()
resp = get_app(pub).get(sign_uri('/api/forms/test/ods/custom-view', user=local_user))
zipf = zipfile.ZipFile(io.BytesIO(resp.body))
ods_sheet = ET.parse(zipf.open('content.xml'))
with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
ods_sheet = ET.parse(zipf.open('content.xml'))
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 21


@@ -45,12 +45,13 @@ def pub(request, emails):
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
'''\
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''\
[api-secrets]
coucou = 1234
'''
)
)
return pub
@@ -965,8 +966,8 @@ def test_api_ods_formdata(pub, local_user):
resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
assert resp.content_type == 'application/vnd.oasis.opendocument.spreadsheet'
zipf = zipfile.ZipFile(io.BytesIO(resp.body))
ods_sheet = ET.parse(zipf.open('content.xml'))
with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
ods_sheet = ET.parse(zipf.open('content.xml'))
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 311


@@ -66,14 +66,13 @@ def pub(request, emails):
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
fd.write(
'''
[api-secrets]
coucou = 1234
'''
)
fd.close()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''
[api-secrets]
coucou = 1234
'''
)
return pub
@@ -528,9 +527,8 @@ def test_backoffice_listing_order(pub):
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'default-sort-order', '-last_update_time')
fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
pub.site_options.write(fd)
fd.close()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = app.get('/backoffice/management/form-title/')
assert resp.text.count('data-link') == 17
@@ -605,9 +603,8 @@ def test_backoffice_channel_column(pub):
if not pub.site_options.has_section('variables'):
pub.site_options.add_section('variables')
pub.site_options.set('variables', 'welco_url', 'xxx')
fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
pub.site_options.write(fd)
fd.close()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
create_superuser(pub)
create_environment(pub)
@@ -1885,9 +1882,8 @@ def test_backoffice_map(pub):
pub.site_options.add_section('options')
pub.site_options.set('options', 'map-tile-urltemplate', 'https://{s}.tile.example.net/{z}/{x}/{y}.png')
fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
pub.site_options.write(fd)
fd.close()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = app.get('/backoffice/management/form-title/')
resp = resp.click('Plot on a Map')
@@ -2085,17 +2081,17 @@ def test_backoffice_download_as_zip(pub):
resp = app.get('/backoffice/management/form-title/%s/' % number31.id)
resp = resp.click('Download all files as .zip')
zip_content = io.BytesIO(resp.body)
zipf = zipfile.ZipFile(zip_content, 'a')
filelist = zipf.namelist()
assert set(filelist) == {'1_bar', '2_bar'}
for zipinfo in zipf.infolist():
content = zipf.read(zipinfo)
if zipinfo.filename == '1_bar':
assert content == b'hello world'
elif zipinfo.filename == '2_bar':
assert content == b'hello world2'
else:
assert False # unknown zip part
with zipfile.ZipFile(zip_content, 'a') as zipf:
filelist = zipf.namelist()
assert set(filelist) == {'1_bar', '2_bar'}
for zipinfo in zipf.infolist():
content = zipf.read(zipinfo)
if zipinfo.filename == '1_bar':
assert content == b'hello world'
elif zipinfo.filename == '2_bar':
assert content == b'hello world2'
else:
assert False # unknown zip part
def test_backoffice_sidebar_user_template(pub):
@@ -3011,9 +3007,8 @@ def test_global_listing(pub):
if not pub.site_options.has_section('variables'):
pub.site_options.add_section('variables')
pub.site_options.set('variables', 'welco_url', 'xxx')
fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
pub.site_options.write(fd)
fd.close()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = app.get('/backoffice/management/listing?limit=500')
formdata = formdef.data_class().select(lambda x: x.status == 'wf-new')[0]
@@ -3611,9 +3606,8 @@ def test_per_user_view(pub):
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'per-user-view', 'true')
fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
pub.site_options.write(fd)
fd.close()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = app.get('/backoffice/management/').follow()
assert 'Per User View' in resp.text


@@ -35,14 +35,13 @@ def pub(request, emails):
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
fd.write(
'''
[api-secrets]
coucou = 1234
'''
)
fd.close()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''
[api-secrets]
coucou = 1234
'''
)
return pub


@@ -31,14 +31,13 @@ def pub(request, emails):
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
fd.write(
'''
[api-secrets]
coucou = 1234
'''
)
fd.close()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''
[api-secrets]
coucou = 1234
'''
)
return pub


@@ -37,14 +37,13 @@ def pub(request, emails):
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
fd.write(
'''
[api-secrets]
coucou = 1234
'''
)
fd.close()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''
[api-secrets]
coucou = 1234
'''
)
return pub
@@ -236,9 +235,8 @@ def test_backoffice_csv_export_channel(pub):
if not pub.site_options.has_section('variables'):
pub.site_options.add_section('variables')
pub.site_options.set('variables', 'welco_url', 'xxx')
fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
pub.site_options.write(fd)
fd.close()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
create_superuser(pub)
@@ -272,9 +270,8 @@ def test_backoffice_csv_export_channel(pub):
def test_backoffice_csv_export_anonymised(pub):
fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
pub.site_options.write(fd)
fd.close()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
create_superuser(pub)
@@ -483,8 +480,8 @@ def test_backoffice_ods(pub):
assert 'filename=form-title.ods' in resp.headers['content-disposition']
assert resp.body[:2] == b'PK' # ods has a zip container
zipf = zipfile.ZipFile(io.BytesIO(resp.body))
ods_sheet = ET.parse(zipf.open('content.xml'))
with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
ods_sheet = ET.parse(zipf.open('content.xml'))
# check the ods contains a link to the document
elem = ods_sheet.findall('.//{%s}a' % ods.NS['text'])[0]
assert (


@@ -37,14 +37,13 @@ def pub(request, emails):
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
fd.write(
'''
[api-secrets]
coucou = 1234
'''
)
fd.close()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''
[api-secrets]
coucou = 1234
'''
)
return pub


@@ -56,22 +56,21 @@ from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, logi
def assert_equal_zip(stream1, stream2):
z1 = zipfile.ZipFile(stream1)
z2 = zipfile.ZipFile(stream2)
assert set(z1.namelist()) == set(z2.namelist())
for name in z1.namelist():
if name == 'styles.xml':
continue
if name in ['content.xml', 'meta.xml']:
t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name)))
try:
# >= python 3.8: tostring preserves attribute order; use canonicalize to sort them
t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2)
except AttributeError:
pass
else:
t1, t2 = z1.read(name), z2.read(name)
assert t1 == t2, 'file "%s" differs' % name
with zipfile.ZipFile(stream1) as z1, zipfile.ZipFile(stream2) as z2:
assert set(z1.namelist()) == set(z2.namelist())
for name in z1.namelist():
if name == 'styles.xml':
continue
if name in ['content.xml', 'meta.xml']:
t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name)))
try:
# >= python 3.8: tostring preserves attribute order; use canonicalize to sort them
t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2)
except AttributeError:
pass
else:
t1, t2 = z1.read(name), z2.read(name)
assert t1 == t2, 'file "%s" differs' % name
def pytest_generate_tests(metafunc):
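
The refactored `assert_equal_zip` manages both archives with a single `with` statement; the two context managers share one lifetime and are closed together, even when an assertion fails mid-comparison. Reduced sketch:

```python
import io
import zipfile


def make_zip(names):
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, 'w') as z:
        for name in names:
            z.writestr(name, 'content of %s' % name)
    return buf


# Both archives stay open for the duration of the comparison and are
# closed together when the block exits.
with zipfile.ZipFile(make_zip(['a', 'b'])) as z1, zipfile.ZipFile(make_zip(['b', 'a'])) as z2:
    assert set(z1.namelist()) == set(z2.namelist())
```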


@@ -60,22 +60,21 @@ def teardown_module(module):
def assert_equal_zip(stream1, stream2):
z1 = zipfile.ZipFile(stream1)
z2 = zipfile.ZipFile(stream2)
assert set(z1.namelist()) == set(z2.namelist())
for name in z1.namelist():
if name == 'styles.xml':
continue
if name in ['content.xml', 'meta.xml']:
t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name)))
try:
# >= python 3.8: tostring preserves attribute order; use canonicalize to sort them
t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2)
except AttributeError:
pass
else:
t1, t2 = z1.read(name), z2.read(name)
assert t1 == t2, 'file "%s" differs' % name
with zipfile.ZipFile(stream1) as z1, zipfile.ZipFile(stream2) as z2:
assert set(z1.namelist()) == set(z2.namelist())
for name in z1.namelist():
if name == 'styles.xml':
continue
if name in ['content.xml', 'meta.xml']:
t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name)))
try:
# >= python 3.8: tostring preserves attribute order; use canonicalize to sort them
t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2)
except AttributeError:
pass
else:
t1, t2 = z1.read(name), z2.read(name)
assert t1 == t2, 'file "%s" differs' % name
def test_formdata_attachment_download(pub):


@@ -123,9 +123,8 @@ def test_load_old_pickle():
test.description = 'Hello world'
os.mkdir(os.path.join(pub.app_dir, 'categories'))
fd = open(os.path.join(pub.app_dir, 'categories', '1'), 'wb')
pickle.dump(test, fd)
fd.close()
with open(os.path.join(pub.app_dir, 'categories', '1'), 'wb') as fd:
pickle.dump(test, fd)
test2 = Category.get(1)
assert test.id == test2.id


@@ -29,12 +29,13 @@ def pub(request):
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
pub.set_app_dir(req)
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
'''
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''
[wscall-secrets]
api.example.com = 1234
'''
)
)
pub.load_site_options()
@@ -192,44 +193,38 @@ def test_json_datasource(pub, requests_pub, http_requests):
# invalid json file
get_request().datasources_cache = {}
json_file = open(json_file_path, 'wb')
json_file.write(codecs.encode(b'foobar', 'zlib_codec'))
json_file.close()
with open(json_file_path, 'wb') as json_file:
json_file.write(codecs.encode(b'foobar', 'zlib_codec'))
assert data_sources.get_items(datasource) == []
# empty json file
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({}, json_file)
json_file.close()
with open(json_file_path, 'w') as json_file:
json.dump({}, json_file)
assert data_sources.get_items(datasource) == []
# unrelated json file
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump('foobar', json_file)
json_file.close()
with open(json_file_path, 'w') as json_file:
json.dump('foobar', json_file)
assert data_sources.get_items(datasource) == []
# another unrelated json file
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': 'foobar'}, json_file)
json_file.close()
with open(json_file_path, 'w') as json_file:
json.dump({'data': 'foobar'}, json_file)
assert data_sources.get_items(datasource) == []
# json file not using dictionaries
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': [['1', 'foo'], ['2', 'bar']]}, json_file)
json_file.close()
with open(json_file_path, 'w') as json_file:
json.dump({'data': [['1', 'foo'], ['2', 'bar']]}, json_file)
assert data_sources.get_items(datasource) == []
# a good json file
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
json_file.close()
with open(json_file_path, 'w') as json_file:
json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo'}),
('2', 'bar', '2', {'id': '2', 'text': 'bar'}),
@@ -241,12 +236,11 @@ def test_json_datasource(pub, requests_pub, http_requests):
# a json file with additional keys
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump(
{'data': [{'id': '1', 'text': 'foo', 'more': 'xxx'}, {'id': '2', 'text': 'bar', 'more': 'yyy'}]},
json_file,
)
json_file.close()
with open(json_file_path, 'w') as json_file:
json.dump(
{'data': [{'id': '1', 'text': 'foo', 'more': 'xxx'}, {'id': '2', 'text': 'bar', 'more': 'yyy'}]},
json_file,
)
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo', 'more': 'xxx'}),
('2', 'bar', '2', {'id': '2', 'text': 'bar', 'more': 'yyy'}),
@@ -299,9 +293,8 @@ def test_json_datasource(pub, requests_pub, http_requests):
# a json file with integer as 'id'
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': [{'id': 1, 'text': 'foo'}, {'id': 2, 'text': 'bar'}]}, json_file)
json_file.close()
with open(json_file_path, 'w') as json_file:
json.dump({'data': [{'id': 1, 'text': 'foo'}, {'id': 2, 'text': 'bar'}]}, json_file)
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': 1, 'text': 'foo'}),
('2', 'bar', '2', {'id': 2, 'text': 'bar'}),
@@ -313,9 +306,8 @@ def test_json_datasource(pub, requests_pub, http_requests):
# a json file with empty or no text values
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': [{'id': '1', 'text': ''}, {'id': '2'}]}, json_file)
json_file.close()
with open(json_file_path, 'w') as json_file:
json.dump({'data': [{'id': '1', 'text': ''}, {'id': '2'}]}, json_file)
assert data_sources.get_items(datasource) == [
('1', '', '1', {'id': '1', 'text': ''}),
('2', '2', '2', {'id': '2', 'text': '2'}),
@@ -327,18 +319,16 @@ def test_json_datasource(pub, requests_pub, http_requests):
# a json file with empty or no id
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': [{'id': '', 'text': 'foo'}, {'text': 'bar'}, {'id': None}]}, json_file)
json_file.close()
with open(json_file_path, 'w') as json_file:
json.dump({'data': [{'id': '', 'text': 'foo'}, {'text': 'bar'}, {'id': None}]}, json_file)
assert data_sources.get_items(datasource) == []
assert data_sources.get_structured_items(datasource) == []
# specify data_attribute
datasource = {'type': 'json', 'value': ' {{ json_url }}', 'data_attribute': 'results'}
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'results': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
json_file.close()
with open(json_file_path, 'w') as json_file:
json.dump({'results': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo'},
{'id': '2', 'text': 'bar'},
@@ -351,9 +341,8 @@ def test_json_datasource(pub, requests_pub, http_requests):
# specify id_attribute
datasource = {'type': 'json', 'value': ' {{ json_url }}', 'id_attribute': 'pk'}
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': [{'pk': '1', 'text': 'foo'}, {'pk': '2', 'text': 'bar'}]}, json_file)
json_file.close()
with open(json_file_path, 'w') as json_file:
json.dump({'data': [{'pk': '1', 'text': 'foo'}, {'pk': '2', 'text': 'bar'}]}, json_file)
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo', 'pk': '1'},
{'id': '2', 'text': 'bar', 'pk': '2'},
@@ -366,9 +355,8 @@ def test_json_datasource(pub, requests_pub, http_requests):
# specify text_attribute
datasource = {'type': 'json', 'value': ' {{ json_url }}', 'text_attribute': 'label'}
get_request().datasources_cache = {}
json_file = open(json_file_path, 'w')
json.dump({'data': [{'id': '1', 'label': 'foo'}, {'id': '2', 'label': 'bar'}]}, json_file)
json_file.close()
with open(json_file_path, 'w') as json_file:
json.dump({'data': [{'id': '1', 'label': 'foo'}, {'id': '2', 'label': 'bar'}]}, json_file)
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo', 'label': 'foo'},
{'id': '2', 'text': 'bar', 'label': 'bar'},
@@ -508,45 +496,40 @@ def test_geojson_datasource(pub, requests_pub, http_requests):
# invalid geojson file
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'wb')
geojson_file.write(codecs.encode(b'foobar', 'zlib_codec'))
geojson_file.close()
with open(geojson_file_path, 'wb') as geojson_file:
geojson_file.write(codecs.encode(b'foobar', 'zlib_codec'))
assert data_sources.get_items(datasource) == []
# empty geojson file
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump({}, geojson_file)
geojson_file.close()
with open(geojson_file_path, 'w') as geojson_file:
json.dump({}, geojson_file)
assert data_sources.get_items(datasource) == []
# unrelated geojson file
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump('foobar', geojson_file)
geojson_file.close()
with open(geojson_file_path, 'w') as geojson_file:
json.dump('foobar', geojson_file)
assert data_sources.get_items(datasource) == []
# another unrelated geojson file
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump({'features': 'foobar'}, geojson_file)
geojson_file.close()
with open(geojson_file_path, 'w') as geojson_file:
json.dump({'features': 'foobar'}, geojson_file)
assert data_sources.get_items(datasource) == []
# a good geojson file
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump(
{
'features': [
{'properties': {'id': '1', 'text': 'foo'}},
{'properties': {'id': '2', 'text': 'bar'}},
]
},
geojson_file,
)
geojson_file.close()
with open(geojson_file_path, 'w') as geojson_file:
json.dump(
{
'features': [
{'properties': {'id': '1', 'text': 'foo'}},
{'properties': {'id': '2', 'text': 'bar'}},
]
},
geojson_file,
)
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': '1', 'text': 'foo', 'properties': {'id': '1', 'text': 'foo'}}),
('2', 'bar', '2', {'id': '2', 'text': 'bar', 'properties': {'id': '2', 'text': 'bar'}}),
@@ -558,17 +541,16 @@ def test_geojson_datasource(pub, requests_pub, http_requests):
# a geojson file with additional keys
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump(
{
'features': [
{'properties': {'id': '1', 'text': 'foo', 'more': 'xxx'}},
{'properties': {'id': '2', 'text': 'bar', 'more': 'yyy'}},
]
},
geojson_file,
)
geojson_file.close()
with open(geojson_file_path, 'w') as geojson_file:
json.dump(
{
'features': [
{'properties': {'id': '1', 'text': 'foo', 'more': 'xxx'}},
{'properties': {'id': '2', 'text': 'bar', 'more': 'yyy'}},
]
},
geojson_file,
)
assert data_sources.get_items(datasource) == [
(
'1',
@@ -671,12 +653,16 @@ def test_geojson_datasource(pub, requests_pub, http_requests):
# a geojson file with integer as 'id'
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump(
{'features': [{'properties': {'id': 1, 'text': 'foo'}}, {'properties': {'id': 2, 'text': 'bar'}}]},
geojson_file,
)
geojson_file.close()
with open(geojson_file_path, 'w') as geojson_file:
json.dump(
{
'features': [
{'properties': {'id': 1, 'text': 'foo'}},
{'properties': {'id': 2, 'text': 'bar'}},
]
},
geojson_file,
)
assert data_sources.get_items(datasource) == [
('1', 'foo', '1', {'id': 1, 'text': 'foo', 'properties': {'id': 1, 'text': 'foo'}}),
('2', 'bar', '2', {'id': 2, 'text': 'bar', 'properties': {'id': 2, 'text': 'bar'}}),
@@ -688,11 +674,10 @@ def test_geojson_datasource(pub, requests_pub, http_requests):
# a geojson file with empty or no text values
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump(
{'features': [{'properties': {'id': '1', 'text': ''}}, {'properties': {'id': '2'}}]}, geojson_file
)
geojson_file.close()
with open(geojson_file_path, 'w') as geojson_file:
json.dump(
{'features': [{'properties': {'id': '1', 'text': ''}}, {'properties': {'id': '2'}}]}, geojson_file
)
assert data_sources.get_items(datasource) == [
('1', '1', '1', {'id': '1', 'text': '1', 'properties': {'id': '1', 'text': ''}}),
('2', '2', '2', {'id': '2', 'text': '2', 'properties': {'id': '2'}}),
@@ -704,35 +689,33 @@ def test_geojson_datasource(pub, requests_pub, http_requests):
# a geojson file with empty or no id
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump(
{
'features': [
{'properties': {'id': '', 'text': 'foo'}},
{'properties': {'text': 'bar'}},
{'properties': {'id': None}},
]
},
geojson_file,
)
geojson_file.close()
with open(geojson_file_path, 'w') as geojson_file:
json.dump(
{
'features': [
{'properties': {'id': '', 'text': 'foo'}},
{'properties': {'text': 'bar'}},
{'properties': {'id': None}},
]
},
geojson_file,
)
assert data_sources.get_items(datasource) == []
assert data_sources.get_structured_items(datasource) == []
# specify id_property
datasource = {'type': 'geojson', 'value': ' {{ geojson_url }}', 'id_property': 'gid'}
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump(
{
'features': [
{'properties': {'gid': '1', 'text': 'foo'}},
{'properties': {'gid': '2', 'text': 'bar'}},
]
},
geojson_file,
)
geojson_file.close()
with open(geojson_file_path, 'w') as geojson_file:
json.dump(
{
'features': [
{'properties': {'gid': '1', 'text': 'foo'}},
{'properties': {'gid': '2', 'text': 'bar'}},
]
},
geojson_file,
)
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo', 'properties': {'gid': '1', 'text': 'foo'}},
{'id': '2', 'text': 'bar', 'properties': {'gid': '2', 'text': 'bar'}},
@@ -745,17 +728,16 @@ def test_geojson_datasource(pub, requests_pub, http_requests):
# check with feature IDs
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump(
{
'features': [
{'id': '1', 'properties': {'text': 'foo'}},
{'id': '2', 'properties': {'text': 'bar'}},
]
},
geojson_file,
)
geojson_file.close()
with open(geojson_file_path, 'w') as geojson_file:
json.dump(
{
'features': [
{'id': '1', 'properties': {'text': 'foo'}},
{'id': '2', 'properties': {'text': 'bar'}},
]
},
geojson_file,
)
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': 'foo', 'properties': {'text': 'foo'}},
{'id': '2', 'text': 'bar', 'properties': {'text': 'bar'}},
@@ -768,17 +750,16 @@ def test_geojson_datasource(pub, requests_pub, http_requests):
'label_template_property': '{{ id }}: {{ text }}',
}
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump(
{
'features': [
{'properties': {'id': '1', 'text': 'foo'}},
{'properties': {'id': '2', 'text': 'bar'}},
]
},
geojson_file,
)
geojson_file.close()
with open(geojson_file_path, 'w') as geojson_file:
json.dump(
{
'features': [
{'properties': {'id': '1', 'text': 'foo'}},
{'properties': {'id': '2', 'text': 'bar'}},
]
},
geojson_file,
)
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': '1: foo', 'properties': {'id': '1', 'text': 'foo'}},
{'id': '2', 'text': '2: bar', 'properties': {'id': '2', 'text': 'bar'}},
@@ -801,17 +782,16 @@ def test_geojson_datasource(pub, requests_pub, http_requests):
# unknown property or empty value
datasource = {'type': 'geojson', 'value': ' {{ geojson_url }}', 'label_template_property': '{{ label }}'}
get_request().datasources_cache = {}
geojson_file = open(geojson_file_path, 'w')
json.dump(
{
'features': [
{'properties': {'id': '1', 'text': 'foo', 'label': ''}},
{'properties': {'id': '2', 'text': 'bar'}},
]
},
geojson_file,
)
geojson_file.close()
with open(geojson_file_path, 'w') as geojson_file:
json.dump(
{
'features': [
{'properties': {'id': '1', 'text': 'foo', 'label': ''}},
{'properties': {'id': '2', 'text': 'bar'}},
]
},
geojson_file,
)
assert data_sources.get_structured_items(datasource) == [
{'id': '1', 'text': '1', 'properties': {'id': '1', 'text': 'foo', 'label': ''}},
{'id': '2', 'text': '2', 'properties': {'id': '2', 'text': 'bar'}},


@@ -217,9 +217,8 @@ def test_dict_index():
def test_ezt_script(pub):
os.mkdir(os.path.join(pub.app_dir, 'scripts'))
fd = open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w')
fd.write('''result = "Hello %s" % ("world" if not args else args[0])''')
fd.close()
with open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w') as fd:
fd.write('''result = "Hello %s" % ("world" if not args else args[0])''')
vars = {'script': ScriptsSubstitutionProxy()}
template = Template()


@@ -375,11 +375,10 @@ def test_configure_authentication_methods(http_requests):
def test_deploy():
cleanup()
WcsPublisher.APP_DIR = alt_tempdir
fd = open(os.path.join(alt_tempdir, 'hobo.json'), 'w')
hobo_json = copy.deepcopy(HOBO_JSON)
del hobo_json['services'][1] # authentic
fd.write(json.dumps(HOBO_JSON))
fd.close()
with open(os.path.join(alt_tempdir, 'hobo.json'), 'w') as fd:
hobo_json = copy.deepcopy(HOBO_JSON)
del hobo_json['services'][1] # authentic
fd.write(json.dumps(HOBO_JSON))
hobo_cmd = CmdCheckHobos()
base_options = {}
sub_options_class = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
@@ -406,11 +405,10 @@ def test_deploy():
def test_configure_postgresql():
cleanup()
WcsPublisher.APP_DIR = alt_tempdir
fd = open(os.path.join(alt_tempdir, 'hobo.json'), 'w')
hobo_json = copy.deepcopy(HOBO_JSON)
del hobo_json['services'][1] # authentic
fd.write(json.dumps(HOBO_JSON))
fd.close()
with open(os.path.join(alt_tempdir, 'hobo.json'), 'w') as fd:
hobo_json = copy.deepcopy(HOBO_JSON)
del hobo_json['services'][1] # authentic
fd.write(json.dumps(HOBO_JSON))
service = [x for x in HOBO_JSON.get('services', []) if x.get('service-id') == 'wcs'][0]
@@ -424,10 +422,9 @@
)
assert os.path.exists(os.path.join(alt_tempdir, 'wcs.example.net'))
fd = open(os.path.join(alt_tempdir, 'wcs.example.net', 'site-options.cfg'), 'w')
fd.write('[options]\n')
fd.write('postgresql = true\n')
fd.close()
with open(os.path.join(alt_tempdir, 'wcs.example.net', 'site-options.cfg'), 'w') as fd:
fd.write('[options]\n')
fd.write('postgresql = true\n')
cleanup()


@@ -209,25 +209,22 @@ def test_script_substitution_variable():
assert variables['script'].hello_world()
os.mkdir(os.path.join(pub.app_dir, 'scripts'))
fd = open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w')
fd.write('"""docstring"""\nresult = "hello world"')
fd.close()
with open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w') as fd:
fd.write('"""docstring"""\nresult = "hello world"')
assert variables['script'].hello_world() == 'hello world'
assert Script('hello_world').__doc__ == 'docstring'
os.mkdir(os.path.join(pub.APP_DIR, 'scripts'))
fd = open(os.path.join(pub.APP_DIR, 'scripts', 'hello_world.py'), 'w')
fd.write('result = "hello global world"')
fd.close()
with open(os.path.join(pub.APP_DIR, 'scripts', 'hello_world.py'), 'w') as fd:
fd.write('result = "hello global world"')
assert variables['script'].hello_world() == 'hello world'
os.unlink(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'))
assert variables['script'].hello_world() == 'hello global world'
fd = open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w')
fd.write('result = site_url')
fd.close()
with open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w') as fd:
fd.write('result = site_url')
assert variables['script'].hello_world() == 'http://example.net'
@@ -404,9 +401,8 @@ def test_pagination():
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.set('options', 'default-page-size', '500')
fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
pub.site_options.write(fd)
fd.close()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
assert '(1-101/1000)' in get_texts(pagination_links(0, 101, 1000))
assert '(1-500/1000)' in get_texts(pagination_links(0, 500, 1000))
assert '(1-500/1000)' in get_texts(pagination_links(0, 501, 1000)) # 500 is the max
@@ -705,6 +701,24 @@ def test_related_field_repr():
def test_find_vc_version():
import wcs.qommon.admin.menu
def mocked_popen(*args, **kwargs):
class Process:
returncode = 0
def communicate(self):
return (
b'''Desired=Unknown/Install/Remove/Purge/Hold
| Status=Not/Inst/Conf-files/Unpacked/halF-conf/Half-inst/trig-aWait/Trig-pend
|/ Err?=(none)/Reinst-required (Status,Err: uppercase=bad)
||/ Name Version Architecture Description
+++-==============-===============-============-=================================================
ii wcs 5.71-1~eob100+1 all web application to design and set up online forms
''',
'',
)
return Process()
with mock.patch('os.path.exists') as os_path_exists, mock.patch('subprocess.Popen') as popen:
def mocked_os_path_exists(path):
@@ -712,25 +726,9 @@ def test_find_vc_version():
os_path_exists.side_effect = mocked_os_path_exists
def mocked_popen(*args, **kwargs):
class Process:
returncode = 0
def communicate(self):
return (
b'''Desired=Unknown/Install/Remove/Purge/Hold
| Status=Not/Inst/Conf-files/Unpacked/halF-conf/Half-inst/trig-aWait/Trig-pend
|/ Err?=(none)/Reinst-required (Status,Err: uppercase=bad)
||/ Name Version Architecture Description
+++-==============-===============-============-=================================================
ii wcs 5.71-1~eob100+1 all web application to design and set up online forms
''',
'',
)
return Process()
popen.side_effect = mocked_popen
handle = mock.MagicMock()
handle.__enter__.side_effect = mocked_popen
popen.return_value = handle
version = wcs.qommon.admin.menu._find_vc_version()
assert version == 'wcs 5.71-1~eob100+1 (Debian)'
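
Since the code under test now calls `subprocess.Popen` through a `with` statement, the mock has to speak the context-manager protocol too: `with popen(...) as process` binds whatever `popen.return_value.__enter__()` returns, which is why the test above wires `__enter__` on a `MagicMock` handle. A reduced sketch of the same arrangement (using `return_value` rather than the test's `side_effect`, with a made-up command and output):

```python
import subprocess
from unittest import mock


class FakeProcess:
    returncode = 0

    def communicate(self):
        return (b'ii  wcs  5.71-1~eob100+1  all  ...\n', b'')


with mock.patch('subprocess.Popen') as popen:
    handle = mock.MagicMock()
    handle.__enter__.return_value = FakeProcess()
    popen.return_value = handle
    # "with Popen(...) as process" resolves to __enter__()'s return value.
    with subprocess.Popen(['dpkg', '-l', 'wcs']) as process:
        out = process.communicate()[0]
    assert out.startswith(b'ii')
```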


@@ -177,9 +177,8 @@ def test_import_config_zip():
pub.write_cfg()
c = io.BytesIO()
z = zipfile.ZipFile(c, 'w')
z.writestr('config.pck', pickle.dumps({'language': {'language': 'fr'}, 'whatever': ['a', 'b', 'c']}))
z.close()
with zipfile.ZipFile(c, 'w') as z:
z.writestr('config.pck', pickle.dumps({'language': {'language': 'fr'}, 'whatever': ['a', 'b', 'c']}))
c.seek(0)
pub.import_zip(c)
@@ -188,11 +187,10 @@ def test_import_config_zip():
assert pub.cfg['sp'] == {'what': 'ever'}
c = io.BytesIO()
z = zipfile.ZipFile(c, 'w')
z.writestr(
'config.json', json.dumps({'language': {'language': 'en'}, 'whatever2': ['a', 'b', {'c': 'd'}]})
)
z.close()
with zipfile.ZipFile(c, 'w') as z:
z.writestr(
'config.json', json.dumps({'language': {'language': 'en'}, 'whatever2': ['a', 'b', {'c': 'd'}]})
)
c.seek(0)
pub.import_zip(c)


@@ -81,19 +81,16 @@ def setup_idps(pub, idp_number=1):
'role': lasso.PROVIDER_ROLE_IDP,
}
filename = pub.cfg['idp'][base_id]['metadata']
fd = open(os.path.join(pub.app_dir, filename), 'w')
fd.write(metadata)
fd.close()
with open(os.path.join(pub.app_dir, filename), 'w') as fd:
fd.write(metadata)
filename = pub.cfg['idp'][base_id]['publickey']
fd = open(os.path.join(pub.app_dir, filename), 'w')
fd.write(idp_publickey)
fd.close()
with open(os.path.join(pub.app_dir, filename), 'w') as fd:
fd.write(idp_publickey)
filename = pub.cfg['idp'][base_id]['publickey'].replace('public', 'private')
fd = open(os.path.join(pub.app_dir, filename), 'w')
fd.write(idp_privatekey)
fd.close()
with open(os.path.join(pub.app_dir, filename), 'w') as fd:
fd.write(idp_privatekey)
pub.write_cfg()


@@ -3506,8 +3506,8 @@ def test_export_to_model_image(pub):
item.perform(formdata)
assert formdata.evolution[-1].parts[-1].base_filename == 'template.odt'
zfile = zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r')
zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
with zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r') as zfile:
zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
# check the image has been replaced by the one from the formdata
assert zinfo.file_size == len(image_data)
@@ -3521,8 +3521,8 @@ def test_export_to_model_image(pub):
item.perform(formdata)
zfile = zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r')
zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
with zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r') as zfile:
zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
# check the original image has been left
assert zinfo.file_size == 580
@@ -3574,8 +3574,8 @@ def test_export_to_model_backoffice_field(pub):
fbo1 = formdata.data['bo1']
assert fbo1.base_filename == 'template.odt'
assert fbo1.content_type == 'application/octet-stream'
zfile = zipfile.ZipFile(fbo1.get_file())
assert b'foo-export-to-bofile' in zfile.read('content.xml')
with zipfile.ZipFile(fbo1.get_file()) as zfile:
assert b'foo-export-to-bofile' in zfile.read('content.xml')
# no more 'bo1' backoffice field: do nothing
formdata = formdef.data_class()()


@@ -108,19 +108,18 @@ def create_temporary_pub(sql_mode=False, templates_mode=False, lazy_mode=False):
created = True
# always reset site-options.cfg
fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
fd.write('[wscall-secrets]\n')
fd.write('idp.example.net = BAR\n')
fd.write('\n')
fd.write('[options]\n')
fd.write('formdef-captcha-option = true\n')
fd.write('formdef-appearance-keywords = true\n')
fd.write('workflow-resubmit-action = true\n')
if lazy_mode:
fd.write('force-lazy-mode = true\n')
if sql_mode:
fd.write('postgresql = true\n')
fd.close()
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write('[wscall-secrets]\n')
fd.write('idp.example.net = BAR\n')
fd.write('\n')
fd.write('[options]\n')
fd.write('formdef-captcha-option = true\n')
fd.write('formdef-appearance-keywords = true\n')
fd.write('workflow-resubmit-action = true\n')
if lazy_mode:
fd.write('force-lazy-mode = true\n')
if sql_mode:
fd.write('postgresql = true\n')
# make sure site options are not cached
pub.site_options = None


@@ -1519,11 +1519,13 @@ class FormDefPage(Directory):
all_forms = [x for x in all_forms if x.last_update_time < date]
self.fd = io.BytesIO()
t = tarfile.open('wcs.tar.gz', 'w:gz', fileobj=self.fd)
t.add(self.formdef.get_object_filename(), 'formdef')
for formdata in all_forms:
t.add(formdata.get_object_filename(), '%s/%s' % (self.formdef.url_name, str(formdata.id)))
t.close()
with tarfile.open('wcs.tar.gz', 'w:gz', fileobj=self.fd) as t:
t.add(self.formdef.get_object_filename(), 'formdef')
for formdata in all_forms:
t.add(
formdata.get_object_filename(),
'%s/%s' % (self.formdef.url_name, str(formdata.id)),
)
if form.get_widget('keep').parse() is False:
for f in all_forms:
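
Here the `with` block matters beyond pylint: the tarball is written through `fileobj=self.fd`, and for a `w:gz` stream it is `close()` that flushes the final gzip trailer, so the `BytesIO` only holds a complete archive after the block exits. Standalone sketch with illustrative content:

```python
import io
import tarfile

fd = io.BytesIO()
with tarfile.open('wcs.tar.gz', 'w:gz', fileobj=fd) as t:
    data = b'<formdef/>'
    member = tarfile.TarInfo(name='formdef')
    member.size = len(data)
    t.addfile(member, io.BytesIO(data))
# The implicit close() flushed the gzip stream; fd now holds a valid .tar.gz.
assert fd.getvalue()[:2] == b'\x1f\x8b'  # gzip magic number
```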


@@ -848,12 +848,11 @@ class SettingsDirectory(QommonSettingsDirectory):
parent_theme_directory = os.path.dirname(theme_directory)
c = io.BytesIO()
z = zipfile.ZipFile(c, 'w')
for base, dummy, filenames in os.walk(theme_directory):
basetheme = base[len(parent_theme_directory) + 1 :]
for filename in filenames:
z.write(os.path.join(base, filename), os.path.join(basetheme, filename))
z.close()
with zipfile.ZipFile(c, 'w') as z:
for base, dummy, filenames in os.walk(theme_directory):
basetheme = base[len(parent_theme_directory) + 1 :]
for filename in filenames:
z.write(os.path.join(base, filename), os.path.join(basetheme, filename))
response = get_response()
response.set_content_type('application/zip')
@@ -896,36 +895,36 @@ class SettingsDirectory(QommonSettingsDirectory):
def install_theme_from_file(self, fp):
try:
z = zipfile.ZipFile(fp, 'r')
with zipfile.ZipFile(fp, 'r') as z:
theme_dir = os.path.join(get_publisher().app_dir, 'themes')
filename_list = [x for x in z.namelist() if x[0] != '/' and x[-1] != '/']
if len(filename_list) == 0:
get_session().message = ('error', _('Empty theme file.'))
return redirect('themes')
theme_name = filename_list[0].split('/')[0]
if ('%s/desc.xml' % theme_name) not in filename_list:
get_session().message = ('error', _('Theme is missing a desc.xml file.'))
return redirect('themes')
desc_xml = z.read('%s/desc.xml' % theme_name)
theme_dict = template.get_theme_dict(io.StringIO(force_text(desc_xml)))
if theme_dict.get('name') != theme_name:
get_session().message = ('error', _('desc.xml is missing a name attribute.'))
return redirect('themes')
if os.path.exists(os.path.join(theme_dir, theme_name)):
shutil.rmtree(os.path.join(theme_dir, theme_name))
for f in z.namelist():
if f[-1] == '/':
continue
path = os.path.join(theme_dir, f)
data = z.read(f)
if not os.path.exists(os.path.dirname(path)):
os.makedirs(os.path.dirname(path))
with open(path, 'wb') as _f:
_f.write(data)
return redirect('themes')
except Exception as e:
get_session().message = ('error', _('Failed to read theme file. (%s)') % str(e))
return redirect('themes')
theme_dir = os.path.join(get_publisher().app_dir, 'themes')
filename_list = [x for x in z.namelist() if x[0] != '/' and x[-1] != '/']
if len(filename_list) == 0:
get_session().message = ('error', _('Empty theme file.'))
return redirect('themes')
theme_name = filename_list[0].split('/')[0]
if ('%s/desc.xml' % theme_name) not in filename_list:
get_session().message = ('error', _('Theme is missing a desc.xml file.'))
return redirect('themes')
desc_xml = z.read('%s/desc.xml' % theme_name)
theme_dict = template.get_theme_dict(io.StringIO(force_text(desc_xml)))
if theme_dict.get('name') != theme_name:
get_session().message = ('error', _('desc.xml is missing a name attribute.'))
return redirect('themes')
if os.path.exists(os.path.join(theme_dir, theme_name)):
shutil.rmtree(os.path.join(theme_dir, theme_name))
for f in z.namelist():
if f[-1] == '/':
continue
path = os.path.join(theme_dir, f)
data = z.read(f)
if not os.path.exists(os.path.dirname(path)):
os.makedirs(os.path.dirname(path))
open(path, 'wb').write(data)
z.close()
return redirect('themes')
def install_theme_from_url(self, url):
try:
@@ -1028,83 +1027,82 @@ class SettingsDirectory(QommonSettingsDirectory):
def export(self, job):
c = io.BytesIO()
z = zipfile.ZipFile(c, 'w')
for d in self.dirs:
if d not in (
'categories',
'carddef_categories',
'wscalls',
'mail-templates',
'apiaccess',
):
continue
path = os.path.join(self.app_dir, d)
if not os.path.exists(path):
continue
for f in os.listdir(path):
if f in ('.indexes', '.max_id'):
with zipfile.ZipFile(c, 'w') as z:
for d in self.dirs:
if d not in (
'categories',
'carddef_categories',
'wscalls',
'mail-templates',
'apiaccess',
):
continue
z.write(os.path.join(path, f), os.path.join(d, f))
if 'datasources' in self.dirs:
for ds in NamedDataSource.select():
if ds.external == 'agenda':
path = os.path.join(self.app_dir, d)
if not os.path.exists(path):
continue
node = ds.export_to_xml(include_id=True)
misc.indent_xml(node)
z.writestr(
os.path.join('datasources', str(ds.id)),
ET.tostring(node),
)
if 'formdefs' in self.dirs:
for formdef in FormDef.select():
node = formdef.export_to_xml(include_id=True)
misc.indent_xml(node)
z.writestr(
os.path.join('formdefs_xml', str(formdef.id)),
b'<?xml version="1.0"?>\n' + ET.tostring(node),
)
if 'carddefs' in self.dirs:
for formdef in CardDef.select():
node = formdef.export_to_xml(include_id=True)
misc.indent_xml(node)
z.writestr(
os.path.join('carddefs_xml', str(formdef.id)),
b'<?xml version="1.0"?>\n' + ET.tostring(node),
)
if 'workflows' in self.dirs:
for workflow in Workflow.select():
node = workflow.export_to_xml(include_id=True)
misc.indent_xml(node)
z.writestr(
os.path.join('workflows_xml', str(workflow.id)),
b'<?xml version="1.0"?>\n' + ET.tostring(node),
)
if 'blockdefs' in self.dirs:
for blockdef in BlockDef.select():
node = blockdef.export_to_xml(include_id=True)
misc.indent_xml(node)
z.writestr(
os.path.join('blockdefs_xml', str(blockdef.id)),
b'<?xml version="1.0"?>\n' + ET.tostring(node),
)
if 'roles' in self.dirs:
for role in get_publisher().role_class.select():
node = role.export_to_xml(include_id=True)
misc.indent_xml(node)
z.writestr(
os.path.join('roles_xml', str(role.id)),
b'<?xml version="1.0"?>\n' + ET.tostring(node),
)
for f in os.listdir(path):
if f in ('.indexes', '.max_id'):
continue
z.write(os.path.join(path, f), os.path.join(d, f))
if 'datasources' in self.dirs:
for ds in NamedDataSource.select():
if ds.external == 'agenda':
continue
node = ds.export_to_xml(include_id=True)
misc.indent_xml(node)
z.writestr(
os.path.join('datasources', str(ds.id)),
ET.tostring(node),
)
if 'formdefs' in self.dirs:
for formdef in FormDef.select():
node = formdef.export_to_xml(include_id=True)
misc.indent_xml(node)
z.writestr(
os.path.join('formdefs_xml', str(formdef.id)),
b'<?xml version="1.0"?>\n' + ET.tostring(node),
)
if 'carddefs' in self.dirs:
for formdef in CardDef.select():
node = formdef.export_to_xml(include_id=True)
misc.indent_xml(node)
z.writestr(
os.path.join('carddefs_xml', str(formdef.id)),
b'<?xml version="1.0"?>\n' + ET.tostring(node),
)
if 'workflows' in self.dirs:
for workflow in Workflow.select():
node = workflow.export_to_xml(include_id=True)
misc.indent_xml(node)
z.writestr(
os.path.join('workflows_xml', str(workflow.id)),
b'<?xml version="1.0"?>\n' + ET.tostring(node),
)
if 'blockdefs' in self.dirs:
for blockdef in BlockDef.select():
node = blockdef.export_to_xml(include_id=True)
misc.indent_xml(node)
z.writestr(
os.path.join('blockdefs_xml', str(blockdef.id)),
b'<?xml version="1.0"?>\n' + ET.tostring(node),
)
if 'roles' in self.dirs:
for role in get_publisher().role_class.select():
node = role.export_to_xml(include_id=True)
misc.indent_xml(node)
z.writestr(
os.path.join('roles_xml', str(role.id)),
b'<?xml version="1.0"?>\n' + ET.tostring(node),
)
if self.settings:
z.write(os.path.join(self.app_dir, 'config.pck'), 'config.pck')
for f in os.listdir(self.app_dir):
if f.startswith('idp-') and os.path.splitext(f)[-1] in ('.pem', '.xml'):
z.write(os.path.join(self.app_dir, f), f)
if os.path.exists(os.path.join(self.app_dir, 'config')):
for f in os.listdir(os.path.join(self.app_dir, 'config')):
z.write(os.path.join(self.app_dir, 'config', f), os.path.join('config', f))
z.close()
if self.settings:
z.write(os.path.join(self.app_dir, 'config.pck'), 'config.pck')
for f in os.listdir(self.app_dir):
if f.startswith('idp-') and os.path.splitext(f)[-1] in ('.pem', '.xml'):
z.write(os.path.join(self.app_dir, f), f)
if os.path.exists(os.path.join(self.app_dir, 'config')):
for f in os.listdir(os.path.join(self.app_dir, 'config')):
z.write(os.path.join(self.app_dir, 'config', f), os.path.join('config', f))
job.file_content = c.getvalue()
job.store()


@@ -275,10 +275,10 @@ def graphviz(workflow, url_prefix='', select=None, svg=True, include=False):
out = out.getvalue()
if svg:
try:
process = Popen(['dot', '-Tsvg'], stdin=PIPE, stdout=PIPE)
out = process.communicate(force_bytes(out))[0]
if process.returncode != 0:
return ''
with Popen(['dot', '-Tsvg'], stdin=PIPE, stdout=PIPE) as process:
out = process.communicate(force_bytes(out))[0]
if process.returncode != 0:
return ''
except OSError:
return ''
out = graphviz_post_treatment(out, revert_colours, include=include)
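
`subprocess.Popen` has supported the context-manager protocol since Python 3.2: `__exit__` closes the pipes and waits for the child, which is exactly the cleanup the manual version left undone when `communicate()` raised. A small standalone sketch (it uses `cat`, so it assumes a POSIX environment):

```python
from subprocess import PIPE, Popen

# __exit__ waits for the child and closes stdin/stdout, so no pipe
# descriptors leak even if communicate() raises.
with Popen(['cat'], stdin=PIPE, stdout=PIPE) as process:
    out = process.communicate(b'digraph { a -> b }')[0]
    assert process.returncode == 0
assert out == b'digraph { a -> b }'
```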


@@ -2997,24 +2997,22 @@ class FormBackOfficeStatusPage(FormStatusPage):
def download_as_zip(self):
formdata = self.filled
zip_content = io.BytesIO()
zip_file = zipfile.ZipFile(zip_content, 'w')
counter = {'value': 0}
def add_zip_file(upload):
def add_zip_file(upload, zip_file):
counter['value'] += 1
filename = '%s_%s' % (counter['value'], upload.base_filename)
zip_file.writestr(filename, upload.get_content())
for value in formdata.data.values():
if isinstance(value, PicklableUpload):
add_zip_file(value)
if isinstance(value, dict) and isinstance(value.get('data'), list):
for subvalue in value.get('data'):
for subvalue_elem in subvalue.values():
if isinstance(subvalue_elem, PicklableUpload):
add_zip_file(subvalue_elem)
zip_file.close()
with zipfile.ZipFile(zip_content, 'w') as zip_file:
for value in formdata.data.values():
if isinstance(value, PicklableUpload):
add_zip_file(value, zip_file)
if isinstance(value, dict) and isinstance(value.get('data'), list):
for subvalue in value.get('data'):
for subvalue_elem in subvalue.values():
if isinstance(subvalue_elem, PicklableUpload):
add_zip_file(subvalue_elem, zip_file)
response = get_response()
response.set_content_type('application/zip')
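
A side effect of this refactor: the nested `add_zip_file` helper now receives the archive as an explicit parameter instead of closing over a name that is only usable inside the `with` block. The overall shape, reduced to a standalone sketch (upload names and contents are illustrative):

```python
import io
import zipfile

uploads = {'1_bar': b'hello world', '2_bar': b'hello world2'}

zip_content = io.BytesIO()
with zipfile.ZipFile(zip_content, 'w') as zip_file:
    # The handle is passed around explicitly; it is only valid while
    # the with block is open.
    for filename, content in uploads.items():
        zip_file.writestr(filename, content)

body = zip_content.getvalue()  # complete archive, ready to serve
```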


@@ -48,17 +48,15 @@ class CmdBackup(Command):
os.mkdir(backup_dir)
backup_filepath = os.path.join(backup_dir, 'backup-%s%s%s-%s%s%s.tar.gz' % time.localtime()[:6])
backup = tarfile.open(backup_filepath, mode='w:gz')
for basename, dirnames, filenames in os.walk(pub.app_dir):
if 'backups' in dirnames: # do not recurse in backup directory
idx = dirnames.index('backups')
dirnames[idx : idx + 1] = []
for filename in filenames:
backup.add(
os.path.join(basename, filename), os.path.join(basename, filename)[len(pub.app_dir) :]
)
backup.close()
with tarfile.open(backup_filepath, mode='w:gz') as backup:
for basename, dirnames, filenames in os.walk(pub.app_dir):
if 'backups' in dirnames: # do not recurse in backup directory
idx = dirnames.index('backups')
dirnames[idx : idx + 1] = []
for filename in filenames:
backup.add(
os.path.join(basename, filename), os.path.join(basename, filename)[len(pub.app_dir) :]
)
CmdBackup.register()


@@ -100,13 +100,12 @@ class Command(BaseCommand):
sql.SqlUser.fix_sequences()
if errors:
error_log = open('error_user.log', 'w')
for user, trace in errors:
error_log.write('user_id %s\n' % user.id)
error_log.write(trace)
error_log.write('-' * 80)
error_log.write('\n\n')
error_log.close()
with open('error_user.log', 'w') as error_log:
for user, trace in errors:
error_log.write('user_id %s\n' % user.id)
error_log.write(trace)
error_log.write('-' * 80)
error_log.write('\n\n')
print('There were some errors, see error_user.log for details.')
def store_forms(self):
@@ -138,15 +137,14 @@ class Command(BaseCommand):
sql_data_class.fix_sequences()
if errors:
error_log = open('error_formdata.log', 'w')
for formdata, trace in errors:
error_log.write(
'%s %s - %s\n' % (formdata.formdef, formdata.id, localstrftime(formdata.receipt_time))
)
error_log.write(trace)
error_log.write('-' * 80)
error_log.write('\n\n')
error_log.close()
with open('error_formdata.log', 'w') as error_log:
for formdata, trace in errors:
error_log.write(
'%s %s - %s\n' % (formdata.formdef, formdata.id, localstrftime(formdata.receipt_time))
)
error_log.write(trace)
error_log.write('-' * 80)
error_log.write('\n\n')
print('There were some errors, see error_formdata.log.')
def update_progress(self, progress, num_columns=120):


@@ -50,12 +50,11 @@ class CmdRestore(Command):
return 1
backup_filepath = sub_options.filename
backup = tarfile.open(backup_filepath, mode='r:*')
for tarinfo in backup:
if os.path.normpath(tarinfo.name).startswith('..'):
continue
backup.extract(tarinfo, pub.app_dir)
backup.close()
with tarfile.open(backup_filepath, mode='r:*') as backup:
for tarinfo in backup:
if os.path.normpath(tarinfo.name).startswith('..'):
continue
backup.extract(tarinfo, pub.app_dir)
CmdRestore.register()
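
Independent of the `with` conversion, the restore loop keeps its guard: members are extracted one at a time and any name that normalizes to a path starting with `..` is skipped, so a crafted backup cannot write outside the application directory. The pattern as a standalone sketch:

```python
import os
import tarfile


def restore(backup_filepath, app_dir):
    # Extract member by member, skipping names that would escape
    # app_dir (e.g. '../../etc/passwd'); the archive is closed by the
    # context manager in every case.
    with tarfile.open(backup_filepath, mode='r:*') as backup:
        for tarinfo in backup:
            if os.path.normpath(tarinfo.name).startswith('..'):
                continue
            backup.extract(tarinfo, app_dir)
```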


@@ -162,7 +162,6 @@ class WcsPublisher(QommonPublisher):
self.set_session_manager(self.session_manager_class(session_class=self.session_class))
def import_zip(self, fd):
z = zipfile.ZipFile(fd)
results = {
'formdefs': 0,
'carddefs': 0,
@@ -203,129 +202,129 @@ class WcsPublisher(QommonPublisher):
rv[key] = value
return rv
for f in z.namelist():
if f in ('.indexes', '.max_id'):
continue
if os.path.dirname(f) in (
'formdefs_xml',
'carddefs_xml',
'workflows_xml',
'blockdefs_xml',
'roles_xml',
):
continue
path = os.path.join(self.app_dir, f)
if not os.path.exists(os.path.dirname(path)):
os.mkdir(os.path.dirname(path))
if not os.path.basename(f):
# skip directories
continue
data = z.read(f)
if f in ('config.pck', 'config.json'):
results['settings'] = 1
if f == 'config.pck':
d = pickle.loads(data)
else:
d = json.loads(force_text(data), object_hook=_decode_dict)
if 'sp' in self.cfg:
current_sp = self.cfg['sp']
else:
current_sp = None
self.cfg = d
if current_sp:
self.cfg['sp'] = current_sp
elif 'sp' in self.cfg:
del self.cfg['sp']
self.write_cfg()
continue
open(path, 'wb').write(data)
if os.path.split(f)[0] in results:
results[os.path.split(f)[0]] += 1
with zipfile.ZipFile(fd) as z:
for f in z.namelist():
if f in ('.indexes', '.max_id'):
continue
if os.path.dirname(f) in (
'formdefs_xml',
'carddefs_xml',
'workflows_xml',
'blockdefs_xml',
'roles_xml',
):
continue
path = os.path.join(self.app_dir, f)
if not os.path.exists(os.path.dirname(path)):
os.mkdir(os.path.dirname(path))
if not os.path.basename(f):
# skip directories
continue
data = z.read(f)
if f in ('config.pck', 'config.json'):
results['settings'] = 1
if f == 'config.pck':
d = pickle.loads(data)
else:
d = json.loads(force_text(data), object_hook=_decode_dict)
if 'sp' in self.cfg:
current_sp = self.cfg['sp']
else:
current_sp = None
self.cfg = d
if current_sp:
self.cfg['sp'] = current_sp
elif 'sp' in self.cfg:
del self.cfg['sp']
self.write_cfg()
continue
open(path, 'wb').write(data)
if os.path.split(f)[0] in results:
results[os.path.split(f)[0]] += 1
# second pass, fields blocks
from wcs.blocks import BlockDef
# second pass, fields blocks
from wcs.blocks import BlockDef
for f in z.namelist():
if os.path.dirname(f) == 'blockdefs_xml' and os.path.basename(f):
blockdef = BlockDef.import_from_xml(z.open(f), include_id=True)
blockdef.store()
results['blockdefs'] += 1
for f in z.namelist():
if os.path.dirname(f) == 'blockdefs_xml' and os.path.basename(f):
blockdef = BlockDef.import_from_xml(z.open(f), include_id=True)
blockdef.store()
results['blockdefs'] += 1
# third pass, workflows
from wcs.workflows import Workflow
# third pass, workflows
from wcs.workflows import Workflow
for f in z.namelist():
if os.path.dirname(f) == 'workflows_xml' and os.path.basename(f):
workflow = Workflow.import_from_xml(z.open(f), include_id=True, check_datasources=False)
workflow.store()
results['workflows'] += 1
for f in z.namelist():
if os.path.dirname(f) == 'workflows_xml' and os.path.basename(f):
workflow = Workflow.import_from_xml(z.open(f), include_id=True, check_datasources=False)
workflow.store()
results['workflows'] += 1
# fourth pass, forms and cards
from wcs.carddef import CardDef
from wcs.formdef import FormDef
# fourth pass, forms and cards
from wcs.carddef import CardDef
from wcs.formdef import FormDef
        formdefs = []
        carddefs = []
        for f in z.namelist():
            if os.path.dirname(f) == 'formdefs_xml' and os.path.basename(f):
                formdef = FormDef.import_from_xml(z.open(f), include_id=True, check_datasources=False)
                formdef.store()
                formdefs.append(formdef)
                results['formdefs'] += 1
            if os.path.dirname(f) == 'carddefs_xml' and os.path.basename(f):
                carddef = CardDef.import_from_xml(z.open(f), include_id=True, check_datasources=False)
                carddef.store()
                carddefs.append(carddef)
                results['carddefs'] += 1
        # sixth pass, roles
        roles = []
        for f in z.namelist():
            if os.path.dirname(f) == 'roles_xml' and os.path.basename(f):
                role = self.role_class.import_from_xml(z.open(f), include_id=True)
                role.store()
                roles.append(role)
                results['roles'] += 1
        # rebuild indexes for imported objects
        for k, v in results.items():
            if k == 'settings':
                continue
            if v == 0:
                continue
            klass = None
            if k == 'formdefs':
                from .formdef import FormDef
                klass = FormDef
            elif k == 'carddefs':
                from .carddef import CardDef
                klass = CardDef
            elif k == 'blockdefs':
                klass = BlockDef
            elif k == 'categories':
                from .categories import Category
                klass = Category
            elif k == 'roles':
                klass = self.role_class
            elif k == 'workflows':
                klass = Workflow
            if klass:
                klass.rebuild_indexes()
            if k == 'formdefs':
                # in case of formdefs, we store them anew in case SQL changes
                # are required.
                for formdef in formdefs or FormDef.select():
                    formdef.store()
            elif k == 'carddefs':
                # ditto for cards
                for carddef in carddefs or CardDef.select():
                    carddef.store()
        z.close()
            formdefs = []
            carddefs = []
            for f in z.namelist():
                if os.path.dirname(f) == 'formdefs_xml' and os.path.basename(f):
                    formdef = FormDef.import_from_xml(z.open(f), include_id=True, check_datasources=False)
                    formdef.store()
                    formdefs.append(formdef)
                    results['formdefs'] += 1
                if os.path.dirname(f) == 'carddefs_xml' and os.path.basename(f):
                    carddef = CardDef.import_from_xml(z.open(f), include_id=True, check_datasources=False)
                    carddef.store()
                    carddefs.append(carddef)
                    results['carddefs'] += 1
            # sixth pass, roles
            roles = []
            for f in z.namelist():
                if os.path.dirname(f) == 'roles_xml' and os.path.basename(f):
                    role = self.role_class.import_from_xml(z.open(f), include_id=True)
                    role.store()
                    roles.append(role)
                    results['roles'] += 1
            # rebuild indexes for imported objects
            for k, v in results.items():
                if k == 'settings':
                    continue
                if v == 0:
                    continue
                klass = None
                if k == 'formdefs':
                    from .formdef import FormDef
                    klass = FormDef
                elif k == 'carddefs':
                    from .carddef import CardDef
                    klass = CardDef
                elif k == 'blockdefs':
                    klass = BlockDef
                elif k == 'categories':
                    from .categories import Category
                    klass = Category
                elif k == 'roles':
                    klass = self.role_class
                elif k == 'workflows':
                    klass = Workflow
                if klass:
                    klass.rebuild_indexes()
                if k == 'formdefs':
                    # in case of formdefs, we store them anew in case SQL changes
                    # are required.
                    for formdef in formdefs or FormDef.select():
                        formdef.store()
                elif k == 'carddefs':
                    # ditto for cards
                    for carddef in carddefs or CardDef.select():
                        carddef.store()
return results
def initialize_sql(self):
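
Note: the conversion above wraps the whole import in a single `with zipfile.ZipFile(fd) as z:` block, so the archive is closed even if one of the import passes raises, something the old trailing `z.close()` could not guarantee. The member handles passed as `z.open(f)` are still left to garbage collection; a stricter variant would scope those too. A minimal sketch, with a hypothetical helper name:

import io
import zipfile

def read_members(fd):
    """Return {name: bytes} for every regular member of a zip archive."""
    contents = {}
    with zipfile.ZipFile(fd) as z:
        for name in z.namelist():
            if name.endswith('/'):
                continue  # skip directory entries
            # close each member handle explicitly, instead of relying
            # on garbage collection as the z.open(f) call sites above do
            with z.open(name) as member_fd:
                contents[name] = member_fd.read()
    return contents

# usage with an in-memory archive (illustrative)
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as z:
    z.writestr('formdefs_xml/1', b'<formdef/>')
print(read_members(buf))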

View File

@ -45,12 +45,12 @@ def _find_vc_version():
if os.path.exists('/etc/debian_version'):
# debian case
try:
process = subprocess.Popen(
with subprocess.Popen(
['dpkg', '-l', package], stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
version = process.communicate()[0].splitlines()[-1].split()[2]
if process.returncode == 0:
return "%s %s (Debian)" % (package, version.decode())
) as process:
version = process.communicate()[0].splitlines()[-1].split()[2]
if process.returncode == 0:
return "%s %s (Debian)" % (package, version.decode())
except Exception:
pass
return None
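
Note: `subprocess.Popen` has supported the context-manager protocol since Python 3.2; `__exit__` closes the child's pipes and then waits on it, so `returncode` is set by the time the block ends. The `with` form used above is roughly this try/finally, sketched with an illustrative command in place of the dpkg call:

import subprocess

process = subprocess.Popen(['echo', 'hello'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
try:
    output = process.communicate()[0]
finally:
    # approximately what Popen.__exit__ does
    if process.stdout:
        process.stdout.close()
    process.wait()
print(output)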
@ -66,13 +66,13 @@ def _find_vc_version():
if os.path.exists(os.path.join(srcdir, '.git')):
try:
process = subprocess.Popen(
with subprocess.Popen(
['git', 'log', '--pretty=oneline', '-1'], stdout=subprocess.PIPE, cwd=srcdir
)
output = process.communicate()[0]
) as process:
output = process.communicate()[0]
rev = str(output.split()[0].decode('ascii'))
process = subprocess.Popen(['git', 'branch'], stdout=subprocess.PIPE, cwd=srcdir)
output = process.communicate()[0]
with subprocess.Popen(['git', 'branch'], stdout=subprocess.PIPE, cwd=srcdir) as process:
output = process.communicate()[0]
starred_line = [x for x in output.splitlines() if x.startswith(b'*')][0]
branch = str(starred_line.split()[1].decode('ascii'))
url = "https://repos.entrouvert.org/%s.git/commit/?id=%s" % (package, rev)

View File

@ -660,7 +660,8 @@ class UploadedFile:
file_path = self.build_file_path()
if not os.path.exists(self.dir_path()):
os.mkdir(self.dir_path())
open(file_path, 'wb').write(content)
with open(file_path, 'wb') as f:
f.write(content)
def dir_path(self):
return os.path.join(get_publisher().app_dir, self.directory)
@ -669,7 +670,7 @@ class UploadedFile:
return os.path.join(get_publisher().app_dir, self.directory, self.filename)
def get_file(self):
return open(self.build_file_path(), 'rb')
return open(self.build_file_path(), 'rb') # pylint: disable=consider-using-with
def get_content(self):
return self.get_file().read()
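
Note: `get_file()` is silenced rather than rewritten because the open handle is its return value; only the caller knows when it is done with it. Since the returned file object is itself a context manager, callers can still scope it. A self-contained sketch with a stand-in function (path illustrative):

# stand-in for UploadedFile.get_file(), which returns an open handle
def get_file(path='/etc/hostname'):
    return open(path, 'rb')  # pylint: disable=consider-using-with

with get_file() as fd:  # the caller, not the callee, scopes the handle
    data = fd.read()
# get_content(), by contrast, leaves closing the handle to garbage collection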

View File

@ -679,8 +679,7 @@ def get_thumbnail(filepath, content_type=None):
except subprocess.CalledProcessError:
raise ThumbnailError()
else:
fp = open(filepath, 'rb')
fp = open(filepath, 'rb') # pylint: disable=consider-using-with
try:
image = Image.open(fp)
try:

View File

@ -157,22 +157,21 @@ class Workbook:
return ET.tostring(self.get_content_node(), 'utf-8')
def save(self, output):
z = zipfile.ZipFile(output, 'w')
z.writestr('content.xml', self.get_content())
z.writestr('styles.xml', self.get_styles())
z.writestr('mimetype', 'application/vnd.oasis.opendocument.spreadsheet')
z.writestr(
'META-INF/manifest.xml',
'''<?xml version="1.0" encoding="UTF-8"?>
<manifest:manifest xmlns:manifest="urn:oasis:names:tc:opendocument:xmlns:manifest:1.0">
<manifest:file-entry manifest:full-path="/" manifest:media-type="application/vnd.oasis.opendocument.spreadsheet"/>
<manifest:file-entry manifest:full-path="styles.xml" manifest:media-type="text/xml"/>
<manifest:file-entry manifest:full-path="content.xml" manifest:media-type="text/xml"/>
<manifest:file-entry manifest:full-path="META-INF/manifest.xml" manifest:media-type="text/xml"/>
<manifest:file-entry manifest:full-path="mimetype" manifest:media-type="text/plain"/>
</manifest:manifest>''',
)
z.close()
with zipfile.ZipFile(output, 'w') as z:
z.writestr('content.xml', self.get_content())
z.writestr('styles.xml', self.get_styles())
z.writestr('mimetype', 'application/vnd.oasis.opendocument.spreadsheet')
z.writestr(
'META-INF/manifest.xml',
'''<?xml version="1.0" encoding="UTF-8"?>
<manifest:manifest xmlns:manifest="urn:oasis:names:tc:opendocument:xmlns:manifest:1.0">
<manifest:file-entry manifest:full-path="/" manifest:media-type="application/vnd.oasis.opendocument.spreadsheet"/>
<manifest:file-entry manifest:full-path="styles.xml" manifest:media-type="text/xml"/>
<manifest:file-entry manifest:full-path="content.xml" manifest:media-type="text/xml"/>
<manifest:file-entry manifest:full-path="META-INF/manifest.xml" manifest:media-type="text/xml"/>
<manifest:file-entry manifest:full-path="mimetype" manifest:media-type="text/plain"/>
</manifest:manifest>''',
)
class WorkSheet:
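
Note: for a zip being written, the context manager matters beyond style: `ZipFile.close()` is what writes the end-of-archive central directory, so an exception that skipped the old `z.close()` could leave an incomplete, unreadable .ods. A quick sketch showing the with-form producing a complete archive:

import io
import zipfile

buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as z:
    z.writestr('mimetype', 'application/vnd.oasis.opendocument.spreadsheet')
# leaving the with-block wrote the central directory,
# so the buffer now holds a complete, readable archive
with zipfile.ZipFile(buf) as z:
    assert z.namelist() == ['mimetype']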

View File

@ -501,7 +501,7 @@ class StorableObject:
def get_filename(cls, filename, ignore_errors=False, ignore_migration=False, **kwargs):
fd = None
try:
fd = open(force_bytes(filename, 'utf-8'), 'rb')
fd = open(force_bytes(filename, 'utf-8'), 'rb') # pylint: disable=consider-using-with
o = cls.storage_load(fd, **kwargs)
except IOError:
if ignore_errors:

View File

@ -42,7 +42,7 @@ class PicklableUpload(Upload):
return self.__dict__.get('fp')
elif getattr(self, 'qfilename', None):
basedir = os.path.join(get_publisher().app_dir, 'uploads')
self.fp = open(os.path.join(basedir, self.qfilename), 'rb')
self.fp = open(os.path.join(basedir, self.qfilename), 'rb') # pylint: disable=consider-using-with
return self.fp
return None
@ -125,11 +125,10 @@ class UploadStorage:
upload.__class__ = PicklableUpload
dirname = os.path.join(get_publisher().app_dir, 'tempfiles')
filename = os.path.join(dirname, upload.token)
fd = open(filename, 'wb')
upload.get_file_pointer().seek(0)
fd.write(upload.get_file_pointer().read())
upload.size = fd.tell()
fd.close()
with open(filename, 'wb') as fd:
upload.get_file_pointer().seek(0)
fd.write(upload.get_file_pointer().read())
upload.size = fd.tell()
def get_tempfile(self, temp_data):
value = PicklableUpload(temp_data['orig_filename'], temp_data['content_type'], temp_data['charset'])
@ -138,7 +137,7 @@ class UploadStorage:
filename = os.path.join(dirname, temp_data['unsigned_token'])
value.token = temp_data['token']
value.file_size = os.path.getsize(filename)
value.fp = open(filename, 'rb')
value.fp = open(filename, 'rb') # pylint: disable=consider-using-with
return value
def save(self, upload):
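
Note: one subtlety in the UploadStorage conversion above: `upload.size = fd.tell()` must stay inside the `with` block, because `tell()` on a closed file raises ValueError. A short demonstration:

import tempfile

with tempfile.TemporaryFile() as fd:
    fd.write(b'abc')
    size = fd.tell()  # 3, only valid while fd is open
print(size)
# calling fd.tell() here would raise ValueError: I/O operation on closed file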

View File

@ -65,15 +65,12 @@ _locks = weakref.WeakValueDictionary()
def lock_file(path, **kwargs):
_locks_lock.acquire()
try:
with _locks_lock:
lock = _locks.get(path)
if lock is None:
lock = _create_lock_file(path, **kwargs)
_locks[path] = lock
return lock
finally:
_locks_lock.release()
def _create_lock_file(path, **kwargs):
@ -139,10 +136,10 @@ class _ThreadLock:
def acquire(self):
if self._timeout is None:
self._lock.acquire()
self._lock.acquire() # pylint: disable=consider-using-with
else:
_acquire_non_blocking(
acquire=lambda: self._lock.acquire(False),
acquire=lambda: self._lock.acquire(False), # pylint: disable=consider-using-with
timeout=self._timeout,
retry_period=self._retry_period,
path=self._path,
@ -162,7 +159,7 @@ class _LockFile:
def acquire(self):
if self._file is None:
self._file = open(self._path, "w")
self._file = open(self._path, "w") # pylint: disable=consider-using-with
if self._timeout is None:
_lock_file_blocking(self._file)
else:
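
Note: the `# pylint: disable=consider-using-with` markers kept in this locking module are the irreducible kind: the lock is acquired in `acquire()` and released later from a separate `release()` method, so no single `with` statement can span the pair. The shape pylint cannot rewrite, as a minimal sketch (class name illustrative):

import threading

class Guard:
    def __init__(self):
        self._lock = threading.Lock()

    def acquire(self):
        # acquisition and release live in different methods, so a
        # with-statement cannot express this pairing
        self._lock.acquire()  # pylint: disable=consider-using-with

    def release(self):
        self._lock.release()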

View File

@ -45,9 +45,11 @@ def _call_openssl(args):
Return a tuple made of the return code and the stdout output
"""
try:
process = subprocess.Popen(args=[_openssl] + args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
output = process.communicate()[0]
return process.returncode, output
with subprocess.Popen(
args=[_openssl] + args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
) as process:
output = process.communicate()[0]
return process.returncode, output
except OSError:
return 1, None

View File

@ -135,30 +135,28 @@ def transform_opendocument(instream, outstream, process):
format, parse context.xml with element tree and apply process to its root
node.
"""
    zin = zipfile.ZipFile(instream, mode='r')
    zout = zipfile.ZipFile(outstream, mode='w')
    new_images = {}
    assert 'content.xml' in zin.namelist()
    for filename in zin.namelist():
        # first pass to process meta.xml, content.xml and styles.xml
        if filename not in ('meta.xml', 'content.xml', 'styles.xml'):
            continue
        content = zin.read(filename)
        root = ET.fromstring(content)
        process(root, new_images)
        content = ET.tostring(root)
        zout.writestr(filename, content)
    for filename in zin.namelist():
        # second pass to copy/replace other files
        if filename in ('meta.xml', 'content.xml', 'styles.xml'):
            continue
        if filename in new_images:
            content = new_images[filename].get_content()
        else:
            content = zin.read(filename)
        zout.writestr(filename, content)
    zout.close()
    with zipfile.ZipFile(instream, mode='r') as zin, zipfile.ZipFile(outstream, mode='w') as zout:
        new_images = {}
        assert 'content.xml' in zin.namelist()
        for filename in zin.namelist():
            # first pass to process meta.xml, content.xml and styles.xml
            if filename not in ('meta.xml', 'content.xml', 'styles.xml'):
                continue
            content = zin.read(filename)
            root = ET.fromstring(content)
            process(root, new_images)
            content = ET.tostring(root)
            zout.writestr(filename, content)
        for filename in zin.namelist():
            # second pass to copy/replace other files
            if filename in ('meta.xml', 'content.xml', 'styles.xml'):
                continue
            if filename in new_images:
                content = new_images[filename].get_content()
            else:
                content = zin.read(filename)
            zout.writestr(filename, content)
def is_opendocument(stream):

View File

@ -229,7 +229,7 @@ class AttachmentEvolutionPart:
def get_file_pointer(self):
if self.filename.startswith('uuid-'):
return None
return open(self.filename, 'rb')
return open(self.filename, 'rb') # pylint: disable=consider-using-with
def __getstate__(self):
odict = self.__dict__.copy()