export_import: malformed bundle (#88130) #1264

Merged
lguerin merged 1 commits from wip/88130-application-missing-component into main 2024-03-15 14:21:52 +01:00
2 changed files with 299 additions and 137 deletions

View File

@ -7,7 +7,7 @@ import xml.etree.ElementTree as ET
import pytest
from wcs.api_export_import import klass_to_slug
from wcs.api_export_import import BundleDeclareJob, BundleImportJob, klass_to_slug
from wcs.applications import Application, ApplicationElement
from wcs.blocks import BlockDef
from wcs.carddef import CardDef
@ -940,6 +940,50 @@ def test_export_import_bundle_import(pub):
assert formdef.disabled is True
assert formdef.workflow_roles == {'_receiver': extra_role.id}
# bad file format
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), b'garbage')
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
job = BundleImportJob.get(afterjob_url.split('/')[-2])
assert job.status == 'failed'
assert job.failure_label == 'Error: Invalid tar file.'
# missing manifest
tar_io = io.BytesIO()
with tarfile.open(mode='w', fileobj=tar_io) as tar:
foo_fd = io.BytesIO(json.dumps({'foo': 'bar'}, indent=2).encode())
tarinfo = tarfile.TarInfo('foo.json')
tarinfo.size = len(foo_fd.getvalue())
tar.addfile(tarinfo, fileobj=foo_fd)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), tar_io.getvalue())
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
job = BundleImportJob.get(afterjob_url.split('/')[-2])
assert job.status == 'failed'
assert job.failure_label == 'Error: Invalid tar file, missing manifest.'
# missing component
tar_io = io.BytesIO()
with tarfile.open(mode='w', fileobj=tar_io) as tar:
manifest_json = {
'application': 'Test',
'slug': 'test',
'elements': [{'type': 'forms', 'slug': 'foo', 'name': 'foo'}],
}
manifest_fd = io.BytesIO(json.dumps(manifest_json, indent=2).encode())
tarinfo = tarfile.TarInfo('manifest.json')
tarinfo.size = len(manifest_fd.getvalue())
tar.addfile(tarinfo, fileobj=manifest_fd)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-import/'), tar_io.getvalue())
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
job = BundleImportJob.get(afterjob_url.split('/')[-2])
assert job.status == 'failed'
assert job.failure_label == 'Error: Invalid tar file, missing component forms/foo.'
@pytest.mark.parametrize(
'category_class',
@ -1321,6 +1365,30 @@ def test_export_import_bundle_declare(pub):
== []
)
# bad file format
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-declare/'), b'garbage')
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
job = BundleDeclareJob.get(afterjob_url.split('/')[-2])
assert job.status == 'failed'
assert job.failure_label == 'Error: Invalid tar file.'
# missing manifest
tar_io = io.BytesIO()
with tarfile.open(mode='w', fileobj=tar_io) as tar:
foo_fd = io.BytesIO(json.dumps({'foo': 'bar'}, indent=2).encode())
tarinfo = tarfile.TarInfo('foo.json')
tarinfo.size = len(foo_fd.getvalue())
tar.addfile(tarinfo, fileobj=foo_fd)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-declare/'), tar_io.getvalue())
afterjob_url = resp.json['url']
resp = get_app(pub).put(sign_uri(afterjob_url))
assert resp.json['data']['status'] == 'failed'
job = BundleDeclareJob.get(afterjob_url.split('/')[-2])
assert job.status == 'failed'
assert job.failure_label == 'Error: Invalid tar file, missing manifest.'
def test_export_import_bundle_unlink(pub):
application = Application()
@ -1896,6 +1964,38 @@ def test_export_import_bundle_check(pub):
},
}
# bad file format
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-check/'), b'garbage')
assert resp.json['err_desc'] == 'Invalid tar file'
# missing manifest
tar_io = io.BytesIO()
with tarfile.open(mode='w', fileobj=tar_io) as tar:
foo_fd = io.BytesIO(json.dumps({'foo': 'bar'}, indent=2).encode())
tarinfo = tarfile.TarInfo('foo.json')
tarinfo.size = len(foo_fd.getvalue())
tar.addfile(tarinfo, fileobj=foo_fd)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-check/'), tar_io.getvalue())
assert resp.json['err']
assert resp.json['err_desc'] == 'Invalid tar file, missing manifest'
# missing component
tar_io = io.BytesIO()
with tarfile.open(mode='w', fileobj=tar_io) as tar:
manifest_json = {
'application': 'Test',
'slug': 'test',
'version_number': '42',
'elements': [{'type': 'forms', 'slug': 'foo', 'name': 'foo'}],
}
manifest_fd = io.BytesIO(json.dumps(manifest_json, indent=2).encode())
tarinfo = tarfile.TarInfo('manifest.json')
tarinfo.size = len(manifest_fd.getvalue())
tar.addfile(tarinfo, fileobj=manifest_fd)
resp = get_app(pub).put(sign_uri('/api/export-import/bundle-check/'), tar_io.getvalue())
assert resp.json['err']
assert resp.json['err_desc'] == 'Invalid tar file, missing component forms/foo'
def test_export_import_workflow_options(pub):
FormDef.wipe()

View File

@ -270,101 +270,118 @@ def object_dependencies(request, objects, slug):
@signature_required
def bundle_check(request):
tar_io = io.BytesIO(request.body)
with tarfile.open(fileobj=tar_io) as tar:
manifest = json.loads(tar.extractfile('manifest.json').read().decode())
application_slug = manifest.get('slug')
application_version = manifest.get('version_number')
if not application_slug or not application_version:
return JsonResponse({'data': {}})
differences = []
unknown_elements = []
no_history_elements = []
legacy_elements = []
for element in manifest.get('elements'):
if element['type'] not in klasses or element['type'] == 'roles':
continue
element_klass = klasses[element['type']]
element_content = tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
tree = ET.fromstring(element_content)
if hasattr(element_klass, 'url_name'):
slug = xml_node_text(tree.find('url_name'))
else:
slug = xml_node_text(tree.find('slug'))
try:
with tarfile.open(fileobj=tar_io) as tar:
try:
obj = element_klass.get_by_slug(slug)
if obj is None:
raise KeyError
manifest = json.loads(tar.extractfile('manifest.json').read().decode())
except KeyError:
# element not found, report this as unexisting
unknown_elements.append(
{
'type': element['type'],
'slug': element['slug'],
}
)
continue
applications = Application.select([Equal('slug', application_slug)])
legacy = False
if not applications:
legacy = True
else:
application = applications[0]
elements = ApplicationElement.select(
return JsonResponse({'err': 1, 'err_desc': _('Invalid tar file, missing manifest')})
application_slug = manifest.get('slug')
application_version = manifest.get('version_number')
if not application_slug or not application_version:
return JsonResponse({'data': {}})
differences = []
unknown_elements = []
no_history_elements = []
legacy_elements = []
for element in manifest.get('elements'):
if element['type'] not in klasses or element['type'] == 'roles':
continue
element_klass = klasses[element['type']]
try:
element_content = tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
except KeyError:
return JsonResponse(
{
'err': 1,
'err_desc': _(
'Invalid tar file, missing component %s/%s'
% (element['type'], element['slug'])
),
}
)
tree = ET.fromstring(element_content)
if hasattr(element_klass, 'url_name'):
slug = xml_node_text(tree.find('url_name'))
else:
slug = xml_node_text(tree.find('slug'))
try:
obj = element_klass.get_by_slug(slug)
if obj is None:
raise KeyError
except KeyError:
# element not found, report this as unexisting
unknown_elements.append(
{
'type': element['type'],
'slug': element['slug'],
}
)
continue
applications = Application.select([Equal('slug', application_slug)])
legacy = False
if not applications:
legacy = True
else:
application = applications[0]
elements = ApplicationElement.select(
[
Equal('application_id', application.id),
Equal('object_type', obj.xml_root_node),
Equal('object_id', obj.id),
]
)
if not elements:
legacy = True
if legacy:
# object exists, but not linked to the application
legacy_elements.append(
{
'type': element['type'],
'slug': element['slug'],
# information needed here, Relation objects may not exist yet in hobo
'text': obj.name,
'url': request.build_absolute_uri(
reverse(
'api-export-import-object-redirect',
kwargs={'objects': element['type'], 'slug': element['slug']},
)
),
}
)
continue
snapshots_for_app = get_publisher().snapshot_class.select(
[
Equal('application_id', application.id),
Equal('object_type', obj.xml_root_node),
Equal('object_id', obj.id),
]
)
if not elements:
legacy = True
if legacy:
# object exists, but not linked to the application
legacy_elements.append(
{
'type': element['type'],
'slug': element['slug'],
# information needed here, Relation objects may not exist yet in hobo
'text': obj.name,
'url': request.build_absolute_uri(
reverse(
'api-export-import-object-redirect',
kwargs={'objects': element['type'], 'slug': element['slug']},
)
),
}
)
continue
snapshots_for_app = get_publisher().snapshot_class.select(
[
Equal('object_type', obj.xml_root_node),
Equal('object_id', obj.id),
Equal('application_slug', application_slug),
Equal('application_version', application_version),
],
order_by='-timestamp',
)
if not snapshots_for_app:
# legacy, no snapshot for this bundle
no_history_elements.append(
{
'type': element['type'],
'slug': element['slug'],
}
)
continue
snapshot_for_app = snapshots_for_app[0]
last_snapshot = get_publisher().snapshot_class.select_object_history(obj)[0]
if snapshot_for_app.id != last_snapshot.id:
differences.append(
{
'type': element['type'],
'slug': element['slug'],
'url': '%shistory/compare?version1=%s&version2=%s'
% (obj.get_admin_url(), snapshot_for_app.id, last_snapshot.id),
}
Equal('application_slug', application_slug),
Equal('application_version', application_version),
],
order_by='-timestamp',
)
if not snapshots_for_app:
# legacy, no snapshot for this bundle
no_history_elements.append(
{
'type': element['type'],
'slug': element['slug'],
}
)
continue
snapshot_for_app = snapshots_for_app[0]
last_snapshot = get_publisher().snapshot_class.select_object_history(obj)[0]
if snapshot_for_app.id != last_snapshot.id:
differences.append(
{
'type': element['type'],
'slug': element['slug'],
'url': '%shistory/compare?version1=%s&version2=%s'
% (obj.get_admin_url(), snapshot_for_app.id, last_snapshot.id),
}
)
except tarfile.TarError:
return JsonResponse({'err': 1, 'err_desc': _('Invalid tar file')})
return JsonResponse(
{
@ -378,6 +395,10 @@ def bundle_check(request):
)
class BundleKeyError(Exception):
pass
class BundleImportJob(AfterJob):
def __init__(self, tar_content, **kwargs):
super().__init__(**kwargs)
@ -389,48 +410,68 @@ class BundleImportJob(AfterJob):
object_types = sorted(object_types, key=lambda a: 'categories' in a, reverse=True)
tar_io = io.BytesIO(self.tar_content)
with tarfile.open(fileobj=tar_io) as self.tar:
manifest = json.loads(self.tar.extractfile('manifest.json').read().decode())
self.application = Application.update_or_create_from_manifest(
manifest, self.tar, editable=False, install=False
)
error = None
try:
with tarfile.open(fileobj=tar_io) as self.tar:
try:
manifest = json.loads(self.tar.extractfile('manifest.json').read().decode())
except KeyError:
raise BundleKeyError(_('Invalid tar file, missing manifest.'))
self.application = Application.update_or_create_from_manifest(
manifest, self.tar, editable=False, install=False
)
# count number of actions
self.total_count = 0
self.total_count += len(
[
x
for x in manifest.get('elements')
if x.get('type') in ('forms', 'cards', 'blocks', 'workflows')
]
)
self.total_count += (
len([x for x in manifest.get('elements') if x.get('type') in object_types]) * 2
)
# count number of actions
self.total_count = 0
self.total_count += len(
[
x
for x in manifest.get('elements')
if x.get('type') in ('forms', 'cards', 'blocks', 'workflows')
]
)
self.total_count += (
len([x for x in manifest.get('elements') if x.get('type') in object_types]) * 2
)
# init cache of application elements, from imported manifest
self.application_elements = set()
# init cache of application elements, from imported manifest
self.application_elements = set()
# first pass on formdef/carddef/blockdef/workflows to create them empty
# (name and slug); so they can be found for sure in import pass
for _type in ('forms', 'cards', 'blocks', 'workflows'):
self.pre_install([x for x in manifest.get('elements') if x.get('type') == _type])
# first pass on formdef/carddef/blockdef/workflows to create them empty
# (name and slug); so they can be found for sure in import pass
for _type in ('forms', 'cards', 'blocks', 'workflows'):
self.pre_install([x for x in manifest.get('elements') if x.get('type') == _type])
# real installation pass
for _type in object_types:
self.install([x for x in manifest.get('elements') if x.get('type') == _type])
# real installation pass
for _type in object_types:
self.install([x for x in manifest.get('elements') if x.get('type') == _type])
# again, to remove [pre-install] in dependencies labels
for _type in object_types:
self.install([x for x in manifest.get('elements') if x.get('type') == _type], finalize=True)
# again, to remove [pre-install] in dependencies labels
for _type in object_types:
self.install(
[x for x in manifest.get('elements') if x.get('type') == _type], finalize=True
)
# remove obsolete application elements
self.unlink_obsolete_objects()
# remove obsolete application elements
self.unlink_obsolete_objects()
except tarfile.TarError:
error = _('Invalid tar file.')
except BundleKeyError as e:
error = str(e)
if error:
self.status = 'failed'
self.failure_label = str(_('Error: %s') % error)
def pre_install(self, elements):
for element in elements:
element_klass = klasses[element['type']]
element_content = self.tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
try:
element_content = self.tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
except KeyError:
raise BundleKeyError(
'Invalid tar file, missing component %s/%s.' % (element['type'], element['slug'])
)
tree = ET.fromstring(element_content)
if hasattr(element_klass, 'url_name'):
slug = xml_node_text(tree.find('url_name'))
@ -472,7 +513,12 @@ class BundleImportJob(AfterJob):
imported_positions = {}
for element in elements:
element_content = self.tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
try:
element_content = self.tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
except KeyError:
raise BundleKeyError(
'Invalid tar file, missing component %s/%s.' % (element['type'], element['slug'])
)
new_object = element_klass.import_from_xml_tree(
ET.fromstring(element_content), include_id=False, check_datasources=False
)
@ -587,24 +633,40 @@ class BundleDeclareJob(BundleImportJob):
object_types = [x for x in klasses if x != 'roles']
tar_io = io.BytesIO(self.tar_content)
with tarfile.open(fileobj=tar_io) as self.tar:
manifest = json.loads(self.tar.extractfile('manifest.json').read().decode())
self.application = Application.update_or_create_from_manifest(
manifest, self.tar, editable=True, install=True
)
error = None
try:
with tarfile.open(fileobj=tar_io) as self.tar:
try:
manifest = json.loads(self.tar.extractfile('manifest.json').read().decode())
except KeyError:
raise BundleKeyError(_('Invalid tar file, missing manifest.'))
else:
self.application = Application.update_or_create_from_manifest(
manifest, self.tar, editable=True, install=True
)
# count number of actions
self.total_count = len([x for x in manifest.get('elements') if x.get('type') in object_types])
# count number of actions
self.total_count = len(
[x for x in manifest.get('elements') if x.get('type') in object_types]
)
# init cache of application elements, from manifest
self.application_elements = set()
# init cache of application elements, from manifest
self.application_elements = set()
# declare elements
for type in object_types:
self.declare([x for x in manifest.get('elements') if x.get('type') == type])
# declare elements
for type in object_types:
self.declare([x for x in manifest.get('elements') if x.get('type') == type])
# remove obsolete application elements
self.unlink_obsolete_objects()
# remove obsolete application elements
self.unlink_obsolete_objects()
except tarfile.TarError:
error = _('Invalid tar file.')
except BundleKeyError as e:
error = str(e)
if error:
self.status = 'failed'
self.failure_label = str(_('Error: %s') % error)
def declare(self, elements):
for element in elements: