misc: fix pylint consider-using-with (#55264)

This commit is contained in:
Lauréline Guérin 2021-07-01 15:19:32 +02:00
parent e7a045913a
commit a5a1ddfee7
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
38 changed files with 297 additions and 197 deletions

View File

@ -12,7 +12,6 @@ disable=
broad-except,
consider-using-dict-comprehension,
consider-using-set-comprehension,
consider-using-with,
cyclic-import,
duplicate-code,
fixme,

View File

@ -98,7 +98,8 @@ def test_admin_for_all(pub):
role = create_role(pub)
try:
open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w').close()
with open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w'):
pass # create empty file
resp = get_app(pub).get('/backoffice/', status=200)
# check there is a CSS class
assert resp.pyquery.find('body.admin-for-all')

View File

@ -623,6 +623,7 @@ def test_settings_idp(pub):
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
idp_metadata_filename = os.path.join(os.path.dirname(__file__), '..', 'idp_metadata.xml')
# pylint: disable=consider-using-with
urlopen.side_effect = lambda *args: open(idp_metadata_filename, 'rb')
resp = app.get('/backoffice/settings/identification/idp/idp/')
resp = resp.click('Create new from remote URL')

View File

@ -35,12 +35,13 @@ def pub(request, emails):
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
'''\
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''\
[api-secrets]
coucou = 1234
'''
)
)
return pub
@ -84,12 +85,13 @@ def no_request_pub(request):
pub.app_dir = os.path.join(pub.APP_DIR, 'example.net')
pub.set_config()
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
'''
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''
[wscall-secrets]
api.example.com = 1234
'''
)
)
return pub
@ -324,7 +326,8 @@ def test_is_url_signed_check_nonce(pub, local_user, freezer):
pub.site_options.add_section('api-secrets')
pub.site_options.set('api-secrets', ORIG, KEY)
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
# test clean_nonces do not bark when nonces directory is empty
if os.path.exists(os.path.join(pub.app_dir, 'nonces')):
shutil.rmtree(os.path.join(pub.app_dir, 'nonces'))

View File

@ -31,12 +31,13 @@ def pub(request, emails):
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
'''\
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''\
[api-secrets]
coucou = 1234
'''
)
)
return pub
@ -144,7 +145,8 @@ def test_reverse_geocoding(pub):
pub.site_options.add_section('options')
pub.site_options.set('options', 'nominatim_reverse_zoom_level', '16')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
assert (
urlopen.call_args[0][0]
@ -152,7 +154,8 @@ def test_reverse_geocoding(pub):
)
pub.site_options.set('options', 'nominatim_key', 'KEY')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
assert (
urlopen.call_args[0][0]
@ -162,7 +165,8 @@ def test_reverse_geocoding(pub):
pub.site_options.set(
'options', 'reverse_geocoding_service_url', 'http://reverse.example.net/?param=value'
)
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
assert (
urlopen.call_args[0][0]
@ -185,7 +189,8 @@ def test_geocoding(pub):
pub.site_options.add_section('options')
pub.site_options.set('options', 'map-bounds-top-left', '1.23;2.34')
pub.site_options.set('options', 'map-bounds-bottom-right', '2.34;3.45')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = get_app(pub).get('/api/geocoding?q=test')
assert urlopen.call_args[0][0] == (
'https://nominatim.entrouvert.org/search?viewbox=2.34,1.23,3.45,2.34&bounded=1&'
@ -195,7 +200,8 @@ def test_geocoding(pub):
pub.site_options.set('options', 'nominatim_key', 'KEY')
pub.site_options.set('options', 'map-bounds-top-left', '')
pub.site_options.set('options', 'map-bounds-bottom-right', '')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = get_app(pub).get('/api/geocoding?q=test')
assert (
urlopen.call_args[0][0]
@ -204,7 +210,8 @@ def test_geocoding(pub):
pub.site_options.set('options', 'map-bounds-top-left', '1.23;2.34')
pub.site_options.set('options', 'map-bounds-bottom-right', '2.34;3.45')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = get_app(pub).get('/api/geocoding?q=test')
assert urlopen.call_args[0][0] == (
'https://nominatim.entrouvert.org/search?key=KEY&viewbox=2.34,1.23,3.45,2.34&bounded=1&'
@ -212,7 +219,8 @@ def test_geocoding(pub):
)
pub.site_options.set('options', 'geocoding_service_url', 'http://reverse.example.net/?param=value')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = get_app(pub).get('/api/geocoding?q=test')
assert (
urlopen.call_args[0][0]

View File

@ -42,12 +42,13 @@ def pub(request, emails):
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
'''\
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''\
[api-secrets]
coucou = 1234
'''
)
)
return pub

View File

@ -28,12 +28,13 @@ def pub(request, emails):
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
'''\
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''\
[api-secrets]
coucou = 1234
'''
)
)
return pub

View File

@ -33,12 +33,13 @@ def pub(request, emails):
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
'''\
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''\
[api-secrets]
coucou = 1234
'''
)
)
return pub
@ -245,7 +246,8 @@ def test_api_ods_formdata_custom_view(pub, local_user):
# check it now gets the data
resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
ods_sheet = ET.parse(zipf.open('content.xml'))
with zipf.open('content.xml') as fd:
ods_sheet = ET.parse(fd)
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 11
pub.custom_view_class.wipe()
@ -259,7 +261,8 @@ def test_api_ods_formdata_custom_view(pub, local_user):
resp = get_app(pub).get(sign_uri('/api/forms/test/ods/custom-view', user=local_user))
with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
ods_sheet = ET.parse(zipf.open('content.xml'))
with zipf.open('content.xml') as fd:
ods_sheet = ET.parse(fd)
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 21

View File

@ -1083,7 +1083,8 @@ def test_api_geojson_formdata(pub, local_user):
pub.load_site_options()
pub.site_options.add_section('api-http-auth-geojson')
pub.site_options.set('api-http-auth-geojson', 'user', 'password')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = app.get('/api/forms/test/geojson?email=%s' % local_user.email)
assert 'features' in resp.json
@ -1149,7 +1150,8 @@ def test_api_ods_formdata(pub, local_user):
resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
assert resp.content_type == 'application/vnd.oasis.opendocument.spreadsheet'
with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
ods_sheet = ET.parse(zipf.open('content.xml'))
with zipf.open('content.xml') as fd:
ods_sheet = ET.parse(fd)
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 311
# check it's not subject to category permissions
@ -1573,7 +1575,8 @@ def test_api_ics_formdata_http_auth(pub, local_user, admin_user, ics_data):
pub.load_site_options()
pub.site_options.add_section('api-http-auth-ics')
pub.site_options.set('api-http-auth-ics', 'user', 'password')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
# check access is denied if the user has not the appropriate role
resp = app.get('/api/forms/test/ics/foobar?email=%s' % local_user.email, status=403)

View File

@ -42,12 +42,13 @@ def pub(request, emails):
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
'''\
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''\
[api-secrets]
coucou = 1234
'''
)
)
return pub

View File

@ -35,12 +35,13 @@ def pub(request, emails):
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
'''\
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''\
[api-secrets]
coucou = 1234
'''
)
)
return pub

View File

@ -33,12 +33,13 @@ def pub(request, emails):
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
'''\
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''\
[api-secrets]
coucou = 1234
'''
)
)
return pub

View File

@ -5527,11 +5527,12 @@ def test_backoffice_formdata_named_wscall(http_requests, pub):
def test_backoffice_session_var(pub):
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
'''[options]
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''[options]
query_string_allowed_vars = foo,bar
'''
)
)
user = create_user(pub)
create_environment(pub)

View File

@ -481,7 +481,8 @@ def test_backoffice_ods(pub):
assert resp.body[:2] == b'PK' # ods has a zip container
with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
ods_sheet = ET.parse(zipf.open('content.xml'))
with zipf.open('content.xml') as fd:
ods_sheet = ET.parse(fd)
# check the ods contains a link to the document
elem = ods_sheet.findall('.//{%s}a' % ods.NS['text'])[0]
assert (

View File

@ -464,7 +464,8 @@ def test_form_cancelurl(pub):
pub.site_options.add_section('api-secrets')
pub.site_options.set('api-secrets', 'example.org', 'xyz')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = get_app(pub).get('/test/?cancelurl=http://example.org/plop/')
resp = resp.form.submit('cancel')
assert resp.location == 'http://example.org/plop/'
@ -472,15 +473,18 @@ def test_form_cancelurl(pub):
pub.site_options.remove_section('api-secrets')
if not pub.site_options.has_section('options'):
pub.site_options.add_section('options')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
get_app(pub).get('/test/?cancelurl=http://example.org/plop/', status=400)
pub.site_options.set('options', 'relatable-hosts', 'example.com')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
get_app(pub).get('/test/?cancelurl=http://example.org/plop/', status=400)
pub.site_options.set('options', 'relatable-hosts', 'example.com, example.org')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
resp = get_app(pub).get('/test/?cancelurl=http://example.org/plop/')
resp = resp.form.submit('cancel')
assert resp.location == 'http://example.org/plop/'
@ -3006,11 +3010,12 @@ def test_form_page_session_var_prefill(pub):
assert resp.forms[0]['f0'].value == ''
# check it works
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
'''[options]
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''[options]
query_string_allowed_vars = foo,bar
'''
)
)
resp = get_app(pub).get('/?session_var_foo=hello')
assert urllib.parse.urlparse(resp.location).path == '/'
@ -3521,9 +3526,8 @@ def test_form_file_field_image_submit(pub):
formdef.store()
formdef.data_class().wipe()
image_content = open(
os.path.join(os.path.dirname(__file__), '..', 'image-with-gps-data.jpeg'), 'rb'
).read()
with open(os.path.join(os.path.dirname(__file__), '..', 'image-with-gps-data.jpeg'), 'rb') as fd:
image_content = fd.read()
upload = Upload('test.jpg', image_content, 'image/jpeg')
app = get_app(pub)
@ -3584,9 +3588,8 @@ def test_form_file_field_submit_document_type(pub):
resp = resp.form.submit('submit')
assert 'invalid file type' in resp.text
image_content = open(
os.path.join(os.path.dirname(__file__), '..', 'image-with-gps-data.jpeg'), 'rb'
).read()
with open(os.path.join(os.path.dirname(__file__), '..', 'image-with-gps-data.jpeg'), 'rb') as fd:
image_content = fd.read()
upload = Upload('test.jpg', image_content, 'image/jpeg')
resp = get_app(pub).get('/test/')
resp.form['f0$file'] = upload
@ -5985,12 +5988,13 @@ def test_item_field_autocomplete_json_source(http_requests, pub, error_email, em
data_source.data_source = {'type': 'json', 'value': 'http://remote.example.net/json'}
data_source.store()
formdef.data_class().wipe()
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
'''\
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''\
[wscall-secrets]
remote.example.net = 1234
'''
)
)
app = get_app(pub)
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:

View File

@ -27,12 +27,13 @@ def pub(request):
pub.set_app_dir(req)
pub._set_request(req)
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
'''
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''
[debug]
logger=true
'''
)
)
pub.load_site_options()

View File

@ -268,10 +268,13 @@ def test_internal_identifier_migration(pub):
formdef.fields = []
formdef.store()
obj = pickle.load(open(formdef.get_object_filename(), 'rb'))
with open(formdef.get_object_filename(), 'rb') as fd:
obj = pickle.load(fd)
del obj.internal_identifier
pickle.dump(obj, open(formdef.get_object_filename(), 'wb'))
assert pickle.load(open(formdef.get_object_filename(), 'rb')).internal_identifier is None
with open(formdef.get_object_filename(), 'wb') as fd:
pickle.dump(obj, fd)
with open(formdef.get_object_filename(), 'rb') as fd:
assert pickle.load(fd).internal_identifier is None
assert FormDef.get(formdef.id, ignore_migration=True).internal_identifier is None
formdef = FormDef.get(formdef.id)
@ -611,10 +614,12 @@ def test_pickle_2to3_conversion(pub):
return obj
formdef.__dict__ = deep_str2bytes(formdef.__dict__)
pickle.dump(formdef, open(formdef_filename, 'wb'), protocol=2)
with open(formdef_filename, 'wb') as fd:
pickle.dump(formdef, fd, protocol=2)
workflow.__dict__ = deep_str2bytes(workflow.__dict__)
pickle.dump(workflow, open(workflow_filename, 'wb'), protocol=2)
with open(workflow_filename, 'wb') as fd:
pickle.dump(workflow, fd, protocol=2)
formdef = FormDef.get(formdef_id)
assert formdef.fields[0].label == 'Test'

View File

@ -391,16 +391,19 @@ def test_deploy():
# update
cleanup()
pub_cfg = pickle.load(open(os.path.join(alt_tempdir, 'tenants', 'wcs.example.net', 'config.pck'), 'rb'))
with open(os.path.join(alt_tempdir, 'tenants', 'wcs.example.net', 'config.pck'), 'rb') as fd:
pub_cfg = pickle.load(fd)
assert pub_cfg['language'] == {'language': 'fr'}
del pub_cfg['language']
pickle.dump(pub_cfg, open(os.path.join(alt_tempdir, 'tenants', 'wcs.example.net', 'config.pck'), 'wb'))
with open(os.path.join(alt_tempdir, 'tenants', 'wcs.example.net', 'config.pck'), 'wb') as fd:
pickle.dump(pub_cfg, fd)
hobo_cmd.execute(
base_options,
sub_options,
['http://wcs.example.net/', os.path.join(alt_tempdir, 'tenants', 'wcs.example.net', 'hobo.json')],
)
pub_cfg = pickle.load(open(os.path.join(alt_tempdir, 'tenants', 'wcs.example.net', 'config.pck'), 'rb'))
with open(os.path.join(alt_tempdir, 'tenants', 'wcs.example.net', 'config.pck'), 'rb') as fd:
pub_cfg = pickle.load(fd)
assert pub_cfg['language'] == {'language': 'fr'}

View File

@ -153,7 +153,8 @@ def test_finish_interrupted_request():
def test_get_tenants():
pub = create_temporary_pub()
open(os.path.join(pub.APP_DIR, 'xxx'), 'w').close()
with open(os.path.join(pub.APP_DIR, 'xxx'), 'w'):
pass # create empty file
os.mkdir(os.path.join(pub.APP_DIR, 'plop.invalid'))
hostnames = [x.hostname for x in pub.__class__.get_tenants()]
assert 'example.net' in hostnames

View File

@ -40,10 +40,13 @@ def test_migrate():
get_publisher().role_class.wipe()
role = get_publisher().role_class(name='Hello world')
role.store()
obj = pickle.load(open(role.get_object_filename(), 'rb'))
with open(role.get_object_filename(), 'rb') as fd:
obj = pickle.load(fd)
del obj.slug
pickle.dump(obj, open(role.get_object_filename(), 'wb'))
assert pickle.load(open(role.get_object_filename(), 'rb')).slug is None
with open(role.get_object_filename(), 'wb') as fd:
pickle.dump(obj, fd)
with open(role.get_object_filename(), 'rb') as fd:
assert pickle.load(fd).slug is None
assert get_publisher().role_class.get(role.id).slug == 'hello-world'

View File

@ -30,7 +30,8 @@ def pub():
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
pub.set_app_dir(req)
pub.site_options.set('options', 'working_day_calendar', '')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
return pub
@ -820,7 +821,8 @@ def test_is_working_day_settings(settings, pub):
assert t.render({'value': '2020-07-15'}) == 'True'
pub.site_options.set('options', 'working_day_calendar', 'foobar')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
t = Template('{{ value|is_working_day }}')
assert t.render({'value': '2020-07-15'}) == 'False'
t = Template('{{ value|is_working_day_with_saturday }}')
@ -828,7 +830,8 @@ def test_is_working_day_settings(settings, pub):
settings.WORKING_DAY_CALENDAR = 'foobar'
pub.site_options.set('options', 'working_day_calendar', 'workalendar.europe.France')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
t = Template('{{ value|is_working_day }}')
assert t.render({'value': '2020-07-15'}) == 'True'
t = Template('{{ value|is_working_day_with_saturday }}')
@ -906,7 +909,8 @@ def test_add_working_days_settings(settings, pub):
assert t.render({'value': '2020-07-13'}) == '2020-07-15'
pub.site_options.set('options', 'working_day_calendar', 'foobar')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
t = Template('{{ value|add_working_days:1 }}')
assert t.render({'value': '2020-07-13'}) == ''
t = Template('{{ value|add_working_days_with_saturday:1 }}')
@ -914,7 +918,8 @@ def test_add_working_days_settings(settings, pub):
settings.WORKING_DAY_CALENDAR = 'foobar'
pub.site_options.set('options', 'working_day_calendar', 'workalendar.europe.France')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
t = Template('{{ value|add_working_days:1 }}')
assert t.render({'value': '2020-07-13'}) == '2020-07-15'
t = Template('{{ value|add_working_days_with_saturday:1 }}')
@ -991,7 +996,8 @@ def test_adjust_to_working_day_settings(settings, pub):
assert t.render({'value': '2020-07-14'}) == '2020-07-15'
pub.site_options.set('options', 'working_day_calendar', 'foobar')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
t = Template('{{ value|adjust_to_working_day }}')
assert t.render({'value': '2020-07-13'}) == ''
t = Template('{{ value|adjust_to_working_day_with_saturday }}')
@ -999,7 +1005,8 @@ def test_adjust_to_working_day_settings(settings, pub):
settings.WORKING_DAY_CALENDAR = 'foobar'
pub.site_options.set('options', 'working_day_calendar', 'workalendar.europe.France')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
t = Template('{{ value|adjust_to_working_day }}')
assert t.render({'value': '2020-07-14'}) == '2020-07-15'
t = Template('{{ value|adjust_to_working_day_with_saturday }}')
@ -1065,7 +1072,8 @@ def test_age_in_working_days_settings(settings, pub):
assert t.render({'value': '2020-07-12'}) == '2'
pub.site_options.set('options', 'working_day_calendar', 'foobar')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
t = Template('{{ value|age_in_working_days:"2020-07-15" }}')
assert t.render({'value': '2020-07-12'}) == ''
t = Template('{{ value|age_in_working_days_with_saturday:"2020-07-15" }}')
@ -1073,7 +1081,8 @@ def test_age_in_working_days_settings(settings, pub):
settings.WORKING_DAY_CALENDAR = 'foobar'
pub.site_options.set('options', 'working_day_calendar', 'workalendar.europe.France')
pub.site_options.write(open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w'))
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
pub.site_options.write(fd)
t = Template('{{ value|age_in_working_days:"2020-07-15" }}')
assert t.render({'value': '2020-07-12'}) == '2'
t = Template('{{ value|age_in_working_days_with_saturday:"2020-07-15" }}')

View File

@ -32,8 +32,9 @@ def pub(request, emails):
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
'''
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
fd.write(
'''
[storage-remote]
label = remote storage
class = wcs.qommon.upload_storage.RemoteOpaqueUploadStorage
@ -51,7 +52,7 @@ crypto.example.net = 1234
[wscall-secrets]
crypto.example.net = 1234
'''
)
)
return pub

View File

@ -3915,7 +3915,8 @@ def test_export_to_model_django_template(pub):
item.perform(formdata)
with open(formdata.evolution[0].parts[0].filename, 'rb') as fd:
new_content = zipfile.ZipFile(fd).read('content.xml')
with zipfile.ZipFile(fd) as zout:
new_content = zout.read('content.xml')
assert b'>foo-export-to-template-with-django<' in new_content
formdef.name = 'Name with a \' simple quote'
@ -3923,7 +3924,8 @@ def test_export_to_model_django_template(pub):
item.perform(formdata)
with open(formdata.evolution[0].parts[1].filename, 'rb') as fd:
new_content = zipfile.ZipFile(fd).read('content.xml')
with zipfile.ZipFile(fd) as zout:
new_content = zout.read('content.xml')
assert b'>Name with a \' simple quote<' in new_content
formdef.name = 'A <> name'
@ -3931,7 +3933,8 @@ def test_export_to_model_django_template(pub):
item.perform(formdata)
with open(formdata.evolution[0].parts[2].filename, 'rb') as fd:
new_content = zipfile.ZipFile(fd).read('content.xml')
with zipfile.ZipFile(fd) as zout:
new_content = zout.read('content.xml')
assert b'>A &lt;&gt; name<' in new_content
@ -4010,7 +4013,8 @@ def test_export_to_model_form_details_section(pub, filename):
item.perform(formdata)
with open(formdata.evolution[0].parts[0].filename, 'rb') as fd:
new_content = force_text(zipfile.ZipFile(fd).read('content.xml'))
with zipfile.ZipFile(fd) as zout:
new_content = force_text(zout.read('content.xml'))
# section content has been removed
assert 'Titre de page' not in new_content
assert 'Titre' not in new_content
@ -4033,7 +4037,8 @@ def test_export_to_model_form_details_section(pub, filename):
if filename == 'template-form-details-no-styles.odt':
with open(formdata.evolution[0].parts[0].filename, 'rb') as fd:
new_styles = force_text(zipfile.ZipFile(fd).read('styles.xml'))
with zipfile.ZipFile(fd) as zout:
new_styles = force_text(zout.read('styles.xml'))
assert 'Field_20_Label' in new_styles

View File

@ -116,7 +116,8 @@ class RootDirectory(BackofficeRootDirectory):
admin_for_all_file_path = os.path.join(get_publisher().app_dir, 'ADMIN_FOR_ALL')
if not os.path.exists(os.path.join(admin_for_all_file_path)):
return False
admin_for_all_contents = open(admin_for_all_file_path).read()
with open(admin_for_all_file_path) as fd:
admin_for_all_contents = fd.read()
if not admin_for_all_contents:
# empty file, access is granted to everybody
return True

View File

@ -76,7 +76,8 @@ class CmdCheckHobos(Command):
hobo_json_path = os.path.join(tenant.directory, 'hobo.json')
if not os.path.exists(hobo_json_path):
continue
hobo_json = json.load(open(hobo_json_path))
with open(hobo_json_path) as fd:
hobo_json = json.load(fd)
try:
me = [service for service in hobo_json['services'] if service.get('this') is True][0]
except IndexError:
@ -107,7 +108,8 @@ class CmdCheckHobos(Command):
# get environment definition from stdin
self.all_services = json.load(sys.stdin)
else:
self.all_services = json.load(open(args[1]))
with open(args[1]) as fd:
self.all_services = json.load(fd)
try:
service = [
@ -138,7 +140,8 @@ class CmdCheckHobos(Command):
if service.get('template_name'):
skeleton_filepath = os.path.join(global_app_dir, 'skeletons', service.get('template_name'))
if os.path.exists(skeleton_filepath):
pub.import_zip(open(skeleton_filepath, 'rb'))
with open(skeleton_filepath, 'rb') as fd:
pub.import_zip(fd)
new_site = True
else:
print('updating instance in', pub.app_dir)
@ -183,9 +186,8 @@ class CmdCheckHobos(Command):
if theme_id and os.path.exists(self.THEMES_DIRECTORY):
for theme_module in os.listdir(self.THEMES_DIRECTORY):
try:
themes_json = json.load(
open(os.path.join(self.THEMES_DIRECTORY, theme_module, 'themes.json'))
)
with open(os.path.join(self.THEMES_DIRECTORY, theme_module, 'themes.json')) as fd:
themes_json = json.load(fd)
except IOError:
continue
if not isinstance(themes_json, dict): # compat

View File

@ -58,7 +58,8 @@ class CmdHoboNotify(Command):
# get environment definition from stdin
return json.load(sys.stdin)
else:
return json.load(open(args[0]))
with open(args[0]) as fd:
return json.load(fd)
@classmethod
def check_valid_notification(cls, notification):

View File

@ -46,5 +46,6 @@ class Command(TenantCommand):
if if_empty and not is_empty:
return
publisher.import_zip(open(filename, 'rb'))
with open(filename, 'rb') as fd:
publisher.import_zip(fd)
publisher.cleanup()

View File

@ -81,8 +81,9 @@ class Command(TenantCommand):
def get_rows(args):
for arg in args:
for row in json.load(open(arg)):
yield row
with open(arg) as fd:
for row in json.load(fd):
yield row
def get_status_ids_accepting_trigger(workflow, trigger):

View File

@ -238,7 +238,8 @@ class WcsPublisher(QommonPublisher):
del self.cfg['sp']
self.write_cfg()
continue
open(path, 'wb').write(data)
with open(path, 'wb') as fd:
fd.write(data)
if os.path.split(f)[0] in results:
results[os.path.split(f)[0]] += 1
@ -247,7 +248,8 @@ class WcsPublisher(QommonPublisher):
for f in z.namelist():
if os.path.dirname(f) == 'blockdefs_xml' and os.path.basename(f):
blockdef = BlockDef.import_from_xml(z.open(f), include_id=True)
with z.open(f) as fd:
blockdef = BlockDef.import_from_xml(fd, include_id=True)
blockdef.store()
results['blockdefs'] += 1
@ -256,7 +258,8 @@ class WcsPublisher(QommonPublisher):
for f in z.namelist():
if os.path.dirname(f) == 'workflows_xml' and os.path.basename(f):
workflow = Workflow.import_from_xml(z.open(f), include_id=True, check_datasources=False)
with z.open(f) as fd:
workflow = Workflow.import_from_xml(fd, include_id=True, check_datasources=False)
workflow.store()
results['workflows'] += 1
@ -268,12 +271,14 @@ class WcsPublisher(QommonPublisher):
carddefs = []
for f in z.namelist():
if os.path.dirname(f) == 'formdefs_xml' and os.path.basename(f):
formdef = FormDef.import_from_xml(z.open(f), include_id=True, check_datasources=False)
with z.open(f) as fd:
formdef = FormDef.import_from_xml(fd, include_id=True, check_datasources=False)
formdef.store()
formdefs.append(formdef)
results['formdefs'] += 1
if os.path.dirname(f) == 'carddefs_xml' and os.path.basename(f):
carddef = CardDef.import_from_xml(z.open(f), include_id=True, check_datasources=False)
with z.open(f) as fd:
carddef = CardDef.import_from_xml(fd, include_id=True, check_datasources=False)
carddef.store()
carddefs.append(carddef)
results['carddefs'] += 1
@ -282,7 +287,8 @@ class WcsPublisher(QommonPublisher):
roles = []
for f in z.namelist():
if os.path.dirname(f) == 'roles_xml' and os.path.basename(f):
role = self.role_class.import_from_xml(z.open(f), include_id=True)
with z.open(f) as fd:
role = self.role_class.import_from_xml(fd, include_id=True)
role.store()
roles.append(role)
results['roles'] += 1

View File

@ -61,60 +61,67 @@ class LoggerDirectory(Directory):
r += htmltext('<tr></thead>\n')
r += htmltext('<tbody>\n')
userlabels = {}
for d in logger.parse_logstream(open(logfilename)):
if not d:
continue
with open(logfilename) as fd:
for d in logger.parse_logstream(fd):
if not d:
continue
if d.get('user_id'):
user_color_key = d['user_id']
if user_color_key == 'anonymous':
user_color_key += d['ip']
if user_color_key not in user_color_keys:
user_color_keys[user_color_key] = ''.join(
['%x' % random.randint(0xC, 0xF) for x in range(3)]
if d.get('user_id'):
user_color_key = d['user_id']
if user_color_key == 'anonymous':
user_color_key += d['ip']
if user_color_key not in user_color_keys:
user_color_keys[user_color_key] = ''.join(
['%x' % random.randint(0xC, 0xF) for x in range(3)]
)
r += htmltext('<tr class="level-%s" style="background: #%s;">') % (
d['level'].lower(),
user_color_keys[user_color_key],
)
r += htmltext('<tr class="level-%s" style="background: #%s;">') % (
d['level'].lower(),
user_color_keys[user_color_key],
)
else:
r += htmltext('<tr class="level-%s">') % d['level'].lower()
else:
r += htmltext('<tr class="level-%s">') % d['level'].lower()
if last_date != d['date']:
r += htmltext(' <td class="time">%s&nbsp;%s</td>') % (d['date'], d['hour'][:-4])
last_date = d['date']
else:
r += htmltext(' <td class="time">%s</td>') % (d['hour'][:-4])
if last_date != d['date']:
r += htmltext(' <td class="time">%s&nbsp;%s</td>') % (d['date'], d['hour'][:-4])
last_date = d['date']
else:
r += htmltext(' <td class="time">%s</td>') % (d['hour'][:-4])
user_id = d.get('user_id')
if not user_id:
userlabel = None
elif user_id == 'anonymous':
userlabel = _('Anonymous')
ip = d['ip']
r += htmltext(' <td class="userlabel"><span title="%s">%s</span></td>') % (ip, userlabel)
elif user_id == 'unlogged':
userlabel = _('Unlogged')
ip = d['ip']
r += htmltext(' <td class="userlabel"><span title="%s">%s</span></td>') % (ip, userlabel)
elif user_id == 'bot':
userlabel = _('Bot')
r += htmltext(' <td class="userlabel">%s</td>') % userlabel
else:
userlabel = userlabels.get(user_id)
if not userlabel:
try:
user = get_publisher().user_class.get(user_id)
userlabel = htmltext(user.display_name.replace(str(' '), str('&nbsp;')))
except KeyError:
userlabel = _('Unknown')
userlabels[user_id] = userlabel
r += htmltext(' <td class="userlabel">%s</td>') % userlabel
if userlabel:
r += htmltext(' <td class="message">%s</td>') % d['message']
else:
r += htmltext('<td class="message" colspan="2">%s</td>') % d['message']
r += htmltext('</tr>\n')
user_id = d.get('user_id')
if not user_id:
userlabel = None
elif user_id == 'anonymous':
userlabel = _('Anonymous')
ip = d['ip']
r += htmltext(' <td class="userlabel"><span title="%s">%s</span></td>') % (
ip,
userlabel,
)
elif user_id == 'unlogged':
userlabel = _('Unlogged')
ip = d['ip']
r += htmltext(' <td class="userlabel"><span title="%s">%s</span></td>') % (
ip,
userlabel,
)
elif user_id == 'bot':
userlabel = _('Bot')
r += htmltext(' <td class="userlabel">%s</td>') % userlabel
else:
userlabel = userlabels.get(user_id)
if not userlabel:
try:
user = get_publisher().user_class.get(user_id)
userlabel = htmltext(user.display_name.replace(str(' '), str('&nbsp;')))
except KeyError:
userlabel = _('Unknown')
userlabels[user_id] = userlabel
r += htmltext(' <td class="userlabel">%s</td>') % userlabel
if userlabel:
r += htmltext(' <td class="message">%s</td>') % d['message']
else:
r += htmltext('<td class="message" colspan="2">%s</td>') % d['message']
r += htmltext('</tr>\n')
r += htmltext('</tbody>\n')
r += htmltext('</table>\n')
return r.getvalue()
@ -139,7 +146,8 @@ class LoggerDirectory(Directory):
if len(logfiles) > 1:
options = []
for lfile in logfiles:
firstline = open(os.path.join(get_publisher().app_dir, lfile)).readline()
with open(os.path.join(get_publisher().app_dir, lfile)) as fd:
firstline = fd.readline()
d = logger.readline(firstline)
if not d:
continue
@ -173,7 +181,8 @@ class LoggerDirectory(Directory):
response = get_response()
response.set_content_type('text/x-log', 'iso-8859-1')
response.set_header('content-disposition', 'attachment; filename=%s' % logfile)
return open(logfilename).read()
with open(logfilename) as fd:
return fd.read()
class ByUserPages(Directory):
@ -199,18 +208,19 @@ class ByUserPages(Directory):
r += htmltext('<tbody>')
logfilename = str(os.path.join(get_publisher().app_dir, get_publisher().APP_NAME + '.log'))
if os.path.exists(logfilename):
for line in open(logfilename):
d = logger.readline(line)
if not d or d['user_id'] != str(self.user.id):
continue
r += htmltext('<tr>')
if last_date != d['date']:
r += htmltext(' <td class="time">%s&nbsp;%s</td>') % (d['date'], d['hour'][:-4])
last_date = d['date']
else:
r += htmltext(' <td class="time">%s</td>') % (d['hour'][:-4])
r += htmltext(' <td><a href="%s">%s</a></td>') % (d['url'], d['message'])
r += htmltext('</tr>')
with open(logfilename) as fd:
for line in fd:
d = logger.readline(line)
if not d or d['user_id'] != str(self.user.id):
continue
r += htmltext('<tr>')
if last_date != d['date']:
r += htmltext(' <td class="time">%s&nbsp;%s</td>') % (d['date'], d['hour'][:-4])
last_date = d['date']
else:
r += htmltext(' <td class="time">%s</td>') % (d['hour'][:-4])
r += htmltext(' <td><a href="%s">%s</a></td>') % (d['url'], d['message'])
r += htmltext('</tr>')
r += htmltext('</tbody>')
r += htmltext('</table>')
return r.getvalue()

View File

@ -39,7 +39,8 @@ class TextsDirectory(Directory):
if not default:
filepath = os.path.join(get_publisher().DATA_DIR, 'texts', '%s.html' % key)
if os.path.exists(filepath):
return htmltext(open(filepath).read())
with open(filepath) as fd:
return htmltext(fd.read())
return ''
text = str(default) # make sure translation is applied
@ -113,7 +114,8 @@ class TextsDirectory(Directory):
if not default_text:
filepath = os.path.join(get_publisher().DATA_DIR, 'texts', '%s.html' % text_key)
if os.path.exists(str(filepath)):
default_text = open(str(filepath)).read()
with open(str(filepath)) as fd:
default_text = fd.read()
displayed_text = texts_cfg.get(cfg_key) or default_text

View File

@ -754,7 +754,8 @@ class _FileReader(Reader):
"""Reads templates from the filesystem."""
def __init__(self, fname):
self.text = open(fname, 'rb').read()
with open(fname, 'rb') as fd:
self.text = fd.read()
self._dir = os.path.dirname(fname)
def read_other(self, relative):

View File

@ -60,7 +60,8 @@ def is_idp_managing_user_roles():
def get_file_content(filename):
try:
return open(filename, 'r').read()
with open(filename, 'r') as fd:
return fd.read()
except Exception:
return None
@ -534,9 +535,11 @@ class AdminIDPDir(Directory):
cfg_idp = get_cfg('idp', {})
get_publisher().cfg['idp'] = cfg_idp
metadata = open(metadata_pathname).read()
with open(metadata_pathname) as fd:
metadata = fd.read()
if publickey_pathname:
publickey = open(publickey_pathname).read()
with open(publickey_pathname) as fd:
publickey = fd.read()
else:
publickey = None
@ -613,7 +616,8 @@ class AdminIDPUI(Directory):
r += htmltext('<div class="form">')
r += htmltext('<h3>%s</h3>') % _('Metadata')
r += htmltext('<pre>')
metadata = open(misc.get_abs_path(self.idp['metadata'])).read()
with open(misc.get_abs_path(self.idp['metadata'])) as fd:
metadata = fd.read()
try:
t = metadata.decode(str('utf8')).encode(get_publisher().site_charset)
except Exception:

View File

@ -36,6 +36,7 @@ class PagesDirectory(Directory):
page_file = os.path.join(pages_dir, component)
if not os.path.exists(page_file):
raise errors.TraversalError()
content = open(page_file).read()
with open(page_file) as fd:
content = fd.read()
template.html_top()
return content

View File

@ -310,7 +310,9 @@ class Saml2Directory(Directory):
assertion_fn = os.path.join(assertions_dir, assertion.iD)
if os.path.exists(assertion_fn):
return error_page('Assertion replay')
open(assertion_fn, 'w').close()
with open(assertion_fn, 'w'):
# create empty file
pass
try:
if assertion.subject.subjectConfirmation.method != 'urn:oasis:names:tc:SAML:2.0:cm:bearer':
@ -781,7 +783,8 @@ class Saml2Directory(Directory):
def metadata(self):
try:
metadata = force_text(open(misc.get_abs_path(get_cfg('sp')['saml2_metadata'])).read(), 'utf-8')
with open(misc.get_abs_path(get_cfg('sp')['saml2_metadata'])) as fd:
metadata = force_text(fd.read(), 'utf-8')
except KeyError:
raise errors.TraversalError()
response = get_response()
@ -791,8 +794,8 @@ class Saml2Directory(Directory):
def public_key(self):
response = get_response()
response.set_content_type('application/octet-stream')
publickey = open(misc.get_abs_path(get_cfg('sp')['publickey'])).read()
return publickey
with open(misc.get_abs_path(get_cfg('sp')['publickey'])) as fd:
return fd.read()
def error(self):
request = get_request()

View File

@ -84,11 +84,13 @@ class Metadata:
def get_new_or_old_keys(self, signing_pem_key, encryption_pem_key):
'''Return new or earlier version of PEM keys'''
dir = self.publisher.app_dir
_dir = self.publisher.app_dir
if not signing_pem_key and self.config.get('publickey'):
signing_pem_key = open(os.path.join(dir, 'public-key.pem'), 'r').read()
with open(os.path.join(_dir, 'public-key.pem'), 'r') as fd:
signing_pem_key = fd.read()
if not encryption_pem_key and self.config.get('encryption_publickey'):
encryption_pem_key = open(os.path.join(dir, 'encryption-public-key.pem'), 'r').read()
with open(os.path.join(_dir, 'encryption-public-key.pem'), 'r') as fd:
encryption_pem_key = fd.read()
return (signing_pem_key, encryption_pem_key)
def get_spsso_descriptor(self, signing_pem_key, encryption_pem_key, endpoints):

View File

@ -176,7 +176,9 @@ class Session(QommonSession, CaptchaSession, StorableObject):
def create_form_token(self):
token = super().create_form_token()
open(self.get_form_token_filepath(token), 'wb').close()
with open(self.get_form_token_filepath(token), 'wb'):
# create empty file
pass
return token
def has_form_token(self, token):
@ -308,7 +310,8 @@ class Session(QommonSession, CaptchaSession, StorableObject):
filename = os.path.join(dirname, value + '.json')
if not os.path.exists(filename):
return None
return misc.json_loads(open(filename).read())
with open(filename) as fd:
return misc.json_loads(fd.read())
def get_tempfile_path(self, token):
temp = self.get_tempfile(token)

View File

@ -221,21 +221,25 @@ def get_default_ezt_template():
get_publisher().app_dir, 'themes', current_theme, 'template.%s.ezt' % get_publisher().APP_NAME
)
if os.path.exists(filename):
return open(filename).read()
with open(filename) as fd:
return fd.read()
filename = os.path.join(
get_publisher().data_dir, 'themes', current_theme, 'template.%s.ezt' % get_publisher().APP_NAME
)
if os.path.exists(filename):
return open(filename).read()
with open(filename) as fd:
return fd.read()
filename = os.path.join(get_publisher().app_dir, 'themes', current_theme, 'template.ezt')
if os.path.exists(filename):
return open(filename).read()
with open(filename) as fd:
return fd.read()
filename = os.path.join(get_publisher().data_dir, 'themes', current_theme, 'template.ezt')
if os.path.exists(filename):
return open(filename).read()
with open(filename) as fd:
return fd.read()
return DEFAULT_TEMPLATE_EZT
@ -380,7 +384,8 @@ def decorate(body, response):
for dname in possible_dirnames:
filename = os.path.join(dname, fname)
if os.path.exists(filename):
template_ezt = open(filename).read()
with open(filename) as fd:
template_ezt = fd.read()
break
else:
continue