assets: factorize import/export code (#39425)
This commit is contained in:
parent
8ae63cc075
commit
f0e0de43e5
|
@ -0,0 +1,79 @@
|
|||
# combo - content management system
|
||||
# Copyright (C) 2020 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import json
|
||||
import os
|
||||
import tarfile
|
||||
|
||||
from django.core.files.storage import default_storage
|
||||
from django.utils.six import BytesIO
|
||||
|
||||
from .models import Asset
|
||||
|
||||
|
||||
def clean_assets_files():
|
||||
media_prefix = default_storage.path('')
|
||||
for basedir, dirnames, filenames in os.walk(media_prefix):
|
||||
for filename in filenames:
|
||||
os.remove('%s/%s' % (basedir, filename))
|
||||
|
||||
|
||||
def add_tar_content(tar, filename, content):
|
||||
file = tarfile.TarInfo(filename)
|
||||
fd = BytesIO()
|
||||
fd.write(content.encode('utf-8'))
|
||||
file.size = fd.tell()
|
||||
fd.seek(0)
|
||||
tar.addfile(file, fileobj=fd)
|
||||
fd.close()
|
||||
|
||||
|
||||
def untar_assets_files(tar, overwrite=False):
|
||||
media_prefix = default_storage.path('')
|
||||
data = {}
|
||||
for tarinfo in tar.getmembers():
|
||||
filepath = default_storage.path(tarinfo.name)
|
||||
if not overwrite and os.path.exists(filepath):
|
||||
continue
|
||||
if tarinfo.name == '_assets.json':
|
||||
json_assets = tar.extractfile(tarinfo).read()
|
||||
data = json.loads(json_assets.decode('utf-8'))
|
||||
elif tarinfo.name != '_site.json':
|
||||
tar.extract(tarinfo, path=media_prefix)
|
||||
return data
|
||||
|
||||
|
||||
def tar_assets_files(tar):
|
||||
media_prefix = default_storage.path('')
|
||||
for basedir, dirnames, filenames in os.walk(media_prefix):
|
||||
for filename in filenames:
|
||||
tar.add(os.path.join(basedir, filename),
|
||||
os.path.join(basedir, filename)[len(media_prefix):])
|
||||
export = {'assets': Asset.export_all_for_json()}
|
||||
add_tar_content(tar, '_assets.json', json.dumps(export, indent=2))
|
||||
|
||||
|
||||
def import_assets(fd, overwrite=False):
|
||||
tar = tarfile.open(mode='r', fileobj=fd)
|
||||
data = untar_assets_files(tar, overwrite=overwrite)
|
||||
Asset.load_serialized_objects(data.get('assets') or [])
|
||||
tar.close()
|
||||
|
||||
|
||||
def export_assets(fd):
|
||||
tar = tarfile.open(mode='w', fileobj=fd)
|
||||
tar_assets_files(tar)
|
||||
tar.close()
|
|
@ -34,7 +34,7 @@ import ckeditor
|
|||
from sorl.thumbnail.shortcuts import get_thumbnail
|
||||
|
||||
from combo.data.models import CellBase
|
||||
from combo.data.utils import import_site
|
||||
from combo.apps.assets.utils import import_assets, export_assets
|
||||
|
||||
from .forms import AssetUploadForm, AssetsImportForm
|
||||
from .models import Asset
|
||||
|
@ -338,20 +338,10 @@ class AssetsImport(FormView):
|
|||
def form_valid(self, form):
|
||||
overwrite = form.cleaned_data.get('overwrite')
|
||||
try:
|
||||
assets = tarfile.open(fileobj=form.cleaned_data['assets_file'])
|
||||
import_assets(form.cleaned_data['assets_file'], overwrite)
|
||||
except tarfile.TarError:
|
||||
messages.error(self.request, _('The assets file is not valid.'))
|
||||
return super(AssetsImport, self).form_valid(form)
|
||||
media_prefix = default_storage.path('')
|
||||
for tarinfo in assets.getmembers():
|
||||
filepath = default_storage.path(tarinfo.name)
|
||||
if not overwrite and os.path.exists(filepath):
|
||||
continue
|
||||
if tarinfo.name == '_assets.json':
|
||||
json_assets = assets.extractfile(tarinfo).read()
|
||||
import_site(json.loads(json_assets.decode('utf-8')))
|
||||
else:
|
||||
assets.extract(tarinfo, path=media_prefix)
|
||||
messages.success(self.request, _('The assets file has been imported.'))
|
||||
return super(AssetsImport, self).form_valid(form)
|
||||
|
||||
|
@ -360,22 +350,7 @@ assets_import = AssetsImport.as_view()
|
|||
|
||||
def assets_export(request, *args, **kwargs):
|
||||
fd = BytesIO()
|
||||
assets_file = tarfile.open('assets.tar', 'w', fileobj=fd)
|
||||
media_prefix = default_storage.path('')
|
||||
for basedir, dirnames, filenames in os.walk(media_prefix):
|
||||
for filename in filenames:
|
||||
assets_file.add(
|
||||
os.path.join(basedir, filename),
|
||||
os.path.join(basedir, filename)[len(media_prefix):])
|
||||
if Asset.objects.exists():
|
||||
json_file = tarfile.TarInfo('_assets.json')
|
||||
json_fd = BytesIO()
|
||||
export = {'assets': Asset.export_all_for_json(),}
|
||||
json_fd.write(json.dumps(export).encode('utf-8'))
|
||||
json_file.size = json_fd.tell()
|
||||
json_fd.seek(0)
|
||||
assets_file.addfile(json_file, fileobj=json_fd)
|
||||
assets_file.close()
|
||||
export_assets(fd)
|
||||
return HttpResponse(fd.getvalue(), content_type='application/x-tar')
|
||||
|
||||
|
||||
|
|
|
@ -1,15 +1,42 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import base64
|
||||
import os
|
||||
import tarfile
|
||||
|
||||
from django.urls import reverse
|
||||
from django.core.files.storage import default_storage
|
||||
from django.core.files import File
|
||||
from django.utils.six import BytesIO
|
||||
|
||||
import pytest
|
||||
|
||||
from combo.apps.assets.models import Asset
|
||||
from combo.apps.assets.utils import add_tar_content
|
||||
from combo.apps.assets.utils import clean_assets_files
|
||||
from combo.apps.assets.utils import export_assets
|
||||
from combo.apps.assets.utils import import_assets
|
||||
from combo.apps.assets.utils import untar_assets_files
|
||||
from combo.apps.assets.utils import tar_assets_files
|
||||
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def some_assets():
|
||||
Asset(key='banner', asset=File(BytesIO(b'test'), 'test.png')).save()
|
||||
Asset(key='favicon', asset=File(BytesIO(b'test2'), 'test2.png')).save()
|
||||
|
||||
|
||||
def count_asset_files():
|
||||
nb_assets = 0
|
||||
media_prefix = default_storage.path('')
|
||||
for basedir, dirnames, filenames in os.walk(media_prefix):
|
||||
nb_assets += len(filenames)
|
||||
return nb_assets
|
||||
|
||||
|
||||
def test_asset_set_api(app, john_doe):
|
||||
app.authorization = ('Basic', (john_doe.username, john_doe.username))
|
||||
resp = app.post_json(reverse('api-assets-set', kwargs={'key': 'plop'}), params={
|
||||
|
@ -42,3 +69,72 @@ def test_asset_set_api(app, john_doe):
|
|||
}
|
||||
}, status=400)
|
||||
assert resp.json.get('err') == 1
|
||||
clean_assets_files()
|
||||
|
||||
|
||||
def test_clean_assets_files(some_assets):
|
||||
assert count_asset_files() == 2
|
||||
clean_assets_files()
|
||||
assert count_asset_files() == 0
|
||||
|
||||
|
||||
def test_add_tar_content(tmpdir):
|
||||
filename = os.path.join(str(tmpdir), 'file.tar')
|
||||
tar = tarfile.open(filename, 'w')
|
||||
add_tar_content(tar, 'foo.txt', 'bar')
|
||||
tar.close()
|
||||
|
||||
tar = tarfile.open(filename, 'r')
|
||||
tarinfo = tar.getmember('foo.txt')
|
||||
assert tar.extractfile(tarinfo).read().decode('utf-8') == 'bar'
|
||||
|
||||
|
||||
def test_tar_untar_assets(some_assets):
|
||||
assert Asset.objects.count() == 2
|
||||
assert count_asset_files() == 2
|
||||
fd = BytesIO()
|
||||
|
||||
tar = tarfile.open(mode='w', fileobj=fd)
|
||||
tar_assets_files(tar)
|
||||
tar_bytes = fd.getvalue()
|
||||
tar.close()
|
||||
|
||||
path = default_storage.path('')
|
||||
os.remove('%s/assets/test.png' % path)
|
||||
open('%s/assets/test2.png' % path, 'w').write('foo')
|
||||
assert count_asset_files() == 1
|
||||
Asset.objects.all().delete()
|
||||
assert Asset.objects.count() == 0
|
||||
fd = BytesIO(tar_bytes)
|
||||
|
||||
tar = tarfile.open(mode='r', fileobj=fd)
|
||||
data = untar_assets_files(tar)
|
||||
assert [x['fields']['key'] for x in data['assets']] == ['banner', 'favicon']
|
||||
assert count_asset_files() == 2
|
||||
assert open('%s/assets/test.png' % path, 'r').read() == 'test'
|
||||
assert open('%s/assets/test2.png' % path, 'r').read() == 'foo'
|
||||
clean_assets_files()
|
||||
|
||||
|
||||
def test_import_export_assets(some_assets, tmpdir):
|
||||
filename = os.path.join(str(tmpdir), 'file.tar')
|
||||
assert Asset.objects.count() == 2
|
||||
assert count_asset_files() == 2
|
||||
fd = open(filename, 'wb')
|
||||
export_assets(fd)
|
||||
|
||||
path = default_storage.path('')
|
||||
os.remove('%s/assets/test.png' % path)
|
||||
open('%s/assets/test2.png' % path, 'w').write('foo')
|
||||
assert count_asset_files() == 1
|
||||
Asset.objects.all().delete()
|
||||
assert Asset.objects.count() == 0
|
||||
|
||||
fd = open(filename, 'rb')
|
||||
import_assets(fd, overwrite=True)
|
||||
assert count_asset_files() == 2
|
||||
assert open('%s/assets/test.png' % path, 'r').read() == 'test'
|
||||
assert open('%s/assets/test2.png' % path, 'r').read() == 'test2'
|
||||
clean_assets_files()
|
||||
assert count_asset_files() == 0
|
||||
clean_assets_files()
|
||||
|
|
Loading…
Reference in New Issue