misc: fix consider-using-with pylint error (#56288)

This commit is contained in:
Lauréline Guérin 2021-08-20 17:20:42 +02:00
parent ea10ae6b98
commit fcf2778e99
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
10 changed files with 120 additions and 101 deletions

View File

@ -66,13 +66,11 @@ def tar_assets_files(tar):
def import_assets(fd, overwrite=False):
tar = tarfile.open(mode='r', fileobj=fd)
data = untar_assets_files(tar, overwrite=overwrite)
with tarfile.open(mode='r', fileobj=fd) as tar:
data = untar_assets_files(tar, overwrite=overwrite)
Asset.load_serialized_objects(data.get('assets') or [])
tar.close()
def export_assets(fd):
tar = tarfile.open(mode='w', fileobj=fd)
tar_assets_files(tar)
tar.close()
with tarfile.open(mode='w', fileobj=fd) as tar:
tar_assets_files(tar)

View File

@ -37,17 +37,18 @@ class Command(BaseCommand):
def handle(self, *args, **options):
if options['format_json']:
if options['output'] and options['output'] != '-':
output = open(options['output'], 'w')
with open(options['output'], 'w') as output:
json.dump(export_site(), output, indent=2)
else:
output = sys.stdout
json.dump(export_site(), output, indent=2)
else:
if options['output'] and options['output'] != '-':
try:
output = open(options['output'], 'wb')
except OSError as e:
raise CommandError(e)
export_site_tar(output)
output.close()
else:
raise CommandError(_('TAR format require output filename parameter'))
json.dump(export_site(), sys.stdout, indent=2)
return
if options['output'] and options['output'] != '-':
try:
with open(options['output'], 'wb') as output:
export_site_tar(output)
except OSError as e:
raise CommandError(e)
return
raise CommandError(_('TAR format require output filename parameter'))

View File

@ -36,32 +36,36 @@ class Command(BaseCommand):
parser.add_argument('--overwrite', action='store_true', default=False, help='Overwrite asset files')
def handle(self, filename, *args, **options):
def _import_site_json(fd):
import_site(json.load(fd), if_empty=options['if_empty'], clean=options['clean'])
def _import_site_tar(fd):
import_site_tar(
fd,
if_empty=options['if_empty'],
clean=options['clean'],
overwrite=options['overwrite'],
)
with contextlib.ExitStack() as stack:
if filename == '-':
format = 'json'
fd = sys.stdin
else:
try:
if filename == '-':
_import_site_json(sys.stdin)
return
try:
fd = stack.enter_context(open(filename, 'rb'))
with open(filename, 'rb') as _f:
fd = stack.enter_context(_f)
try:
with tarfile.open(mode='r', fileobj=fd):
pass
except tarfile.TarError:
with open(filename) as json_fd:
_import_site_json(json_fd)
else:
with open(filename, 'rb') as tar_fd:
_import_site_tar(tar_fd)
except OSError as e:
raise CommandError(e)
try:
tarfile.open(mode='r', fileobj=fd)
except tarfile.TarError as e:
format = 'json'
fd = open(filename)
else:
format = 'tar'
fd = open(filename, 'rb')
try:
if format == 'json':
import_site(json.load(fd), if_empty=options['if_empty'], clean=options['clean'])
else:
import_site_tar(
fd,
if_empty=options['if_empty'],
clean=options['clean'],
overwrite=options['overwrite'],
)
except ImportSiteError as e:
raise CommandError(e)

View File

@ -139,30 +139,28 @@ def import_site(data, if_empty=False, clean=False, request=None):
def export_site_tar(fd, export_kwargs=None):
tar = tarfile.open(mode='w', fileobj=fd)
data = export_site(**(export_kwargs or {}))
del data['assets']
add_tar_content(tar, '_site.json', json.dumps(data, indent=2))
tar_assets_files(tar)
tar.close()
with tarfile.open(mode='w', fileobj=fd) as tar:
data = export_site(**(export_kwargs or {}))
del data['assets']
add_tar_content(tar, '_site.json', json.dumps(data, indent=2))
tar_assets_files(tar)
def import_site_tar(fd, if_empty=False, clean=False, overwrite=False, request=None):
tar = tarfile.open(mode='r', fileobj=fd)
try:
tarinfo = tar.getmember('_site.json')
except KeyError:
raise ImportSiteError(_('TAR file should provide _site.json file'))
with tarfile.open(mode='r', fileobj=fd) as tar:
try:
tarinfo = tar.getmember('_site.json')
except KeyError:
raise ImportSiteError(_('TAR file should provide _site.json file'))
if if_empty and (Page.objects.count() or MapLayer.objects.count()):
return
if if_empty and (Page.objects.count() or MapLayer.objects.count()):
return
if clean:
clean_assets_files()
if clean:
clean_assets_files()
json_site = tar.extractfile(tarinfo).read()
data = json.loads(json_site.decode('utf-8'))
data.update(untar_assets_files(tar, overwrite=overwrite))
pages = import_site(data, if_empty=if_empty, clean=clean, request=request)
tar.close()
json_site = tar.extractfile(tarinfo).read()
data = json.loads(json_site.decode('utf-8'))
data.update(untar_assets_files(tar, overwrite=overwrite))
pages = import_site(data, if_empty=if_empty, clean=clean, request=request)
return pages

View File

@ -113,7 +113,8 @@ class SiteImportView(FormView):
def form_valid(self, form):
fd = self.request.FILES['site_file'].file
try:
tarfile.open(mode='r', fileobj=fd)
with tarfile.open(mode='r', fileobj=fd):
pass
except tarfile.TarError:
try:
fd.seek(0)

View File

@ -381,4 +381,5 @@ local_settings_file = os.environ.get(
'COMBO_SETTINGS_FILE', os.path.join(os.path.dirname(__file__), 'local_settings.py')
)
if os.path.exists(local_settings_file):
exec(open(local_settings_file).read())
with open(local_settings_file) as fd:
exec(fd.read())

View File

@ -88,13 +88,12 @@ def test_clean_assets_files(some_assets):
def test_add_tar_content(tmpdir):
filename = os.path.join(str(tmpdir), 'file.tar')
tar = tarfile.open(filename, 'w')
add_tar_content(tar, 'foo.txt', 'bar')
tar.close()
with tarfile.open(filename, 'w') as tar:
add_tar_content(tar, 'foo.txt', 'bar')
tar = tarfile.open(filename, 'r')
tarinfo = tar.getmember('foo.txt')
assert tar.extractfile(tarinfo).read().decode('utf-8') == 'bar'
with tarfile.open(filename, 'r') as tar:
tarinfo = tar.getmember('foo.txt')
assert tar.extractfile(tarinfo).read().decode('utf-8') == 'bar'
def test_tar_untar_assets(some_assets):
@ -102,25 +101,27 @@ def test_tar_untar_assets(some_assets):
assert count_asset_files() == 2
fd = BytesIO()
tar = tarfile.open(mode='w', fileobj=fd)
tar_assets_files(tar)
tar_bytes = fd.getvalue()
tar.close()
with tarfile.open(mode='w', fileobj=fd) as tar:
tar_assets_files(tar)
tar_bytes = fd.getvalue()
path = default_storage.path('')
os.remove('%s/assets/test.png' % path)
open('%s/assets/test2.png' % path, 'w').write('foo')
with open('%s/assets/test2.png' % path, 'w') as fd:
fd.write('foo')
assert count_asset_files() == 1
Asset.objects.all().delete()
assert Asset.objects.count() == 0
fd = BytesIO(tar_bytes)
tar = tarfile.open(mode='r', fileobj=fd)
data = untar_assets_files(tar)
with tarfile.open(mode='r', fileobj=fd) as tar:
data = untar_assets_files(tar)
assert [x['fields']['key'] for x in data['assets']] == ['banner', 'favicon']
assert count_asset_files() == 2
assert open('%s/assets/test.png' % path).read() == 'test'
assert open('%s/assets/test2.png' % path).read() == 'foo'
with open('%s/assets/test.png' % path) as fd:
assert fd.read() == 'test'
with open('%s/assets/test2.png' % path) as fd:
assert fd.read() == 'foo'
clean_assets_files()
@ -128,21 +129,24 @@ def test_import_export_assets(some_assets, tmpdir):
filename = os.path.join(str(tmpdir), 'file.tar')
assert Asset.objects.count() == 2
assert count_asset_files() == 2
fd = open(filename, 'wb')
export_assets(fd)
with open(filename, 'wb') as fd:
export_assets(fd)
path = default_storage.path('')
os.remove('%s/assets/test.png' % path)
open('%s/assets/test2.png' % path, 'w').write('foo')
with open('%s/assets/test2.png' % path, 'w') as fd:
fd.write('foo')
assert count_asset_files() == 1
Asset.objects.all().delete()
assert Asset.objects.count() == 0
fd = open(filename, 'rb')
import_assets(fd, overwrite=True)
with open(filename, 'rb') as fd:
import_assets(fd, overwrite=True)
assert count_asset_files() == 2
assert open('%s/assets/test.png' % path).read() == 'test'
assert open('%s/assets/test2.png' % path).read() == 'test2'
with open('%s/assets/test.png' % path) as fd:
assert fd.read() == 'test'
with open('%s/assets/test2.png' % path) as fd:
assert fd.read() == 'test2'
clean_assets_files()
assert count_asset_files() == 0
clean_assets_files()

View File

@ -389,21 +389,24 @@ def test_import_export_tar(tmpdir, some_assets):
assert Page.objects.count() == 1
assert Asset.objects.count() == 3
Asset.objects.get(key='banner').asset.name == 'assets/test.png'
assert open('%s/assets/test.png' % default_storage.path('')).read() == 'original content'
with open('%s/assets/test.png' % default_storage.path('')) as fd:
assert fd.read() == 'original content'
populate_site()
call_command('import_site', filename, '--overwrite')
assert Page.objects.count() == 1
assert Asset.objects.count() == 3
Asset.objects.get(key='banner').asset.name == 'assets/test.png'
assert open('%s/assets/test.png' % default_storage.path('')).read() == 'test'
with open('%s/assets/test.png' % default_storage.path('')) as fd:
assert fd.read() == 'test'
populate_site()
call_command('import_site', filename, '--if-empty')
assert Page.objects.count() == 1
assert Asset.objects.count() == 2
Asset.objects.get(key='banner').asset.name == 'assets/test3.png'
assert open('%s/assets/test.png' % default_storage.path('')).read() == 'original content'
with open('%s/assets/test.png' % default_storage.path('')) as fd:
assert fd.read() == 'original content'
Asset.objects.get(key='logo').asset.name == 'assets/logo.png'
assert os.path.isfile('%s/assets/logo.png' % default_storage.path(''))
@ -412,7 +415,8 @@ def test_import_export_tar(tmpdir, some_assets):
assert Page.objects.count() == 0
assert Asset.objects.count() == 2
Asset.objects.get(key='banner').asset.name == 'assets/test.png'
assert open('%s/assets/test.png' % default_storage.path('')).read() == 'test'
with open('%s/assets/test.png' % default_storage.path('')) as fd:
assert fd.read() == 'test'
assert not Asset.objects.filter(key='logo')
assert not os.path.isfile('%s/assets/logo.png' % default_storage.path(''))
@ -426,7 +430,9 @@ def test_import_export_tar(tmpdir, some_assets):
with pytest.raises(CommandError, match=r'No such file or directory'):
call_command('import_site', '%s/noway/foo.tar' % tmpdir)
tarfile.open(filename, 'w').close() # empty tar file
with tarfile.open(filename, 'w'):
# empty tar file
pass
with pytest.raises(CommandError, match=r'TAR file should provide _site.json file'):
call_command('import_site', filename)

View File

@ -513,9 +513,8 @@ def test_page_edit_picture(app, admin_user):
resp = app.get('/manage/pages/%s/' % page.id)
resp = resp.click(href='.*/picture/')
resp.form['picture'] = Upload(
'black.jpeg', open(os.path.join(TESTS_DATA_DIR, 'black.jpeg'), mode='rb').read(), 'image/jpeg'
)
with open(os.path.join(TESTS_DATA_DIR, 'black.jpeg'), mode='rb') as fd:
resp.form['picture'] = Upload('black.jpeg', fd.read(), 'image/jpeg')
resp = resp.form.submit()
assert resp.location.endswith('/manage/pages/%s/' % page.id)
resp = resp.follow()
@ -932,7 +931,8 @@ def test_site_export_import_tar(app, admin_user):
cell.save()
Asset(key='collectivity:banner', asset=File(BytesIO(b'test'), 'test.png')).save()
path = default_storage.path('')
assert open('%s/assets/test.png' % path).read() == 'test'
with open('%s/assets/test.png' % path) as fd:
assert fd.read() == 'test'
app = login(app)
resp = app.get('/manage/')
@ -947,7 +947,8 @@ def test_site_export_import_tar(app, admin_user):
assert Page.objects.count() == 0
assert TextCell.objects.count() == 0
assert Asset.objects.filter(key='collectivity:banner').count() == 0
open('%s/assets/test.png' % path, 'w').write('foo')
with open('%s/assets/test.png' % path, 'w') as fd:
fd.write('foo')
app = login(app)
resp = app.get('/manage/')
resp = resp.click('Import Site')
@ -957,7 +958,8 @@ def test_site_export_import_tar(app, admin_user):
assert PageSnapshot.objects.all().count() == 1
assert TextCell.objects.count() == 1
assert Asset.objects.filter(key='collectivity:banner').count() == 1
assert open('%s/assets/test.png' % path).read() == 'foo'
with open('%s/assets/test.png' % path) as fd:
assert fd.read() == 'foo'
os.remove('%s/assets/test.png' % path)
app = login(app)
@ -965,7 +967,8 @@ def test_site_export_import_tar(app, admin_user):
resp = resp.click('Import Site')
resp.form['site_file'] = Upload('site-export.json', site_export, 'application/json')
resp = resp.form.submit()
assert open('%s/assets/test.png' % path).read() == 'test'
with open('%s/assets/test.png' % path) as fd:
assert fd.read() == 'test'
def test_site_export_import_missing_group(app, admin_user):
@ -1610,7 +1613,8 @@ def test_asset_management(app, admin_user):
thumbnail_filename = re.findall('src="/media/(.*thumb.*)"', resp.text)[0]
thumbnail_path = default_storage.path(thumbnail_filename)
assert os.path.exists(thumbnail_path)
thumbnail_contents = open(thumbnail_path, mode='rb').read()
with open(thumbnail_path, mode='rb') as fd:
thumbnail_contents = fd.read()
# check overwriting
resp = resp.click('Overwrite')
@ -1627,7 +1631,8 @@ def test_asset_management(app, admin_user):
resp.click('test.png')
assert re.findall('src="/media/(.*thumb.*)"', resp.text)[0] == thumbnail_filename
assert os.path.exists(thumbnail_path)
thumbnail_contents_new = open(thumbnail_path, mode='rb').read()
with open(thumbnail_path, mode='rb') as fd:
thumbnail_contents_new = fd.read()
assert thumbnail_contents_new != thumbnail_contents
# try to overwrite with a different mimetype

View File

@ -352,7 +352,8 @@ def test_import_export_management_commands():
try:
sys.stdout = StringIO()
cmd.handle(output='-', format_json=True)
assert sys.stdout.getvalue() == open(export_filename).read()
with open(export_filename) as fd:
assert sys.stdout.getvalue() == fd.read()
finally:
sys.stdout = stdout