wcs/wcs/api_export_import.py

713 lines
27 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2021 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import io
import json
import tarfile
import xml.etree.ElementTree as ET
from django.http import Http404, HttpResponse, HttpResponseForbidden, JsonResponse
from django.shortcuts import redirect
from django.urls import NoReverseMatch, reverse
from quixote import get_publisher, get_response
from wcs.api_utils import is_url_signed
from wcs.applications import Application, ApplicationElement
from wcs.blocks import BlockDef, BlockdefImportError
from wcs.carddef import CardDef
from wcs.categories import (
BlockCategory,
CardDefCategory,
Category,
CommentTemplateCategory,
DataSourceCategory,
MailTemplateCategory,
WorkflowCategory,
)
from wcs.comment_templates import CommentTemplate
from wcs.data_sources import NamedDataSource, NamedDataSourceImportError
from wcs.formdef import FormDef, FormdefImportError
from wcs.mail_templates import MailTemplate
from wcs.sql import Equal, Role
from wcs.workflows import Workflow, WorkflowImportError
from wcs.wscalls import NamedWsCall, NamedWsCallImportError
from .qommon import _
from .qommon.afterjobs import AfterJob
from .qommon.misc import indent_xml, xml_node_text
# Registry of the object types handled by the export/import API, keyed by the
# identifier used in URLs and in bundle manifests.  Insertion order matters:
# the import job walks these keys to decide in which order elements are
# installed (after moving the "categories" types first).
klasses = {
    'blocks': BlockDef,
    'blocks-categories': BlockCategory,
    'cards': CardDef,
    'cards-categories': CardDefCategory,
    'data-sources': NamedDataSource,
    'data-sources-categories': DataSourceCategory,
    'forms-categories': Category,
    'forms': FormDef,
    'roles': Role,
    'mail-templates-categories': MailTemplateCategory,
    'mail-templates': MailTemplate,
    'comment-templates-categories': CommentTemplateCategory,
    'comment-templates': CommentTemplate,
    'workflows-categories': WorkflowCategory,
    'workflows': Workflow,
    'wscalls': NamedWsCall,
}
# Reverse lookup: class -> object type identifier.
klass_to_slug = {klass: type_slug for type_slug, klass in klasses.items()}
# Classes whose instances carry a position that is rebuilt after an import.
category_classes = [
    Category,
    CardDefCategory,
    BlockCategory,
    WorkflowCategory,
    MailTemplateCategory,
    CommentTemplateCategory,
    DataSourceCategory,
]
def signature_required(func):
    """Decorator answering 403 unless the current request URL is signed.

    The check is delegated to ``is_url_signed()``; when it fails an
    ``HttpResponseForbidden`` is returned and ``func`` is not called.
    The wrapper keeps the metadata (name, docstring, ...) of ``func`` so that
    decorated views remain identifiable in tracebacks and introspection.
    """
    import functools

    @functools.wraps(func)
    def f(*args, **kwargs):
        if not is_url_signed():
            return HttpResponseForbidden()
        return func(*args, **kwargs)

    return f
@signature_required
def index(request):
    """Return the catalogue of object types exposed by the export/import API.

    Each entry holds the type identifier, its translated plural and singular
    labels, a ``minor`` flag (only present when true) and the absolute URL of
    the matching objects list.
    """
    # (identifier, plural label, singular label, is minor)
    catalogue = (
        ('forms', _('Forms'), _('Form'), False),
        ('cards', _('Cards'), _('Card'), False),
        ('workflows', _('Workflows'), _('Workflow'), False),
        ('blocks', _('Blocks'), _('Block of fields'), True),
        ('data-sources', _('Data Sources'), _('Data Source'), True),
        ('mail-templates', _('Mail Templates'), _('Mail Template'), True),
        ('comment-templates', _('Comment Templates'), _('Comment Template'), True),
        ('wscalls', _('Webservice Calls'), _('Webservice Call'), True),
        ('blocks-categories', _('Categories (blocks)'), _('Category (block)'), True),
        ('cards-categories', _('Categories (cards)'), _('Category (cards)'), True),
        ('forms-categories', _('Categories (forms)'), _('Category (forms)'), True),
        ('workflows-categories', _('Categories (workflows)'), _('Category (workflows)'), True),
        (
            'mail-templates-categories',
            _('Categories (mail templates)'),
            _('Category (mail templates)'),
            True,
        ),
        (
            'comment-templates-categories',
            _('Categories (comment templates)'),
            _('Category (comment templates)'),
            True,
        ),
        (
            'data-sources-categories',
            _('Categories (data sources)'),
            _('Category (data Sources)'),
            True,
        ),
        ('roles', _('Roles'), _('Role'), True),
    )
    response = []
    for type_id, plural_label, singular_label, is_minor in catalogue:
        entry = {'id': type_id, 'text': plural_label, 'singular': singular_label}
        if is_minor:
            entry['minor'] = True
        list_path = reverse('api-export-import-objects-list', kwargs={'objects': type_id})
        entry['urls'] = {'list': request.build_absolute_uri(list_path)}
        response.append(entry)
    return JsonResponse({'data': response})
def export_object_ref(request, obj):
    """Build the JSON-serialisable reference describing ``obj``.

    Returns ``None`` when the export or dependencies URL cannot be reversed
    for the object slug; otherwise a dict with the slug, name, type and URLs
    of the object (plus its category name when the object has a
    ``category_id`` attribute).
    """
    slug = obj.slug
    objects = klass_to_slug[obj.__class__]
    route_kwargs = {'objects': objects, 'slug': slug}

    def absolute_url(route_name):
        return request.build_absolute_uri(reverse(route_name, kwargs=route_kwargs))

    try:
        urls = {
            'export': absolute_url('api-export-import-object-export'),
            'dependencies': absolute_url('api-export-import-object-dependencies'),
        }
    except NoReverseMatch:
        return None
    urls['redirect'] = absolute_url('api-export-import-object-redirect')

    data = {'id': slug, 'text': obj.name, 'type': objects, 'urls': urls}
    if hasattr(obj, 'category_id'):
        category = obj.category if obj.category_id else None
        data['category'] = category.name if category else None
    if objects == 'roles':
        # include uuid in object reference, this is not used for applification API but is useful
        # for authentic creating its role summary page.
        data['uuid'] = obj.uuid
        data['urls'] = {}
    return data
@signature_required
def objects_list(request, objects):
    """Return the references of all objects of the given type.

    Raises ``Http404`` for an unknown type; objects for which no reference
    can be built are left out of the answer.
    """
    klass = klasses.get(objects)
    if klass is None:
        raise Http404()
    references = []
    for item in klass.select():
        reference = export_object_ref(request, item)
        if reference:
            references.append(reference)
    return JsonResponse({'data': references})
def get_object(objects, slug):
    """Look up an object by type identifier and slug.

    Raises ``Http404`` when the type is not registered; the lookup itself is
    done with ``ignore_errors=True`` and its result is returned as is.
    """
    if objects not in klasses:
        raise Http404()
    return klasses[objects].get_by_slug(slug, ignore_errors=True)
@signature_required
def object_export(request, objects, slug):
    """Serve the export of a single object.

    Objects providing ``export_for_application()`` decide on both content and
    content type; the others are exported as indented XML (ids included).
    """
    obj = get_object(objects, slug)
    if obj is None:
        raise Http404()
    if not hasattr(obj, 'export_for_application'):
        tree = obj.export_to_xml(include_id=True)
        indent_xml(tree)
        return HttpResponse(ET.tostring(tree), content_type='text/xml')
    content, content_type = obj.export_for_application()
    return HttpResponse(content, content_type=content_type)
def object_redirect(request, objects, slug):
    """Redirect to the admin page of an object.

    When the query string has a ``compare`` parameter together with non-empty
    ``application``, ``version1`` and ``version2`` values, the redirection
    targets the history comparison page of the object instead.

    Raises ``Http404`` for unknown objects and for roles.
    """
    from urllib.parse import urlencode

    obj = get_object(objects, slug)
    if obj is None or objects == 'roles':
        raise Http404()
    url = obj.get_admin_url()
    compare_params = {key: request.GET.get(key) for key in ('version1', 'version2', 'application')}
    if 'compare' in request.GET and all(compare_params.values()):
        # values come straight from the request: encode them so they cannot
        # inject extra parameters or a fragment into the redirect target.
        url += 'history/compare?' + urlencode(compare_params)
    return redirect(url)
@signature_required
def object_dependencies(request, objects, slug):
    """Return the references of the objects ``objects``/``slug`` depends on.

    Objects without a ``get_dependencies`` method have no dependencies;
    ``None`` dependencies and those without a reference are skipped.
    """
    obj = get_object(objects, slug)
    if obj is None:
        raise Http404()
    dependencies = []
    if hasattr(obj, 'get_dependencies'):
        references = (
            export_object_ref(request, dependency)
            for dependency in obj.get_dependencies()
            if dependency is not None
        )
        dependencies = [reference for reference in references if reference]
    return JsonResponse({'data': dependencies})
@signature_required
def bundle_check(request):
    """Compare the content of an uploaded bundle with the local objects.

    The bundle is read from ``request.FILES['bundle']``.  The JSON answer is
    either ``{'err': 1, 'err_desc': ...}`` for an unusable archive, or
    ``{'data': {...}}`` where data sorts the manifest elements into:

    * ``unknown_elements``: no local object with that slug;
    * ``legacy_elements``: object exists but is not linked to the application;
    * ``no_history_elements``: no snapshot recorded for this application
      version;
    * ``differences``: the latest snapshot is not the one recorded for this
      application version.

    ``data`` is empty when the manifest has no slug or no version number.
    Elements of unknown type and roles are ignored.
    """
    bundle = request.FILES['bundle']
    try:
        with tarfile.open(fileobj=bundle) as tar:
            try:
                manifest = json.loads(tar.extractfile('manifest.json').read().decode())
            except KeyError:
                return JsonResponse({'err': 1, 'err_desc': _('Invalid tar file, missing manifest')})
            application_slug = manifest.get('slug')
            application_version = manifest.get('version_number')
            if not application_slug or not application_version:
                return JsonResponse({'data': {}})

            differences = []
            unknown_elements = []
            no_history_elements = []
            legacy_elements = []

            # the application does not depend on the element, look it up once
            applications = Application.select([Equal('slug', application_slug)])
            application = applications[0] if applications else None

            # a manifest without "elements" is handled as an empty bundle
            for element in manifest.get('elements') or []:
                if element['type'] not in klasses or element['type'] == 'roles':
                    continue
                element_klass = klasses[element['type']]
                try:
                    element_content = tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
                except KeyError:
                    return JsonResponse(
                        {
                            'err': 1,
                            # interpolate after the translation lookup, otherwise the
                            # message id never matches the catalogue
                            'err_desc': _('Invalid tar file, missing component %s/%s')
                            % (element['type'], element['slug']),
                        }
                    )
                tree = ET.fromstring(element_content)
                slug_tag = 'url_name' if hasattr(element_klass, 'url_name') else 'slug'
                slug = xml_node_text(tree.find(slug_tag))
                try:
                    obj = element_klass.get_by_slug(slug)
                except KeyError:
                    obj = None
                if obj is None:
                    # element not found, report this as unexisting
                    unknown_elements.append(
                        {
                            'type': element['type'],
                            'slug': element['slug'],
                        }
                    )
                    continue

                legacy = application is None or not ApplicationElement.select(
                    [
                        Equal('application_id', application.id),
                        Equal('object_type', obj.xml_root_node),
                        Equal('object_id', obj.id),
                    ]
                )
                if legacy:
                    # object exists, but not linked to the application
                    legacy_elements.append(
                        {
                            'type': element['type'],
                            'slug': element['slug'],
                            # information needed here, Relation objects may not exist yet in hobo
                            'text': obj.name,
                            'url': request.build_absolute_uri(
                                reverse(
                                    'api-export-import-object-redirect',
                                    kwargs={'objects': element['type'], 'slug': element['slug']},
                                )
                            ),
                        }
                    )
                    continue

                snapshots_for_app = get_publisher().snapshot_class.select(
                    [
                        Equal('object_type', obj.xml_root_node),
                        Equal('object_id', obj.id),
                        Equal('application_slug', application_slug),
                        Equal('application_version', application_version),
                    ],
                    order_by='-timestamp',
                )
                if not snapshots_for_app:
                    # legacy, no snapshot for this bundle
                    no_history_elements.append(
                        {
                            'type': element['type'],
                            'slug': element['slug'],
                        }
                    )
                    continue

                snapshot_for_app = snapshots_for_app[0]
                last_snapshot = get_publisher().snapshot_class.select_object_history(obj)[0]
                if snapshot_for_app.id != last_snapshot.id:
                    differences.append(
                        {
                            'type': element['type'],
                            'slug': element['slug'],
                            'url': '%shistory/compare?version1=%s&version2=%s'
                            % (obj.get_admin_url(), snapshot_for_app.id, last_snapshot.id),
                        }
                    )
    except tarfile.TarError:
        return JsonResponse({'err': 1, 'err_desc': _('Invalid tar file')})

    return JsonResponse(
        {
            'data': {
                'differences': differences,
                'unknown_elements': unknown_elements,
                'no_history_elements': no_history_elements,
                'legacy_elements': legacy_elements,
            }
        }
    )
class BundleKeyError(Exception):
    """Raised when an expected member (manifest or component) is missing from a bundle."""
class BundleImportJob(AfterJob):
    """After-job installing the elements of an application bundle.

    The bundle is a tar archive holding a ``manifest.json`` file and one
    member per element, named ``<type>/<slug>``.  Installation is done in
    three passes (see ``execute``); elements that were linked to the
    application but are absent from the bundle get unlinked at the end.
    """

    def __init__(self, tar_content, **kwargs):
        """Keep the raw bytes of the bundle; remaining kwargs go to the parent class."""
        super().__init__(**kwargs)
        self.tar_content = tar_content

    def execute(self):
        """Install the bundle.

        Import errors, invalid archives and missing archive members do not
        propagate: the job is marked as failed and ``failure_label`` carries
        the error message.
        """
        object_types = [x for x in klasses if x != 'roles']
        # be sure categories are imported first (the sort is stable, other types
        # keep their registry order)
        object_types = sorted(object_types, key=lambda a: 'categories' in a, reverse=True)
        pre_install_types = ('forms', 'cards', 'blocks', 'workflows')

        tar_io = io.BytesIO(self.tar_content)
        error = None
        try:
            with tarfile.open(fileobj=tar_io) as self.tar:
                try:
                    manifest = json.loads(self.tar.extractfile('manifest.json').read().decode())
                except KeyError:
                    raise BundleKeyError(_('Invalid tar file, missing manifest.'))
                self.application = Application.update_or_create_from_manifest(
                    manifest, self.tar, editable=False, install=False
                )

                # group manifest elements by type once; a manifest without
                # "elements" is handled as an empty bundle
                elements_by_type = {}
                for element in manifest.get('elements') or []:
                    elements_by_type.setdefault(element.get('type'), []).append(element)

                # count number of actions: one per pre-installed element, then two
                # (install and finalize passes) per installable element
                self.total_count = sum(len(elements_by_type.get(x, [])) for x in pre_install_types)
                self.total_count += 2 * sum(len(elements_by_type.get(x, [])) for x in object_types)

                # init cache of application elements, from imported manifest
                self.application_elements = set()

                # first pass on formdef/carddef/blockdef/workflows to create them empty
                # (name and slug); so they can be found for sure in import pass
                for _type in pre_install_types:
                    self.pre_install(elements_by_type.get(_type, []))

                # real installation pass
                for _type in object_types:
                    self.install(elements_by_type.get(_type, []))

                # again, to remove [pre-install] in dependencies labels
                for _type in object_types:
                    self.install(elements_by_type.get(_type, []), finalize=True)

                # remove obsolete application elements
                self.unlink_obsolete_objects()
        except (
            BlockdefImportError,
            FormdefImportError,
            WorkflowImportError,
            NamedDataSourceImportError,
            NamedWsCallImportError,
        ) as e:
            error = str(e)
        except tarfile.TarError:
            error = _('Invalid tar file.')
        except BundleKeyError as e:
            error = str(e)

        if error:
            self.status = 'failed'
            self.failure_label = str(_('Error: %s') % error)

    def _read_element(self, element):
        """Return the raw content of the archive member matching ``element``.

        Raises ``BundleKeyError`` when the archive has no such member.
        """
        try:
            return self.tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
        except KeyError:
            raise BundleKeyError(
                _('Invalid tar file, missing component %s/%s.') % (element['type'], element['slug'])
            )

    def pre_install(self, elements):
        """Create an empty placeholder for every element that does not exist yet.

        Placeholders only get a slug and a name prefixed with ``[pre-import]``;
        existing objects are left untouched.  The progress counter is
        incremented once per element.
        """
        for element in elements:
            element_klass = klasses[element['type']]
            tree = ET.fromstring(self._read_element(element))
            slug_tag = 'url_name' if hasattr(element_klass, 'url_name') else 'slug'
            slug = xml_node_text(tree.find(slug_tag))
            try:
                existing_object = element_klass.get_by_slug(slug)
            except KeyError:
                existing_object = None
            if existing_object:
                self.increment_count()
                continue
            new_object = element_klass()
            new_object.slug = slug
            new_object.name = '[pre-import] %s' % xml_node_text(tree.find('name'))
            new_object.store(comment=_('Application (%s) initial installation') % self.application.name)
            self.link_object(new_object)
            self.increment_count()
            # NOTE(review): nesting was reconstructed; this is run after each newly
            # created placeholder.
            if get_response():
                # process pre-import after jobs earlier, so there are no multiple jobs for
                # the same object afterwards.
                get_response().process_after_jobs()

    def install(self, elements, finalize=False):
        """Import ``elements`` (all of the same type), creating or replacing objects.

        With ``finalize`` set, only objects whose latest snapshot still
        mentions ``[pre-import]`` are stored again.  For category types (and
        not in finalize mode) positions are rebuilt once all elements are
        imported.  The progress counter is incremented once per element.
        """
        if not elements:
            return
        element_klass = klasses[elements[0]['type']]
        track_positions = not finalize and element_klass in category_classes

        if track_positions:
            # for categories, keep positions before install
            initial_positions = {
                i.slug: i.position if i.position is not None else 10000 for i in element_klass.select()
            }
            imported_positions = {}

        for element in elements:
            element_content = self._read_element(element)
            new_object = element_klass.import_from_xml_tree(
                ET.fromstring(element_content),
                include_id=False,
                check_datasources=False,
                check_deprecated=True,
            )
            if track_positions:
                # for categories, keep positions of imported objects
                imported_positions[new_object.slug] = (
                    new_object.position if new_object.position is not None else 10000
                )
            try:
                existing_object = element_klass.get_by_slug(new_object.slug)
                if existing_object is None:
                    raise KeyError()
            except KeyError:
                new_object.store(
                    comment=_('Application (%s)') % self.application.name, application=self.application
                )
                self.link_object(new_object)
                self.increment_count()
                continue

            # replace
            new_object.id = existing_object.id
            if finalize:
                last_snapshot = get_publisher().snapshot_class.get_latest(
                    new_object.xml_root_node, new_object.id
                )
                if '[pre-import]' not in last_snapshot.get_serialization():
                    self.increment_count()
                    continue

            pre_imported = existing_object.name.startswith('[pre-import]')
            if element['type'] in ('forms', 'cards'):
                # keep internal references
                new_object.table_name = existing_object.table_name
                if not pre_imported:
                    # keep some settings when updating
                    for attr in (
                        'workflow_options',
                        'workflow_roles',
                        'roles',
                        'required_authentication_contexts',
                        'backoffice_submission_roles',
                        'publication_date',
                        'expiration_date',
                        'disabled',
                    ):
                        setattr(new_object, attr, getattr(existing_object, attr))

            comment = _('Application (%s) update')
            if pre_imported:
                comment = _('Application (%s) complete initial installation')
            if finalize:
                comment = _('Application (%s) finalize initial installation')
            new_object.store(comment=comment % self.application.name, application=self.application)
            self.link_object(new_object)
            self.increment_count()

        # for categories, rebuild positions
        if track_positions:
            self._rebuild_category_positions(element_klass, initial_positions, imported_positions)

    def _rebuild_category_positions(self, element_klass, initial_positions, imported_positions):
        """Renumber all categories of ``element_klass`` from 1.

        Imported categories are kept together, sorted by their imported
        position, and inserted where the first of them previously stood;
        when none of them existed before, they go after all other categories.
        """
        objects_by_slug = {i.slug: i for i in element_klass.select()}
        # find imported objects from initials
        existing_positions = {k: v for k, v in initial_positions.items() if k in imported_positions}
        # find not imported objects from initials
        not_imported_positions = {k: v for k, v in initial_positions.items() if k not in imported_positions}
        # determine position of application objects
        application_position = None
        if existing_positions:
            application_position = min(existing_positions.values())
        # all objects placed before application objects
        before_positions = {
            k: v
            for k, v in not_imported_positions.items()
            if application_position is None or v < application_position
        }
        # all objects placed after application objects
        after_positions = {
            k: v
            for k, v in not_imported_positions.items()
            if application_position is not None and v >= application_position
        }
        # rebuild positions
        slugs = sorted(before_positions, key=before_positions.get)
        slugs += sorted(imported_positions, key=imported_positions.get)
        slugs += sorted(after_positions, key=after_positions.get)
        for position, slug in enumerate(slugs, start=1):
            objects_by_slug[slug].position = position
            objects_by_slug[slug].store(store_snapshot=False)

    def link_object(self, obj):
        """Link ``obj`` to the application and remember it as part of this bundle."""
        element = ApplicationElement.update_or_create_for_object(self.application, obj)
        self.application_elements.add((element.object_type, element.object_id))

    def unlink_obsolete_objects(self):
        """Remove application links to objects that were not seen in this bundle."""
        known_elements = ApplicationElement.select([Equal('application_id', self.application.id)])
        for element in known_elements:
            if (element.object_type, element.object_id) not in self.application_elements:
                ApplicationElement.remove_object(element.id)
@signature_required
def bundle_import(request):
    """Spool an import job for the uploaded bundle and return its status URL."""
    tar_content = request.FILES['bundle'].read()
    import_job = BundleImportJob(tar_content=tar_content)
    import_job.store()
    import_job.run(spool=True)
    status_url = import_job.get_api_status_url()
    return JsonResponse({'err': 0, 'url': status_url})
class BundleDeclareJob(BundleImportJob):
    """After-job linking existing local objects to the application of a bundle.

    Nothing is imported: the manifest is only used to create or update the
    application and to find which existing objects should be linked to it.
    """

    def execute(self):
        """Declare the bundle elements.

        Invalid archives and a missing manifest do not propagate: the job is
        marked as failed and ``failure_label`` carries the error message.
        """
        object_types = [x for x in klasses if x != 'roles']
        tar_io = io.BytesIO(self.tar_content)
        error = None
        try:
            with tarfile.open(fileobj=tar_io) as self.tar:
                try:
                    manifest = json.loads(self.tar.extractfile('manifest.json').read().decode())
                except KeyError:
                    raise BundleKeyError(_('Invalid tar file, missing manifest.'))
                self.application = Application.update_or_create_from_manifest(
                    manifest, self.tar, editable=True, install=True
                )

                # a manifest without "elements" is handled as an empty bundle
                elements = manifest.get('elements') or []

                # count number of actions
                self.total_count = len([x for x in elements if x.get('type') in object_types])

                # init cache of application elements, from manifest
                self.application_elements = set()

                # declare elements
                for object_type in object_types:
                    self.declare([x for x in elements if x.get('type') == object_type])

                # remove obsolete application elements
                self.unlink_obsolete_objects()
        except tarfile.TarError:
            error = _('Invalid tar file.')
        except BundleKeyError as e:
            error = str(e)

        if error:
            self.status = 'failed'
            self.failure_label = str(_('Error: %s') % error)

    def declare(self, elements):
        """Link every element that exists locally to the application."""
        for element in elements:
            element_klass = klasses[element['type']]
            element_slug = element['slug']
            existing_object = element_klass.get_by_slug(element_slug, ignore_errors=True)
            if existing_object:
                self.link_object(existing_object)
            # counted for every element, as total_count covers all of them
            self.increment_count()
@signature_required
def bundle_declare(request):
    """Spool a declaration job for the uploaded bundle and return its status URL."""
    tar_content = request.FILES['bundle'].read()
    declare_job = BundleDeclareJob(tar_content=tar_content)
    declare_job.store()
    declare_job.run(spool=True)
    status_url = declare_job.get_api_status_url()
    return JsonResponse({'err': 0, 'url': status_url})
@signature_required
def unlink(request):
    """Remove an application and its links to local objects.

    Only acts on POST requests carrying a non-empty ``application`` slug; the
    answer is ``{'err': 0}`` in every case, including when nothing matched.
    """
    application_slug = request.POST.get('application') if request.method == 'POST' else None
    if application_slug:
        matching_applications = Application.select([Equal('slug', request.POST['application'])])
        if matching_applications:
            application = matching_applications[0]
            # drop the links first, then the application itself
            for linked_element in ApplicationElement.select([Equal('application_id', application.id)]):
                ApplicationElement.remove_object(linked_element.id)
            Application.remove_object(application.id)
    return JsonResponse({'err': 0})