wcs/wcs/api_export_import.py

341 lines
12 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2021 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import functools
import io
import json
import tarfile
import xml.etree.ElementTree as ET

from django.http import Http404, HttpResponse, HttpResponseForbidden, JsonResponse
from django.shortcuts import redirect
from django.urls import reverse

from wcs.api_utils import is_url_signed
from wcs.blocks import BlockDef
from wcs.carddef import CardDef
from wcs.categories import (
    BlockCategory,
    CardDefCategory,
    Category,
    CommentTemplateCategory,
    DataSourceCategory,
    MailTemplateCategory,
    WorkflowCategory,
)
from wcs.comment_templates import CommentTemplate
from wcs.data_sources import NamedDataSource, StubNamedDataSource
from wcs.formdef import FormDef
from wcs.mail_templates import MailTemplate
from wcs.sql import Role
from wcs.workflows import Workflow
from wcs.wscalls import NamedWsCall

from .qommon import _
from .qommon.afterjobs import AfterJob
from .qommon.misc import indent_xml, xml_node_text
# Maps the object type identifier used in API URLs and bundle manifests to the
# class implementing it.  Insertion order is significant: BundleImportJob
# iterates over this dict to decide the installation order.
klasses = {
    'blocks': BlockDef,
    'blocks-categories': BlockCategory,
    'cards': CardDef,
    'cards-categories': CardDefCategory,
    'data-sources': NamedDataSource,
    'data-sources-categories': DataSourceCategory,
    'forms-categories': Category,
    'forms': FormDef,
    'roles': Role,
    'mail-templates-categories': MailTemplateCategory,
    'mail-templates': MailTemplate,
    'comment-templates-categories': CommentTemplateCategory,
    'comment-templates': CommentTemplate,
    'workflows-categories': WorkflowCategory,
    'workflows': Workflow,
    'wscalls': NamedWsCall,
}

# Reverse lookup: class -> object type identifier.
klass_to_slug = {klass: type_slug for type_slug, klass in klasses.items()}
def signature_required(func):
    """Restrict a view to requests for which ``is_url_signed()`` is true.

    The returned wrapper calls ``is_url_signed()`` before every invocation.
    When it returns a false value an empty ``HttpResponseForbidden`` is
    returned and ``func`` is not called; otherwise the call is forwarded to
    ``func`` with the same arguments and its result is returned unchanged.
    """

    # functools.wraps keeps the wrapped view's __name__, __doc__ and
    # __wrapped__ so tracebacks and introspection point at the real view
    # instead of at this generic wrapper.
    @functools.wraps(func)
    def f(*args, **kwargs):
        if not is_url_signed():
            return HttpResponseForbidden()
        return func(*args, **kwargs)

    return f
@signature_required
def index(request):
    """List the object types handled by the export/import API.

    Returns a ``JsonResponse`` whose ``data`` key holds one dict per object
    type with its ``id``, plural label (``text``), singular label
    (``singular``), an optional ``minor`` flag, and a ``urls`` dict giving
    the absolute URL of the matching objects list.
    """
    response = [
        {'id': 'forms', 'text': _('Forms'), 'singular': _('Form')},
        {'id': 'cards', 'text': _('Cards'), 'singular': _('Card')},
        {'id': 'workflows', 'text': _('Workflows'), 'singular': _('Workflow')},
        {'id': 'blocks', 'text': _('Blocks'), 'singular': _('Block of fields'), 'minor': True},
        {'id': 'data-sources', 'text': _('Data Sources'), 'singular': _('Data Source'), 'minor': True},
        {'id': 'mail-templates', 'text': _('Mail Templates'), 'singular': _('Mail Template'), 'minor': True},
        {
            'id': 'comment-templates',
            'text': _('Comment Templates'),
            # the singular label used to be a copy of the mail template one
            'singular': _('Comment Template'),
            'minor': True,
        },
        {'id': 'wscalls', 'text': _('Webservice Calls'), 'singular': _('Webservice Call'), 'minor': True},
        {
            'id': 'blocks-categories',
            'text': _('Categories (blocks)'),
            'singular': _('Category (block)'),
            'minor': True,
        },
        {
            'id': 'cards-categories',
            'text': _('Categories (cards)'),
            'singular': _('Category (cards)'),
            'minor': True,
        },
        {
            'id': 'forms-categories',
            'text': _('Categories (forms)'),
            'singular': _('Category (forms)'),
            'minor': True,
        },
        {
            'id': 'workflows-categories',
            'text': _('Categories (workflows)'),
            'singular': _('Category (workflows)'),
            'minor': True,
        },
        {
            'id': 'mail-templates-categories',
            'text': _('Categories (mail templates)'),
            'singular': _('Category (mail templates)'),
            'minor': True,
        },
        {
            'id': 'comment-templates-categories',
            'text': _('Categories (comment templates)'),
            'singular': _('Category (comment templates)'),
            'minor': True,
        },
        {
            'id': 'data-sources-categories',
            'text': _('Categories (data sources)'),
            'singular': _('Category (data Sources)'),
            'minor': True,
        },
        {
            'id': 'roles',
            'text': _('Roles'),
            'singular': _('Role'),
            'minor': True,
        },
    ]
    # attach to every type the absolute URL listing its objects
    for obj in response:
        obj['urls'] = {
            'list': request.build_absolute_uri(
                reverse('api-export-import-objects-list', kwargs={'objects': obj['id']})
            ),
        }
    return JsonResponse({'data': response})
@signature_required
def export_object_ref(request, obj):
    """Build the reference dict describing ``obj`` in API answers.

    The dict carries the object slug (``id``), its name (``text``), its type
    identifier (``type``) and absolute ``urls`` for export and dependencies,
    plus a redirect URL for every type but ``roles``.  When the object has a
    ``category_id`` attribute a ``category`` key is added, holding the
    category name or ``None``.

    NOTE(review): this helper is itself wrapped in ``signature_required``
    although its callers visible in this file are already guarded; confirm
    whether the extra check is intended.
    """
    slug = obj.slug
    object_type = klass_to_slug[obj.__class__]

    def absolute_url(route_name):
        # every per-object route takes the same two keyword arguments
        path = reverse(route_name, kwargs={'objects': object_type, 'slug': slug})
        return request.build_absolute_uri(path)

    urls = {
        'export': absolute_url('api-export-import-object-export'),
        'dependencies': absolute_url('api-export-import-object-dependencies'),
    }
    if object_type != 'roles':
        urls['redirect'] = absolute_url('api-export-import-object-redirect')

    data = {'id': slug, 'text': obj.name, 'type': object_type, 'urls': urls}
    if hasattr(obj, 'category_id'):
        if obj.category_id and obj.category:
            data['category'] = obj.category.name
        else:
            data['category'] = None
    return data
@signature_required
def objects_list(request, objects):
    """Return references to every object of the type named by ``objects``.

    Raises ``Http404`` when ``objects`` is not a key of ``klasses``.
    """
    klass = klasses.get(objects)
    if not klass:
        raise Http404()
    references = []
    for item in klass.select():
        references.append(export_object_ref(request, item))
    return JsonResponse({'data': references})
def get_object(objects, slug):
    """Look up the object of type ``objects`` identified by ``slug``.

    Raises ``Http404`` when the type is unknown.  Otherwise returns the result
    of ``get_by_slug(slug, ignore_errors=True)`` on the matching class; the
    callers in this module treat a ``None`` result as "not found".
    """
    klass = klasses.get(objects)
    if klass:
        return klass.get_by_slug(slug, ignore_errors=True)
    raise Http404()
@signature_required
def object_export(request, objects, slug):
    """Serve the serialised form of a single object.

    Objects providing ``export_for_application()`` supply their own content
    and content type; any other object is exported with
    ``export_to_xml(include_id=True)``, indented, and served as ``text/xml``.
    Raises ``Http404`` when the object cannot be found.
    """
    obj = get_object(objects, slug)
    if obj is None:
        raise Http404()
    if not hasattr(obj, 'export_for_application'):
        # generic XML export
        tree = obj.export_to_xml(include_id=True)
        indent_xml(tree)
        return HttpResponse(ET.tostring(tree), content_type='text/xml')
    content, content_type = obj.export_for_application()
    return HttpResponse(content, content_type=content_type)
def object_redirect(request, objects, slug):
    """Redirect to the admin URL of the object.

    Raises ``Http404`` when the object is missing, is a
    ``StubNamedDataSource``, or when the requested type is ``roles``.
    Unlike the other views of this module it is not wrapped in
    ``signature_required``.
    """
    obj = get_object(objects, slug)
    if obj is None:
        raise Http404()
    if isinstance(obj, StubNamedDataSource) or objects == 'roles':
        raise Http404()
    return redirect(obj.get_admin_url())
@signature_required
def object_dependencies(request, objects, slug):
    """Return references to the objects the given object depends on.

    The list is empty for objects without a ``get_dependencies`` method.
    ``None`` entries and ``StubNamedDataSource`` instances are skipped.
    Raises ``Http404`` when the object cannot be found.
    """
    obj = get_object(objects, slug)
    if obj is None:
        raise Http404()
    if not hasattr(obj, 'get_dependencies'):
        return JsonResponse({'data': []})
    references = [
        export_object_ref(request, dependency)
        for dependency in obj.get_dependencies()
        if dependency is not None and not isinstance(dependency, StubNamedDataSource)
    ]
    return JsonResponse({'data': references})
class BundleImportJob(AfterJob):
    """Job installing the objects contained in an application bundle.

    The bundle is a tar archive given as bytes.  It is read from memory and
    must contain a ``manifest.json`` member plus one ``<type>/<slug>`` member
    for every element listed in the manifest.
    """

    def __init__(self, tar_content, **kwargs):
        """Keep the raw archive bytes; other keyword arguments go to ``AfterJob``."""
        super().__init__(**kwargs)
        self.tar_content = tar_content

    def execute(self):
        """Read the manifest then run the pre-install and install passes."""
        # roles are left out of the installation pass
        object_types = [x for x in klasses if x != 'roles']
        # be sure categories are imported first
        object_types = sorted(object_types, key=lambda a: 'categories' in a, reverse=True)
        # types created as empty placeholders before the real installation
        pre_install_types = ('forms', 'cards', 'blocks', 'workflows')

        tar_io = io.BytesIO(self.tar_content)
        with tarfile.open(fileobj=tar_io) as self.tar:
            manifest = json.loads(self.tar.extractfile('manifest.json').read().decode())
            self.app_name = manifest.get('application')
            # a manifest without an "elements" entry is handled as an empty bundle
            elements = manifest.get('elements') or []

            # count number of actions: one per pre-installed element plus
            # one per installed element
            pre_install_count = len([x for x in elements if x.get('type') in pre_install_types])
            install_count = len([x for x in elements if x.get('type') in object_types])
            self.total_count = pre_install_count + install_count

            # first pass on formdef/carddef/blockdef/workflows to create them empty
            # (name and slug); so they can be found for sure in import pass
            for object_type in pre_install_types:
                self.pre_install([x for x in elements if x.get('type') == object_type])

            # real installation pass
            for object_type in object_types:
                self.install([x for x in elements if x.get('type') == object_type])

    def pre_install(self, elements):
        """Create a named placeholder for every element that does not exist yet.

        Placeholders only get a slug and a name prefixed with ``[pre-import]``;
        the install pass relies on that prefix to recognise them.
        """
        for element in elements:
            element_klass = klasses[element['type']]
            element_content = self.tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
            tree = ET.fromstring(element_content)
            # classes with a "url_name" attribute have their slug read from that tag
            if hasattr(element_klass, 'url_name'):
                slug = xml_node_text(tree.find('url_name'))
            else:
                slug = xml_node_text(tree.find('slug'))
            try:
                existing_object = element_klass.get_by_slug(slug)
            except KeyError:
                pass
            else:
                if existing_object:
                    # already there, nothing to create
                    self.increment_count()
                    continue
            new_object = element_klass()
            new_object.slug = slug
            new_object.name = '[pre-import] %s' % xml_node_text(tree.find('name'))
            new_object.store(comment=_('Application (%s)') % self.app_name)
            self.increment_count()

    def install(self, elements):
        """Import every element, replacing the object with the same slug if any."""
        for element in elements:
            element_klass = klasses[element['type']]
            element_content = self.tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
            new_object = element_klass.import_from_xml_tree(
                ET.fromstring(element_content), include_id=False, check_datasources=False
            )
            try:
                existing_object = element_klass.get_by_slug(new_object.slug)
                if existing_object is None or not hasattr(existing_object, 'id'):
                    # handled like a failed lookup: store as a new object
                    raise KeyError()
            except KeyError:
                new_object.store(comment=_('Application (%s)') % self.app_name)
                self.increment_count()
                continue

            # replace
            new_object.id = existing_object.id
            if element['type'] in ('forms', 'cards'):
                # keep internal references
                new_object.internal_identifier = existing_object.internal_identifier
                new_object.table_name = existing_object.table_name
                if not existing_object.name.startswith('[pre-import]'):
                    # keep some settings when updating
                    for attr in (
                        'workflow_options',
                        'workflow_roles',
                        'roles',
                        'required_authentication_contexts',
                        'backoffice_submission_roles',
                        'publication_date',
                        'expiration_date',
                        'disabled',
                    ):
                        setattr(new_object, attr, getattr(existing_object, attr))
            new_object.store(comment=_('Application (%s) update') % self.app_name)
            self.increment_count()
@signature_required
def bundle_import(request):
    """Start the import of the bundle sent as the request body.

    A ``BundleImportJob`` is created from the raw body, stored, then run with
    ``spool=True``.  The JSON answer holds ``err`` set to 0 and the job status
    URL under ``url``.
    """
    import_job = BundleImportJob(tar_content=request.body)
    import_job.store()
    import_job.run(spool=True)
    payload = {'err': 0, 'url': import_job.get_api_status_url()}
    return JsonResponse(payload)