713 lines
27 KiB
Python
713 lines
27 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2021 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import io
|
|
import json
|
|
import tarfile
|
|
import xml.etree.ElementTree as ET
|
|
|
|
from django.http import Http404, HttpResponse, HttpResponseForbidden, JsonResponse
|
|
from django.shortcuts import redirect
|
|
from django.urls import NoReverseMatch, reverse
|
|
from quixote import get_publisher, get_response
|
|
|
|
from wcs.api_utils import is_url_signed
|
|
from wcs.applications import Application, ApplicationElement
|
|
from wcs.blocks import BlockDef, BlockdefImportError
|
|
from wcs.carddef import CardDef
|
|
from wcs.categories import (
|
|
BlockCategory,
|
|
CardDefCategory,
|
|
Category,
|
|
CommentTemplateCategory,
|
|
DataSourceCategory,
|
|
MailTemplateCategory,
|
|
WorkflowCategory,
|
|
)
|
|
from wcs.comment_templates import CommentTemplate
|
|
from wcs.data_sources import NamedDataSource, NamedDataSourceImportError
|
|
from wcs.formdef import FormDef, FormdefImportError
|
|
from wcs.mail_templates import MailTemplate
|
|
from wcs.sql import Equal, Role
|
|
from wcs.workflows import Workflow, WorkflowImportError
|
|
from wcs.wscalls import NamedWsCall, NamedWsCallImportError
|
|
|
|
from .qommon import _
|
|
from .qommon.afterjobs import AfterJob
|
|
from .qommon.misc import indent_xml, xml_node_text
|
|
|
|
klasses = {
|
|
'blocks': BlockDef,
|
|
'blocks-categories': BlockCategory,
|
|
'cards': CardDef,
|
|
'cards-categories': CardDefCategory,
|
|
'data-sources': NamedDataSource,
|
|
'data-sources-categories': DataSourceCategory,
|
|
'forms-categories': Category,
|
|
'forms': FormDef,
|
|
'roles': Role,
|
|
'mail-templates-categories': MailTemplateCategory,
|
|
'mail-templates': MailTemplate,
|
|
'comment-templates-categories': CommentTemplateCategory,
|
|
'comment-templates': CommentTemplate,
|
|
'workflows-categories': WorkflowCategory,
|
|
'workflows': Workflow,
|
|
'wscalls': NamedWsCall,
|
|
}
|
|
|
|
klass_to_slug = {y: x for x, y in klasses.items()}
|
|
|
|
category_classes = [
|
|
Category,
|
|
CardDefCategory,
|
|
BlockCategory,
|
|
WorkflowCategory,
|
|
MailTemplateCategory,
|
|
CommentTemplateCategory,
|
|
DataSourceCategory,
|
|
]
|
|
|
|
|
|
def signature_required(func):
|
|
def f(*args, **kwargs):
|
|
if not is_url_signed():
|
|
return HttpResponseForbidden()
|
|
return func(*args, **kwargs)
|
|
|
|
return f
|
|
|
|
|
|
@signature_required
|
|
def index(request):
|
|
response = [
|
|
{'id': 'forms', 'text': _('Forms'), 'singular': _('Form')},
|
|
{'id': 'cards', 'text': _('Cards'), 'singular': _('Card')},
|
|
{'id': 'workflows', 'text': _('Workflows'), 'singular': _('Workflow')},
|
|
{'id': 'blocks', 'text': _('Blocks'), 'singular': _('Block of fields'), 'minor': True},
|
|
{'id': 'data-sources', 'text': _('Data Sources'), 'singular': _('Data Source'), 'minor': True},
|
|
{'id': 'mail-templates', 'text': _('Mail Templates'), 'singular': _('Mail Template'), 'minor': True},
|
|
{
|
|
'id': 'comment-templates',
|
|
'text': _('Comment Templates'),
|
|
'singular': _('Comment Template'),
|
|
'minor': True,
|
|
},
|
|
{'id': 'wscalls', 'text': _('Webservice Calls'), 'singular': _('Webservice Call'), 'minor': True},
|
|
{
|
|
'id': 'blocks-categories',
|
|
'text': _('Categories (blocks)'),
|
|
'singular': _('Category (block)'),
|
|
'minor': True,
|
|
},
|
|
{
|
|
'id': 'cards-categories',
|
|
'text': _('Categories (cards)'),
|
|
'singular': _('Category (cards)'),
|
|
'minor': True,
|
|
},
|
|
{
|
|
'id': 'forms-categories',
|
|
'text': _('Categories (forms)'),
|
|
'singular': _('Category (forms)'),
|
|
'minor': True,
|
|
},
|
|
{
|
|
'id': 'workflows-categories',
|
|
'text': _('Categories (workflows)'),
|
|
'singular': _('Category (workflows)'),
|
|
'minor': True,
|
|
},
|
|
{
|
|
'id': 'mail-templates-categories',
|
|
'text': _('Categories (mail templates)'),
|
|
'singular': _('Category (mail templates)'),
|
|
'minor': True,
|
|
},
|
|
{
|
|
'id': 'comment-templates-categories',
|
|
'text': _('Categories (comment templates)'),
|
|
'singular': _('Category (comment templates)'),
|
|
'minor': True,
|
|
},
|
|
{
|
|
'id': 'data-sources-categories',
|
|
'text': _('Categories (data sources)'),
|
|
'singular': _('Category (data Sources)'),
|
|
'minor': True,
|
|
},
|
|
{
|
|
'id': 'roles',
|
|
'text': _('Roles'),
|
|
'singular': _('Role'),
|
|
'minor': True,
|
|
},
|
|
]
|
|
for obj in response:
|
|
obj['urls'] = {
|
|
'list': request.build_absolute_uri(
|
|
reverse('api-export-import-objects-list', kwargs={'objects': obj['id']})
|
|
),
|
|
}
|
|
return JsonResponse({'data': response})
|
|
|
|
|
|
def export_object_ref(request, obj):
|
|
slug = obj.slug
|
|
objects = klass_to_slug[obj.__class__]
|
|
try:
|
|
urls = {
|
|
'export': request.build_absolute_uri(
|
|
reverse('api-export-import-object-export', kwargs={'objects': objects, 'slug': slug})
|
|
),
|
|
'dependencies': request.build_absolute_uri(
|
|
reverse('api-export-import-object-dependencies', kwargs={'objects': objects, 'slug': slug})
|
|
),
|
|
}
|
|
except NoReverseMatch:
|
|
return None
|
|
urls.update(
|
|
{
|
|
'redirect': request.build_absolute_uri(
|
|
reverse('api-export-import-object-redirect', kwargs={'objects': objects, 'slug': slug})
|
|
)
|
|
}
|
|
)
|
|
data = {
|
|
'id': slug,
|
|
'text': obj.name,
|
|
'type': objects,
|
|
'urls': urls,
|
|
}
|
|
if hasattr(obj, 'category_id'):
|
|
data['category'] = obj.category.name if (obj.category_id and obj.category) else None
|
|
if objects == 'roles':
|
|
# include uuid in object reference, this is not used for applification API but is useful
|
|
# for authentic creating its role summary page.
|
|
data['uuid'] = obj.uuid
|
|
data['urls'] = {}
|
|
return data
|
|
|
|
|
|
@signature_required
|
|
def objects_list(request, objects):
|
|
klass = klasses.get(objects)
|
|
if not klass:
|
|
raise Http404()
|
|
object_refs = [export_object_ref(request, x) for x in klass.select()]
|
|
return JsonResponse({'data': [x for x in object_refs if x]})
|
|
|
|
|
|
def get_object(objects, slug):
|
|
klass = klasses.get(objects)
|
|
if not klass:
|
|
raise Http404()
|
|
return klass.get_by_slug(slug, ignore_errors=True)
|
|
|
|
|
|
@signature_required
|
|
def object_export(request, objects, slug):
|
|
obj = get_object(objects, slug)
|
|
if obj is None:
|
|
raise Http404()
|
|
if hasattr(obj, 'export_for_application'):
|
|
content, content_type = obj.export_for_application()
|
|
else:
|
|
etree = obj.export_to_xml(include_id=True)
|
|
indent_xml(etree)
|
|
content = ET.tostring(etree)
|
|
content_type = 'text/xml'
|
|
return HttpResponse(content, content_type=content_type)
|
|
|
|
|
|
def object_redirect(request, objects, slug):
|
|
obj = get_object(objects, slug)
|
|
if obj is None or objects == 'roles':
|
|
raise Http404()
|
|
url = obj.get_admin_url()
|
|
if (
|
|
'compare' in request.GET
|
|
and request.GET.get('application')
|
|
and request.GET.get('version1')
|
|
and request.GET.get('version2')
|
|
):
|
|
url += 'history/compare?version1=%s&version2=%s&application=%s' % (
|
|
request.GET['version1'],
|
|
request.GET['version2'],
|
|
request.GET['application'],
|
|
)
|
|
return redirect(url)
|
|
|
|
|
|
@signature_required
|
|
def object_dependencies(request, objects, slug):
|
|
obj = get_object(objects, slug)
|
|
if obj is None:
|
|
raise Http404()
|
|
dependencies = []
|
|
if hasattr(obj, 'get_dependencies'):
|
|
for dependency in obj.get_dependencies():
|
|
if dependency is None:
|
|
continue
|
|
object_ref = export_object_ref(request, dependency)
|
|
if object_ref:
|
|
dependencies.append(object_ref)
|
|
return JsonResponse({'data': dependencies})
|
|
|
|
|
|
@signature_required
|
|
def bundle_check(request):
|
|
bundle = request.FILES['bundle']
|
|
try:
|
|
with tarfile.open(fileobj=bundle) as tar:
|
|
try:
|
|
manifest = json.loads(tar.extractfile('manifest.json').read().decode())
|
|
except KeyError:
|
|
return JsonResponse({'err': 1, 'err_desc': _('Invalid tar file, missing manifest')})
|
|
application_slug = manifest.get('slug')
|
|
application_version = manifest.get('version_number')
|
|
if not application_slug or not application_version:
|
|
return JsonResponse({'data': {}})
|
|
|
|
differences = []
|
|
unknown_elements = []
|
|
no_history_elements = []
|
|
legacy_elements = []
|
|
for element in manifest.get('elements'):
|
|
if element['type'] not in klasses or element['type'] == 'roles':
|
|
continue
|
|
element_klass = klasses[element['type']]
|
|
try:
|
|
element_content = tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
|
|
except KeyError:
|
|
return JsonResponse(
|
|
{
|
|
'err': 1,
|
|
'err_desc': _(
|
|
'Invalid tar file, missing component %s/%s'
|
|
% (element['type'], element['slug'])
|
|
),
|
|
}
|
|
)
|
|
tree = ET.fromstring(element_content)
|
|
if hasattr(element_klass, 'url_name'):
|
|
slug = xml_node_text(tree.find('url_name'))
|
|
else:
|
|
slug = xml_node_text(tree.find('slug'))
|
|
try:
|
|
obj = element_klass.get_by_slug(slug)
|
|
if obj is None:
|
|
raise KeyError
|
|
except KeyError:
|
|
# element not found, report this as unexisting
|
|
unknown_elements.append(
|
|
{
|
|
'type': element['type'],
|
|
'slug': element['slug'],
|
|
}
|
|
)
|
|
continue
|
|
applications = Application.select([Equal('slug', application_slug)])
|
|
legacy = False
|
|
if not applications:
|
|
legacy = True
|
|
else:
|
|
application = applications[0]
|
|
elements = ApplicationElement.select(
|
|
[
|
|
Equal('application_id', application.id),
|
|
Equal('object_type', obj.xml_root_node),
|
|
Equal('object_id', obj.id),
|
|
]
|
|
)
|
|
if not elements:
|
|
legacy = True
|
|
if legacy:
|
|
# object exists, but not linked to the application
|
|
legacy_elements.append(
|
|
{
|
|
'type': element['type'],
|
|
'slug': element['slug'],
|
|
# information needed here, Relation objects may not exist yet in hobo
|
|
'text': obj.name,
|
|
'url': request.build_absolute_uri(
|
|
reverse(
|
|
'api-export-import-object-redirect',
|
|
kwargs={'objects': element['type'], 'slug': element['slug']},
|
|
)
|
|
),
|
|
}
|
|
)
|
|
continue
|
|
snapshots_for_app = get_publisher().snapshot_class.select(
|
|
[
|
|
Equal('object_type', obj.xml_root_node),
|
|
Equal('object_id', obj.id),
|
|
Equal('application_slug', application_slug),
|
|
Equal('application_version', application_version),
|
|
],
|
|
order_by='-timestamp',
|
|
)
|
|
if not snapshots_for_app:
|
|
# legacy, no snapshot for this bundle
|
|
no_history_elements.append(
|
|
{
|
|
'type': element['type'],
|
|
'slug': element['slug'],
|
|
}
|
|
)
|
|
continue
|
|
snapshot_for_app = snapshots_for_app[0]
|
|
last_snapshot = get_publisher().snapshot_class.select_object_history(obj)[0]
|
|
if snapshot_for_app.id != last_snapshot.id:
|
|
differences.append(
|
|
{
|
|
'type': element['type'],
|
|
'slug': element['slug'],
|
|
'url': '%shistory/compare?version1=%s&version2=%s'
|
|
% (obj.get_admin_url(), snapshot_for_app.id, last_snapshot.id),
|
|
}
|
|
)
|
|
except tarfile.TarError:
|
|
return JsonResponse({'err': 1, 'err_desc': _('Invalid tar file')})
|
|
|
|
return JsonResponse(
|
|
{
|
|
'data': {
|
|
'differences': differences,
|
|
'unknown_elements': unknown_elements,
|
|
'no_history_elements': no_history_elements,
|
|
'legacy_elements': legacy_elements,
|
|
}
|
|
}
|
|
)
|
|
|
|
|
|
class BundleKeyError(Exception):
|
|
pass
|
|
|
|
|
|
class BundleImportJob(AfterJob):
|
|
def __init__(self, tar_content, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.tar_content = tar_content
|
|
|
|
def execute(self):
|
|
object_types = [x for x in klasses if x != 'roles']
|
|
# be sure categories are imported first
|
|
object_types = sorted(object_types, key=lambda a: 'categories' in a, reverse=True)
|
|
|
|
tar_io = io.BytesIO(self.tar_content)
|
|
error = None
|
|
try:
|
|
with tarfile.open(fileobj=tar_io) as self.tar:
|
|
try:
|
|
manifest = json.loads(self.tar.extractfile('manifest.json').read().decode())
|
|
except KeyError:
|
|
raise BundleKeyError(_('Invalid tar file, missing manifest.'))
|
|
self.application = Application.update_or_create_from_manifest(
|
|
manifest, self.tar, editable=False, install=False
|
|
)
|
|
|
|
# count number of actions
|
|
self.total_count = 0
|
|
self.total_count += len(
|
|
[
|
|
x
|
|
for x in manifest.get('elements')
|
|
if x.get('type') in ('forms', 'cards', 'blocks', 'workflows')
|
|
]
|
|
)
|
|
self.total_count += (
|
|
len([x for x in manifest.get('elements') if x.get('type') in object_types]) * 2
|
|
)
|
|
|
|
# init cache of application elements, from imported manifest
|
|
self.application_elements = set()
|
|
|
|
# first pass on formdef/carddef/blockdef/workflows to create them empty
|
|
# (name and slug); so they can be found for sure in import pass
|
|
for _type in ('forms', 'cards', 'blocks', 'workflows'):
|
|
self.pre_install([x for x in manifest.get('elements') if x.get('type') == _type])
|
|
|
|
# real installation pass
|
|
for _type in object_types:
|
|
self.install([x for x in manifest.get('elements') if x.get('type') == _type])
|
|
|
|
# again, to remove [pre-install] in dependencies labels
|
|
for _type in object_types:
|
|
self.install(
|
|
[x for x in manifest.get('elements') if x.get('type') == _type], finalize=True
|
|
)
|
|
|
|
# remove obsolete application elements
|
|
self.unlink_obsolete_objects()
|
|
|
|
except (
|
|
BlockdefImportError,
|
|
FormdefImportError,
|
|
WorkflowImportError,
|
|
NamedDataSourceImportError,
|
|
NamedWsCallImportError,
|
|
) as e:
|
|
error = str(e)
|
|
except tarfile.TarError:
|
|
error = _('Invalid tar file.')
|
|
except BundleKeyError as e:
|
|
error = str(e)
|
|
|
|
if error:
|
|
self.status = 'failed'
|
|
self.failure_label = str(_('Error: %s') % error)
|
|
|
|
def pre_install(self, elements):
|
|
for element in elements:
|
|
element_klass = klasses[element['type']]
|
|
try:
|
|
element_content = self.tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
|
|
except KeyError:
|
|
raise BundleKeyError(
|
|
'Invalid tar file, missing component %s/%s.' % (element['type'], element['slug'])
|
|
)
|
|
tree = ET.fromstring(element_content)
|
|
if hasattr(element_klass, 'url_name'):
|
|
slug = xml_node_text(tree.find('url_name'))
|
|
else:
|
|
slug = xml_node_text(tree.find('slug'))
|
|
try:
|
|
existing_object = element_klass.get_by_slug(slug)
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
if existing_object:
|
|
self.increment_count()
|
|
continue
|
|
new_object = element_klass()
|
|
new_object.slug = slug
|
|
new_object.name = '[pre-import] %s' % xml_node_text(tree.find('name'))
|
|
new_object.store(comment=_('Application (%s) initial installation') % self.application.name)
|
|
self.link_object(new_object)
|
|
self.increment_count()
|
|
|
|
if get_response():
|
|
# process pre-import after jobs earlier, so there are no multiple jobs for
|
|
# the same object afterwards.
|
|
get_response().process_after_jobs()
|
|
|
|
def install(self, elements, finalize=False):
|
|
if not elements:
|
|
return
|
|
|
|
element_klass = klasses[elements[0]['type']]
|
|
|
|
if not finalize and element_klass in category_classes:
|
|
# for categories, keep positions before install
|
|
objects_by_slug = {i.slug: i for i in element_klass.select()}
|
|
initial_positions = {
|
|
i.slug: i.position if i.position is not None else 10000 for i in objects_by_slug.values()
|
|
}
|
|
|
|
imported_positions = {}
|
|
|
|
for element in elements:
|
|
try:
|
|
element_content = self.tar.extractfile('%s/%s' % (element['type'], element['slug'])).read()
|
|
except KeyError:
|
|
raise BundleKeyError(
|
|
'Invalid tar file, missing component %s/%s.' % (element['type'], element['slug'])
|
|
)
|
|
new_object = element_klass.import_from_xml_tree(
|
|
ET.fromstring(element_content),
|
|
include_id=False,
|
|
check_datasources=False,
|
|
check_deprecated=True,
|
|
)
|
|
if not finalize and element_klass in category_classes:
|
|
# for categories, keep positions of imported objects
|
|
imported_positions[new_object.slug] = (
|
|
new_object.position if new_object.position is not None else 10000
|
|
)
|
|
try:
|
|
existing_object = element_klass.get_by_slug(new_object.slug)
|
|
if existing_object is None:
|
|
raise KeyError()
|
|
except KeyError:
|
|
new_object.store(
|
|
comment=_('Application (%s)') % self.application.name, application=self.application
|
|
)
|
|
self.link_object(new_object)
|
|
self.increment_count()
|
|
continue
|
|
# replace
|
|
new_object.id = existing_object.id
|
|
if finalize:
|
|
last_snapshot = get_publisher().snapshot_class.get_latest(
|
|
new_object.xml_root_node, new_object.id
|
|
)
|
|
if '[pre-import]' not in last_snapshot.get_serialization():
|
|
self.increment_count()
|
|
continue
|
|
|
|
if element['type'] in ('forms', 'cards'):
|
|
# keep internal references
|
|
new_object.table_name = existing_object.table_name
|
|
|
|
if element['type'] in ('forms', 'cards') and not existing_object.name.startswith('[pre-import]'):
|
|
# keep some settings when updating
|
|
for attr in (
|
|
'workflow_options',
|
|
'workflow_roles',
|
|
'roles',
|
|
'required_authentication_contexts',
|
|
'backoffice_submission_roles',
|
|
'publication_date',
|
|
'expiration_date',
|
|
'disabled',
|
|
):
|
|
setattr(new_object, attr, getattr(existing_object, attr))
|
|
comment = _('Application (%s) update')
|
|
if existing_object.name.startswith('[pre-import]'):
|
|
comment = _('Application (%s) complete initial installation')
|
|
if finalize:
|
|
comment = _('Application (%s) finalize initial installation')
|
|
new_object.store(comment=comment % self.application.name, application=self.application)
|
|
self.link_object(new_object)
|
|
self.increment_count()
|
|
|
|
# for categories, rebuild positions
|
|
if not finalize and element_klass in category_classes:
|
|
objects_by_slug = {i.slug: i for i in element_klass.select()}
|
|
# find imported objects from initials
|
|
existing_positions = {k: v for k, v in initial_positions.items() if k in imported_positions}
|
|
# find not imported objects from initials
|
|
not_imported_positions = {
|
|
k: v for k, v in initial_positions.items() if k not in imported_positions
|
|
}
|
|
# determine position of application objects
|
|
application_position = None
|
|
if existing_positions:
|
|
application_position = min(existing_positions.values())
|
|
# all objects placed before application objects
|
|
before_positions = {
|
|
k: v
|
|
for k, v in not_imported_positions.items()
|
|
if application_position is None or v < application_position
|
|
}
|
|
# all objects placed after application objects
|
|
after_positions = {
|
|
k: v
|
|
for k, v in not_imported_positions.items()
|
|
if application_position is not None and v >= application_position
|
|
}
|
|
# rebuild positions
|
|
position = 1
|
|
slugs = sorted(before_positions.keys(), key=lambda a: before_positions[a])
|
|
slugs += sorted(imported_positions.keys(), key=lambda a: imported_positions[a])
|
|
slugs += sorted(after_positions.keys(), key=lambda a: after_positions[a])
|
|
for slug in slugs:
|
|
objects_by_slug[slug].position = position
|
|
objects_by_slug[slug].store(store_snapshot=False)
|
|
position += 1
|
|
|
|
def link_object(self, obj):
|
|
element = ApplicationElement.update_or_create_for_object(self.application, obj)
|
|
self.application_elements.add((element.object_type, element.object_id))
|
|
|
|
def unlink_obsolete_objects(self):
|
|
known_elements = ApplicationElement.select([Equal('application_id', self.application.id)])
|
|
for element in known_elements:
|
|
if (element.object_type, element.object_id) not in self.application_elements:
|
|
ApplicationElement.remove_object(element.id)
|
|
|
|
|
|
@signature_required
|
|
def bundle_import(request):
|
|
job = BundleImportJob(tar_content=request.FILES['bundle'].read())
|
|
job.store()
|
|
job.run(spool=True)
|
|
return JsonResponse({'err': 0, 'url': job.get_api_status_url()})
|
|
|
|
|
|
class BundleDeclareJob(BundleImportJob):
|
|
def execute(self):
|
|
object_types = [x for x in klasses if x != 'roles']
|
|
|
|
tar_io = io.BytesIO(self.tar_content)
|
|
error = None
|
|
try:
|
|
with tarfile.open(fileobj=tar_io) as self.tar:
|
|
try:
|
|
manifest = json.loads(self.tar.extractfile('manifest.json').read().decode())
|
|
except KeyError:
|
|
raise BundleKeyError(_('Invalid tar file, missing manifest.'))
|
|
else:
|
|
self.application = Application.update_or_create_from_manifest(
|
|
manifest, self.tar, editable=True, install=True
|
|
)
|
|
|
|
# count number of actions
|
|
self.total_count = len(
|
|
[x for x in manifest.get('elements') if x.get('type') in object_types]
|
|
)
|
|
|
|
# init cache of application elements, from manifest
|
|
self.application_elements = set()
|
|
|
|
# declare elements
|
|
for type in object_types:
|
|
self.declare([x for x in manifest.get('elements') if x.get('type') == type])
|
|
|
|
# remove obsolete application elements
|
|
self.unlink_obsolete_objects()
|
|
except tarfile.TarError:
|
|
error = _('Invalid tar file.')
|
|
except BundleKeyError as e:
|
|
error = str(e)
|
|
|
|
if error:
|
|
self.status = 'failed'
|
|
self.failure_label = str(_('Error: %s') % error)
|
|
|
|
def declare(self, elements):
|
|
for element in elements:
|
|
element_klass = klasses[element['type']]
|
|
element_slug = element['slug']
|
|
existing_object = element_klass.get_by_slug(element_slug, ignore_errors=True)
|
|
if existing_object:
|
|
self.link_object(existing_object)
|
|
self.increment_count()
|
|
|
|
|
|
@signature_required
|
|
def bundle_declare(request):
|
|
job = BundleDeclareJob(tar_content=request.FILES['bundle'].read())
|
|
job.store()
|
|
job.run(spool=True)
|
|
return JsonResponse({'err': 0, 'url': job.get_api_status_url()})
|
|
|
|
|
|
@signature_required
|
|
def unlink(request):
|
|
if request.method == 'POST' and request.POST.get('application'):
|
|
applications = Application.select([Equal('slug', request.POST['application'])])
|
|
if applications:
|
|
application = applications[0]
|
|
elements = ApplicationElement.select([Equal('application_id', application.id)])
|
|
for element in elements:
|
|
ApplicationElement.remove_object(element.id)
|
|
Application.remove_object(application.id)
|
|
|
|
return JsonResponse({'err': 0})
|