chrono/chrono/apps/export_import/api_views.py

465 lines
18 KiB
Python

# chrono - content management system
# Copyright (C) 2016-2023 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import tarfile
from django.contrib.auth.models import Group
from django.contrib.contenttypes.models import ContentType
from django.http import Http404
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from rest_framework import permissions
from rest_framework.generics import GenericAPIView
from rest_framework.response import Response
from chrono.agendas.models import Agenda, Category, EventsType, Resource, UnavailabilityCalendar
from chrono.api.utils import APIErrorBadRequest
from chrono.apps.export_import.models import Application, ApplicationElement
from chrono.manager.utils import import_site
# Exportable component classes, keyed by the component type each class declares.
klasses = {
    model.application_component_type: model
    for model in (Agenda, Category, EventsType, Resource, UnavailabilityCalendar)
}
# Roles are backed by auth groups and registered under the fixed 'roles' key.
klasses['roles'] = Group

# Component type as it appears in bundles -> name used locally (keys of
# compare_urls and of the payload handed to import_site).
klasses_translation = {
    'agendas_categories': 'categories',  # categories type is already used in wcs for FormDef Category
}
klasses_translation_reverse = {
    local_name: bundle_name for bundle_name, local_name in klasses_translation.items()
}

# URL names of the history comparison views, keyed by translated component type.
compare_urls = {
    'agendas': 'chrono-manager-agenda-history-compare',
    'categories': 'chrono-manager-category-history-compare',
    'events_types': 'chrono-manager-events-type-history-compare',
    'resources': 'chrono-manager-resource-history-compare',
    'unavailability_calendars': 'chrono-manager-unavailability-calendar-history-compare',
}
def get_klass_from_component_type(component_type):
    """Return the class registered in ``klasses`` for *component_type*.

    Raises:
        Http404: if no class is registered under this component type.
    """
    klass = klasses.get(component_type)
    if klass is None:
        raise Http404
    return klass
class Index(GenericAPIView):
    """Admin-only endpoint listing the component types known to this module."""

    permission_classes = (permissions.IsAdminUser,)

    def get(self, request, *args, **kwargs):
        """Return one descriptor per entry of ``klasses`` under the ``data`` key."""

        def list_url(type_id):
            # absolute URL of the endpoint listing components of the given type
            return request.build_absolute_uri(
                reverse(
                    'api-export-import-components-list',
                    kwargs={'component_type': type_id},
                )
            )

        entries = []
        for klass in klasses.values():
            if klass == Group:
                # roles carry no application_* attributes, describe them by hand
                entry = {
                    'id': 'roles',
                    'text': _('Roles'),
                    'singular': _('Role'),
                    'urls': {'list': list_url('roles')},
                    'minor': True,
                }
            else:
                entry = {
                    'id': klass.application_component_type,
                    'text': klass.application_label_plural,
                    'singular': klass.application_label_singular,
                    'urls': {'list': list_url(klass.application_component_type)},
                }
                # every type except agendas is flagged as minor
                if klass not in [Agenda]:
                    entry['minor'] = True
            entries.append(entry)
        return Response({'data': entries})


index = Index.as_view()
def get_component_bundle_entry(request, component):
    """Describe *component* as a dict with ``id``, ``text``, ``type`` and ``urls``.

    Groups are reported with type ``roles``, an empty ``urls`` dict and an
    extra ``uuid`` key; any other component gets absolute ``export``,
    ``dependencies`` and ``redirect`` URLs built from its component type and
    slug.
    """
    if isinstance(component, Group):
        has_role = hasattr(component, 'role')
        return {
            'id': component.role.slug if has_role else component.id,
            'text': component.name,
            'type': 'roles',
            'urls': {},
            # include uuid in object reference, this is not used for applification API but is useful
            # for authentic creating its role summary page.
            'uuid': component.role.uuid if has_role else None,
        }

    type_id = component.application_component_type
    slug = str(component.slug)
    route_kwargs = {'component_type': type_id, 'slug': slug}
    # (key in the 'urls' dict, URL name to reverse)
    url_names = (
        ('export', 'api-export-import-component-export'),
        ('dependencies', 'api-export-import-component-dependencies'),
        ('redirect', 'api-export-import-component-redirect'),
    )
    return {
        'id': slug,
        'text': component.label,
        'type': type_id,
        'urls': {
            key: request.build_absolute_uri(reverse(url_name, kwargs=route_kwargs))
            for key, url_name in url_names
        },
    }
class ListComponents(GenericAPIView):
    """Admin-only endpoint listing every component of a given type."""

    permission_classes = (permissions.IsAdminUser,)

    def get(self, request, *args, **kwargs):
        """Return the bundle entries of all objects of the requested type.

        Groups are ordered by name, every other class by slug. An unknown
        component type results in Http404 (raised by the class lookup).
        """
        klass = get_klass_from_component_type(kwargs['component_type'])
        ordering = 'name' if klass == Group else 'slug'
        entries = [
            get_component_bundle_entry(request, component)
            for component in klass.objects.order_by(ordering)
        ]
        return Response({'data': entries})


list_components = ListComponents.as_view()
class ExportComponent(GenericAPIView):
    """Admin-only endpoint returning the JSON export of a single component."""

    permission_classes = (permissions.IsAdminUser,)

    def get(self, request, slug, *args, **kwargs):
        """Return ``export_json()`` of the component identified by type and slug.

        Raises Http404 when the component type is unknown, when it is
        ``roles``, or when no object has the given slug.
        """
        klass = get_klass_from_component_type(kwargs['component_type'])
        if klass is Group:
            # Role entries advertise no export URL (see get_component_bundle_entry)
            # and django's Group has no slug field: the lookup below would fail
            # with a FieldError instead of a clean 404.
            raise Http404
        component = get_object_or_404(klass, slug=slug)
        return Response({'data': component.export_json()})


export_component = ExportComponent.as_view()
class ComponentDependencies(GenericAPIView):
    """Admin-only endpoint listing the dependencies of a single component."""

    permission_classes = (permissions.IsAdminUser,)

    def get(self, request, slug, *args, **kwargs):
        """Return the bundle entries of the component's dependencies.

        Falsy values yielded by ``get_dependencies()`` are skipped. Raises
        Http404 when the component type is unknown, when it is ``roles``, or
        when no object has the given slug.
        """
        klass = get_klass_from_component_type(kwargs['component_type'])
        if klass is Group:
            # Role entries advertise no dependencies URL (see
            # get_component_bundle_entry) and django's Group has no slug field:
            # the lookup below would fail with a FieldError instead of a 404.
            raise Http404
        component = get_object_or_404(klass, slug=slug)
        dependencies = [
            get_component_bundle_entry(request, dependency)
            for dependency in component.get_dependencies()
            if dependency
        ]
        return Response({'err': 0, 'data': dependencies})


component_dependencies = ComponentDependencies.as_view()
def component_redirect(request, component_type, slug):
    """Redirect to the manager page of the component identified by type and slug.

    When the query string contains ``compare`` together with non-empty
    ``application``, ``version1`` and ``version2`` values, redirect to the
    history comparison view of the component, forwarding those three values.
    Otherwise redirect to the view (or list) page matching the component class.

    Raises:
        Http404: unknown component type, ``roles`` type, or unknown slug.
    """
    from urllib.parse import urlencode

    klass = get_klass_from_component_type(component_type)
    if component_type == 'roles':
        # Must be rejected before the lookup: roles map to django's Group,
        # which has no slug field, so filtering on slug would raise FieldError.
        raise Http404
    component = get_object_or_404(klass, slug=slug)

    params = request.GET
    if 'compare' in params and all(
        params.get(key) for key in ('application', 'version1', 'version2')
    ):
        local_type = klasses_translation.get(component_type, component_type)
        # Values come decoded from the incoming query string; encode them again
        # so characters such as '&' or '#' cannot corrupt the target URL.
        query = urlencode(
            {
                'version1': params['version1'],
                'version2': params['version2'],
                'application': params['application'],
            }
        )
        compare_url = reverse(compare_urls[local_type], args=[component.pk])
        return redirect('%s?%s' % (compare_url, query))

    if klass == Agenda:
        return redirect(reverse('chrono-manager-agenda-view', kwargs={'pk': component.pk}))
    if klass == Category:
        return redirect(reverse('chrono-manager-category-list'))
    if klass == EventsType:
        return redirect(reverse('chrono-manager-events-type-list'))
    if klass == Resource:
        return redirect(reverse('chrono-manager-resource-view', kwargs={'pk': component.pk}))
    if klass == UnavailabilityCalendar:
        return redirect(reverse('chrono-manager-unavailability-calendar-view', kwargs={'pk': component.pk}))
    # a class registered in klasses but without a manager page
    raise Http404
class BundleCheck(GenericAPIView):
    """Admin-only endpoint comparing an uploaded bundle with the local objects."""

    permission_classes = (permissions.IsAdminUser,)

    def post(self, request, *args, **kwargs):
        """Inspect the tar file uploaded as ``bundle`` and report on its elements.

        The response ``data`` is empty when the manifest has no ``slug`` or no
        ``version_number``. Otherwise it holds four lists:

        * ``unknown_elements``: no local object has the element's slug;
        * ``legacy_elements``: a local object exists but is not linked to the
          application;
        * ``no_history_elements``: linked object without a snapshot for this
          application slug and version;
        * ``differences``: the newest snapshot of the object is not the one
          recorded for this application version.

        Elements whose type is ``roles`` or not registered in ``klasses`` are
        ignored.

        Raises:
            APIErrorBadRequest: unreadable tar file or missing manifest.
        """
        bundle = request.FILES['bundle']
        try:
            with tarfile.open(fileobj=bundle) as tar:
                try:
                    manifest = json.loads(tar.extractfile('manifest.json').read().decode())
                except KeyError:
                    raise APIErrorBadRequest(_('Invalid tar file, missing manifest'))
                application_slug = manifest.get('slug')
                application_version = manifest.get('version_number')
                if not application_slug or not application_version:
                    return Response({'data': {}})

                differences = []
                unknown_elements = []
                no_history_elements = []
                legacy_elements = []
                content_types = ContentType.objects.get_for_models(
                    *[v for k, v in klasses.items() if k != 'roles']
                )
                # a manifest without an 'elements' key is handled as an empty bundle
                for element in manifest.get('elements') or []:
                    component_type = element['type']
                    if component_type not in klasses or component_type == 'roles':
                        continue
                    klass = klasses[component_type]
                    component_type = klasses_translation.get(component_type, component_type)
                    try:
                        component = klass.objects.get(slug=element['slug'])
                    except klass.DoesNotExist:
                        # NOTE(review): this list reports the translated type while
                        # the other lists report the untranslated one - confirm this
                        # is what the caller expects.
                        unknown_elements.append(
                            {
                                'type': component_type,
                                'slug': element['slug'],
                            }
                        )
                        continue

                    elements_qs = ApplicationElement.objects.filter(
                        application__slug=application_slug,
                        content_type=content_types[klass],
                        object_id=component.pk,
                    )
                    if not elements_qs.exists():
                        # object exists, but not linked to the application
                        legacy_elements.append(
                            {
                                'type': component.application_component_type,
                                'slug': str(component.slug),
                                # information needed here, Relation objects may not exist yet in hobo
                                'text': component.label,
                                'url': reverse(
                                    'api-export-import-component-redirect',
                                    kwargs={
                                        'slug': str(component.slug),
                                        'component_type': component.application_component_type,
                                    },
                                ),
                            }
                        )
                        continue

                    snapshot_model = klass.get_snapshot_model()
                    snapshot_for_app = (
                        snapshot_model.objects.filter(
                            instance=component,
                            application_slug=application_slug,
                            application_version=application_version,
                        )
                        .order_by('timestamp')
                        .last()
                    )
                    if not snapshot_for_app:
                        # no snapshot for this bundle
                        no_history_elements.append(
                            {
                                'type': element['type'],
                                'slug': element['slug'],
                            }
                        )
                        continue

                    # cannot be empty: snapshot_for_app belongs to the same instance
                    last_snapshot = snapshot_model.objects.filter(instance=component).latest('timestamp')
                    if snapshot_for_app.pk != last_snapshot.pk:
                        differences.append(
                            {
                                'type': element['type'],
                                'slug': element['slug'],
                                'url': '%s?version1=%s&version2=%s'
                                % (
                                    request.build_absolute_uri(
                                        reverse(compare_urls[component_type], args=[component.pk])
                                    ),
                                    snapshot_for_app.pk,
                                    last_snapshot.pk,
                                ),
                            }
                        )
        except tarfile.TarError:
            raise APIErrorBadRequest(_('Invalid tar file'))

        return Response(
            {
                'data': {
                    'differences': differences,
                    'unknown_elements': unknown_elements,
                    'no_history_elements': no_history_elements,
                    'legacy_elements': legacy_elements,
                }
            }
        )


bundle_check = BundleCheck.as_view()
class BundleImport(GenericAPIView):
    """Admin-only endpoint installing the content of an uploaded bundle."""

    permission_classes = (permissions.IsAdminUser,)
    # When True, components are imported, the application is stored as not
    # editable and every linked object gets a snapshot.
    install = True

    def post(self, request, *args, **kwargs):
        """Read the tar file uploaded as ``bundle`` and process its elements.

        The application is created or updated from the manifest, component
        payloads are collected per (translated) component type, then imported,
        linked to the application, and obsolete links are removed. Elements
        whose type is ``roles`` or not registered in ``klasses`` are ignored.

        Raises:
            APIErrorBadRequest: unreadable tar file, missing manifest or
                missing component file.
        """
        bundle = request.FILES['bundle']
        components = {}
        try:
            with tarfile.open(fileobj=bundle) as tar:
                try:
                    manifest = json.loads(tar.extractfile('manifest.json').read().decode())
                except KeyError:
                    raise APIErrorBadRequest(_('Invalid tar file, missing manifest'))
                self.application = Application.update_or_create_from_manifest(
                    manifest,
                    tar,
                    editable=not self.install,
                )

                # a manifest without an 'elements' key is handled as an empty bundle
                for element in manifest.get('elements') or []:
                    component_type = element['type']
                    if component_type not in klasses or component_type == 'roles':
                        continue
                    component_type = klasses_translation.get(component_type, component_type)
                    payloads = components.setdefault(component_type, [])
                    try:
                        component_content = (
                            tar.extractfile('%s/%s' % (element['type'], element['slug'])).read().decode()
                        )
                    except KeyError:
                        # interpolate after the translation lookup, otherwise the
                        # formatted string never matches the message catalogue
                        raise APIErrorBadRequest(
                            _('Invalid tar file, missing component %s/%s')
                            % (element['type'], element['slug'])
                        )
                    payloads.append(json.loads(component_content).get('data'))
        except tarfile.TarError:
            raise APIErrorBadRequest(_('Invalid tar file'))

        # objects linked to the application during this request
        self.application_elements = set()

        # import the collected components (overridable hook)
        self.do_something(components)

        # create application elements
        self.link_objects(components)

        # remove obsolete application elements
        self.unlink_obsolete_objects()

        return Response({'err': 0})

    def do_something(self, components):
        """Import the collected components; skipped when nothing was collected."""
        if components:
            import_site(components)

    def link_objects(self, components):
        """Link every existing object named in *components* to the application.

        Objects whose slug is not found locally are silently skipped. When
        ``install`` is True a snapshot is taken for each linked object.
        """
        for component_type, component_list in components.items():
            # back to the bundle's type name, which is the key used in klasses
            component_type = klasses_translation_reverse.get(component_type, component_type)
            klass = klasses[component_type]
            for component in component_list:
                try:
                    existing_component = klass.objects.get(slug=component['slug'])
                except klass.DoesNotExist:
                    continue
                element = ApplicationElement.update_or_create_for_object(
                    self.application, existing_component
                )
                self.application_elements.add(element.content_object)
                if self.install is True:
                    existing_component.take_snapshot(
                        comment=_('Application (%s)') % self.application,
                        application=self.application,
                    )

    def unlink_obsolete_objects(self):
        """Delete application elements whose object was not linked by this request."""
        known_elements = ApplicationElement.objects.filter(application=self.application)
        for element in known_elements:
            if element.content_object not in self.application_elements:
                element.delete()


bundle_import = BundleImport.as_view()
class BundleDeclare(BundleImport):
    """Variant of BundleImport that only declares the bundle, without installing it."""

    install = False

    def do_something(self, components):
        """Do nothing: no installation on declare."""


bundle_declare = BundleDeclare.as_view()
class BundleUnlink(GenericAPIView):
    """Admin-only endpoint deleting a declared application."""

    permission_classes = (permissions.IsAdminUser,)

    def post(self, request, *args, **kwargs):
        """Delete the application whose slug is posted as ``application``.

        A missing parameter or an unknown slug is not an error: the response
        is ``{'err': 0}`` in every case.
        """
        slug = request.POST.get('application')
        application = None
        if slug:
            try:
                application = Application.objects.get(slug=slug)
            except Application.DoesNotExist:
                application = None
        if application is not None:
            application.delete()
        return Response({'err': 0})


bundle_unlink = BundleUnlink.as_view()