export_import: invalid bundle (#88068)

This commit is contained in:
Lauréline Guérin 2024-03-14 09:54:48 +01:00
parent 1896c33f29
commit 2c30eec6ac
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
2 changed files with 165 additions and 103 deletions

View File

@ -29,6 +29,7 @@ from rest_framework.generics import GenericAPIView
from rest_framework.response import Response
from chrono.agendas.models import Agenda, Category, EventsType, Resource, UnavailabilityCalendar
from chrono.api.utils import APIError
from chrono.apps.export_import.models import Application, ApplicationElement
from chrono.manager.utils import import_site
@ -231,96 +232,102 @@ class BundleCheck(GenericAPIView):
def put(self, request, *args, **kwargs):
tar_io = io.BytesIO(request.read())
with tarfile.open(fileobj=tar_io) as tar:
manifest = json.loads(tar.extractfile('manifest.json').read().decode())
application_slug = manifest.get('slug')
application_version = manifest.get('version_number')
if not application_slug or not application_version:
return Response({'data': {}})
differences = []
unknown_elements = []
no_history_elements = []
legacy_elements = []
content_types = ContentType.objects.get_for_models(
*[v for k, v in klasses.items() if k != 'roles']
)
for element in manifest.get('elements'):
component_type = element['type']
if component_type not in klasses or component_type == 'roles':
continue
klass = klasses[component_type]
component_type = klasses_translation.get(component_type, component_type)
try:
with tarfile.open(fileobj=tar_io) as tar:
try:
component = klass.objects.get(slug=element['slug'])
except klass.DoesNotExist:
unknown_elements.append(
{
'type': component_type,
'slug': element['slug'],
}
)
continue
elements_qs = ApplicationElement.objects.filter(
application__slug=application_slug,
content_type=content_types[klass],
object_id=component.pk,
manifest = json.loads(tar.extractfile('manifest.json').read().decode())
except KeyError:
raise APIError(_('Invalid tar file, missing manifest'))
application_slug = manifest.get('slug')
application_version = manifest.get('version_number')
if not application_slug or not application_version:
return Response({'data': {}})
differences = []
unknown_elements = []
no_history_elements = []
legacy_elements = []
content_types = ContentType.objects.get_for_models(
*[v for k, v in klasses.items() if k != 'roles']
)
if not elements_qs.exists():
# object exists, but not linked to the application
legacy_elements.append(
{
'type': component.application_component_type,
'slug': str(component.slug),
# information needed here, Relation objects may not exist yet in hobo
'text': component.label,
'url': reverse(
'api-export-import-component-redirect',
kwargs={
'slug': str(component.slug),
'component_type': component.application_component_type,
},
),
}
for element in manifest.get('elements'):
component_type = element['type']
if component_type not in klasses or component_type == 'roles':
continue
klass = klasses[component_type]
component_type = klasses_translation.get(component_type, component_type)
try:
component = klass.objects.get(slug=element['slug'])
except klass.DoesNotExist:
unknown_elements.append(
{
'type': component_type,
'slug': element['slug'],
}
)
continue
elements_qs = ApplicationElement.objects.filter(
application__slug=application_slug,
content_type=content_types[klass],
object_id=component.pk,
)
continue
snapshot_for_app = (
klass.get_snapshot_model()
.objects.filter(
instance=component,
application_slug=application_slug,
application_version=application_version,
)
.order_by('timestamp')
.last()
)
if not snapshot_for_app:
# no snapshot for this bundle
no_history_elements.append(
{
'type': element['type'],
'slug': element['slug'],
}
)
continue
last_snapshot = (
klass.get_snapshot_model().objects.filter(instance=component).latest('timestamp')
)
if snapshot_for_app.pk != last_snapshot.pk:
differences.append(
{
'type': element['type'],
'slug': element['slug'],
'url': '%s?version1=%s&version2=%s'
% (
request.build_absolute_uri(
reverse(compare_urls[component_type], args=[component.pk])
if not elements_qs.exists():
# object exists, but not linked to the application
legacy_elements.append(
{
'type': component.application_component_type,
'slug': str(component.slug),
# information needed here, Relation objects may not exist yet in hobo
'text': component.label,
'url': reverse(
'api-export-import-component-redirect',
kwargs={
'slug': str(component.slug),
'component_type': component.application_component_type,
},
),
snapshot_for_app.pk,
last_snapshot.pk,
),
}
}
)
continue
snapshot_for_app = (
klass.get_snapshot_model()
.objects.filter(
instance=component,
application_slug=application_slug,
application_version=application_version,
)
.order_by('timestamp')
.last()
)
if not snapshot_for_app:
# no snapshot for this bundle
no_history_elements.append(
{
'type': element['type'],
'slug': element['slug'],
}
)
continue
last_snapshot = (
klass.get_snapshot_model().objects.filter(instance=component).latest('timestamp')
)
if snapshot_for_app.pk != last_snapshot.pk:
differences.append(
{
'type': element['type'],
'slug': element['slug'],
'url': '%s?version1=%s&version2=%s'
% (
request.build_absolute_uri(
reverse(compare_urls[component_type], args=[component.pk])
),
snapshot_for_app.pk,
last_snapshot.pk,
),
}
)
except tarfile.TarError:
raise APIError(_('Invalid tar file'))
return Response(
{
@ -344,25 +351,32 @@ class BundleImport(GenericAPIView):
def put(self, request, *args, **kwargs):
tar_io = io.BytesIO(request.read())
components = {}
with tarfile.open(fileobj=tar_io) as tar:
manifest = json.loads(tar.extractfile('manifest.json').read().decode())
self.application = Application.update_or_create_from_manifest(
manifest,
tar,
editable=not self.install,
)
for element in manifest.get('elements'):
component_type = element['type']
if component_type not in klasses or component_type == 'roles':
continue
component_type = klasses_translation.get(component_type, component_type)
if component_type not in components:
components[component_type] = []
component_content = (
tar.extractfile('%s/%s' % (element['type'], element['slug'])).read().decode()
try:
with tarfile.open(fileobj=tar_io) as tar:
try:
manifest = json.loads(tar.extractfile('manifest.json').read().decode())
except KeyError:
raise APIError(_('Invalid tar file, missing manifest'))
self.application = Application.update_or_create_from_manifest(
manifest,
tar,
editable=not self.install,
)
components[component_type].append(json.loads(component_content).get('data'))
for element in manifest.get('elements'):
component_type = element['type']
if component_type not in klasses or element['type'] == 'roles':
continue
component_type = klasses_translation.get(component_type, component_type)
if component_type not in components:
components[component_type] = []
component_content = (
tar.extractfile('%s/%s' % (element['type'], element['slug'])).read().decode()
)
components[component_type].append(json.loads(component_content).get('data'))
except tarfile.TarError:
raise APIError(_('Invalid tar file'))
# init cache of application elements, from manifest
self.application_elements = set()
# import agendas

View File

@ -584,6 +584,22 @@ def test_bundle_import(app, admin_user):
assert last_snapshot.application_slug == 'test'
assert last_snapshot.application_version == '42.1'
# bad file format
resp = app.put('/api/export-import/bundle-import/', b'garbage')
assert resp.json['err']
assert resp.json['err_desc'] == 'Invalid tar file'
# missing manifest
tar_io = io.BytesIO()
with tarfile.open(mode='w', fileobj=tar_io) as tar:
foo_fd = io.BytesIO(json.dumps({'foo': 'bar'}, indent=2).encode())
tarinfo = tarfile.TarInfo('foo.json')
tarinfo.size = len(foo_fd.getvalue())
tar.addfile(tarinfo, fileobj=foo_fd)
resp = app.put('/api/export-import/bundle-import/', tar_io.getvalue())
assert resp.json['err']
assert resp.json['err_desc'] == 'Invalid tar file, missing manifest'
def test_bundle_declare(app, admin_user):
app.authorization = ('Basic', ('admin', 'admin'))
@ -622,6 +638,22 @@ def test_bundle_declare(app, admin_user):
assert application.visible is True
assert ApplicationElement.objects.count() == 4 # category, events_type, unavailability_calendar, resource
# bad file format
resp = app.put('/api/export-import/bundle-declare/', b'garbage')
assert resp.json['err']
assert resp.json['err_desc'] == 'Invalid tar file'
# missing manifest
tar_io = io.BytesIO()
with tarfile.open(mode='w', fileobj=tar_io) as tar:
foo_fd = io.BytesIO(json.dumps({'foo': 'bar'}, indent=2).encode())
tarinfo = tarfile.TarInfo('foo.json')
tarinfo.size = len(foo_fd.getvalue())
tar.addfile(tarinfo, fileobj=foo_fd)
resp = app.put('/api/export-import/bundle-declare/', tar_io.getvalue())
assert resp.json['err']
assert resp.json['err_desc'] == 'Invalid tar file, missing manifest'
def test_bundle_unlink(app, admin_user, bundle):
app.authorization = ('Basic', ('admin', 'admin'))
@ -944,3 +976,19 @@ def test_bundle_check(app, admin_user):
'legacy_elements': [],
}
}
# bad file format
resp = app.put('/api/export-import/bundle-check/', b'garbage')
assert resp.json['err']
assert resp.json['err_desc'] == 'Invalid tar file'
# missing manifest
tar_io = io.BytesIO()
with tarfile.open(mode='w', fileobj=tar_io) as tar:
foo_fd = io.BytesIO(json.dumps({'foo': 'bar'}, indent=2).encode())
tarinfo = tarfile.TarInfo('foo.json')
tarinfo.size = len(foo_fd.getvalue())
tar.addfile(tarinfo, fileobj=foo_fd)
resp = app.put('/api/export-import/bundle-check/', tar_io.getvalue())
assert resp.json['err']
assert resp.json['err_desc'] == 'Invalid tar file, missing manifest'