combo/tests/test_assets.py

160 lines
4.8 KiB
Python

import base64
import os
import tarfile
from io import BytesIO
import pytest
from django.core.files import File
from django.core.files.storage import default_storage
from django.urls import reverse
from combo.apps.assets.models import Asset
from combo.apps.assets.utils import (
add_tar_content,
clean_assets_files,
export_assets,
import_assets,
tar_assets_files,
untar_assets_files,
)
pytestmark = pytest.mark.django_db
@pytest.fixture
def some_assets():
Asset(key='banner', asset=File(BytesIO(b'test'), 'test.png')).save()
Asset(key='favicon', asset=File(BytesIO(b'test2'), 'test2.png')).save()
def count_asset_files():
nb_assets = 0
media_prefix = default_storage.path('')
for dummy, dummy, filenames in os.walk(media_prefix):
nb_assets += len(filenames)
return nb_assets
def test_asset_set_api(app, john_doe):
app.authorization = ('Basic', (john_doe.username, john_doe.username))
resp = app.post_json(
reverse('api-assets-set', kwargs={'key': 'plop'}),
params={
'asset': {
'content': base64.encodebytes(b'plop').decode('ascii'),
'content_type': 'text/plain',
'filename': 'plop.txt',
}
},
)
assert Asset.objects.get(key='plop').asset.read() == b'plop'
resp = app.post_json(
reverse('api-assets-set', kwargs={'key': 'plop'}),
params={
'asset': {
'content': base64.encodebytes(b'plop2').decode('ascii'),
'content_type': 'text/plain',
'filename': 'plop.txt',
}
},
)
assert resp.json.get('err') == 0
assert resp.json.get('url') == 'http://testserver/assets/plop'
assert Asset.objects.get(key='plop').asset.read() == b'plop2'
resp = app.post_json(reverse('api-assets-set', kwargs={'key': 'plop'}), params={}, status=400)
assert resp.json.get('err') == 1
for invalid_value in (None, 'not base 64', 'éléphant'):
resp = app.post_json(
reverse('api-assets-set', kwargs={'key': 'plop'}),
params={
'asset': {
'content': invalid_value,
'content_type': 'text/plain',
'filename': 'plop.txt',
}
},
status=400,
)
assert resp.json.get('err') == 1
clean_assets_files()
def test_clean_assets_files(some_assets):
assert count_asset_files() == 2
clean_assets_files()
assert count_asset_files() == 0
def test_add_tar_content(tmpdir):
filename = os.path.join(str(tmpdir), 'file.tar')
with tarfile.open(filename, 'w') as tar:
add_tar_content(tar, 'foo.txt', 'bar')
with tarfile.open(filename, 'r') as tar:
tarinfo = tar.getmember('foo.txt')
assert tar.extractfile(tarinfo).read().decode('utf-8') == 'bar'
def test_tar_untar_assets(some_assets):
assert Asset.objects.count() == 2
assert count_asset_files() == 2
fd = BytesIO()
with tarfile.open(mode='w', fileobj=fd) as tar:
tar_assets_files(tar)
tar_bytes = fd.getvalue()
path = default_storage.path('')
os.remove('%s/assets/test.png' % path)
with open('%s/assets/test2.png' % path, 'w') as fd:
fd.write('foo')
assert count_asset_files() == 1
Asset.objects.all().delete()
assert Asset.objects.count() == 0
fd = BytesIO(tar_bytes)
with tarfile.open(mode='r', fileobj=fd) as tar:
data = untar_assets_files(tar)
assert [x['fields']['key'] for x in data['assets']] == ['banner', 'favicon']
assert count_asset_files() == 2
with open('%s/assets/test.png' % path) as fd:
assert fd.read() == 'test'
with open('%s/assets/test2.png' % path) as fd:
assert fd.read() == 'foo'
clean_assets_files()
def test_import_export_assets(some_assets, tmpdir):
filename = os.path.join(str(tmpdir), 'file.tar')
assert Asset.objects.count() == 2
assert count_asset_files() == 2
with open(filename, 'wb') as fd:
export_assets(fd)
path = default_storage.path('')
os.remove('%s/assets/test.png' % path)
with open('%s/assets/test2.png' % path, 'w') as fd:
fd.write('foo')
assert count_asset_files() == 1
Asset.objects.all().delete()
assert Asset.objects.count() == 0
with open(filename, 'rb') as fd:
import_assets(fd, overwrite=True)
assert count_asset_files() == 2
with open('%s/assets/test.png' % path) as fd:
assert fd.read() == 'test'
with open('%s/assets/test2.png' % path) as fd:
assert fd.read() == 'test2'
clean_assets_files()
assert count_asset_files() == 0
clean_assets_files()
def test_assets_export_size_view(app, some_assets):
resp = app.get(reverse('combo-manager-assets-export-size'))
assert resp.text.split() == ['(9', 'bytes)']