export_import: endpoints import and declare with async job (#87614)
This commit is contained in:
parent
61fc9aab9b
commit
d128ae63be
|
@ -20,6 +20,7 @@ import tarfile
|
|||
|
||||
from django.contrib.auth.models import Group
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.files.base import ContentFile
|
||||
from django.http import Http404
|
||||
from django.shortcuts import get_object_or_404, redirect
|
||||
from django.urls import reverse
|
||||
|
@ -28,9 +29,8 @@ from rest_framework import permissions
|
|||
from rest_framework.generics import GenericAPIView
|
||||
from rest_framework.response import Response
|
||||
|
||||
from combo.apps.export_import.models import Application, ApplicationElement
|
||||
from combo.apps.export_import.models import Application, ApplicationAsyncJob, ApplicationElement
|
||||
from combo.data.models import Page, PageSnapshot
|
||||
from combo.data.utils import import_site
|
||||
from combo.utils.misc import is_portal_agent
|
||||
|
||||
klasses = {klass.application_component_type: klass for klass in [Page]}
|
||||
|
@ -301,73 +301,28 @@ bundle_check = BundleCheck.as_view()
|
|||
|
||||
class BundleImport(GenericAPIView):
|
||||
permission_classes = (permissions.IsAuthenticated,)
|
||||
install = True
|
||||
action = 'import_bundle'
|
||||
|
||||
def put(self, request, *args, **kwargs):
|
||||
tar_io = io.BytesIO(request.read())
|
||||
page_type = 'portal-agent-pages' if is_portal_agent() else 'pages'
|
||||
pages = []
|
||||
with tarfile.open(fileobj=tar_io) as tar:
|
||||
manifest = json.loads(tar.extractfile('manifest.json').read().decode())
|
||||
self.application = Application.update_or_create_from_manifest(
|
||||
manifest,
|
||||
tar,
|
||||
editable=not self.install,
|
||||
)
|
||||
for element in manifest.get('elements'):
|
||||
if element.get('type') != page_type:
|
||||
continue
|
||||
pages.append(
|
||||
json.loads(tar.extractfile(f'{page_type}/{element["slug"]}').read().decode()).get('data')
|
||||
)
|
||||
# init cache of application elements, from manifest
|
||||
self.application_elements = set()
|
||||
# install pages
|
||||
self.do_something(pages)
|
||||
# create application elements
|
||||
self.link_objects(pages)
|
||||
# remove obsolete application elements
|
||||
self.unlink_obsolete_objects()
|
||||
return Response({'err': 0})
|
||||
|
||||
def do_something(self, pages):
|
||||
if pages:
|
||||
import_site({'pages': pages})
|
||||
|
||||
def link_objects(self, pages):
|
||||
for page in pages:
|
||||
page_uuid = page['fields']['uuid']
|
||||
try:
|
||||
existing_page = Page.objects.get(uuid=page_uuid)
|
||||
except Page.DoesNotExist:
|
||||
pass
|
||||
else:
|
||||
element = ApplicationElement.update_or_create_for_object(self.application, existing_page)
|
||||
self.application_elements.add(element.content_object)
|
||||
if self.install is True:
|
||||
PageSnapshot.take(
|
||||
existing_page,
|
||||
request=self.request,
|
||||
comment=_('Application (%s)') % self.application,
|
||||
application=self.application,
|
||||
)
|
||||
|
||||
def unlink_obsolete_objects(self):
|
||||
known_elements = ApplicationElement.objects.filter(application=self.application)
|
||||
for element in known_elements:
|
||||
if element.content_object not in self.application_elements:
|
||||
element.delete()
|
||||
application_slug = manifest.get('slug')
|
||||
job = ApplicationAsyncJob(
|
||||
action=self.action,
|
||||
)
|
||||
job.bundle.save('%s.tar' % application_slug, content=ContentFile(tar_io.getvalue()))
|
||||
job.save()
|
||||
job.run(spool=True)
|
||||
return Response({'err': 0, 'url': job.get_api_status_url(request)})
|
||||
|
||||
|
||||
bundle_import = BundleImport.as_view()
|
||||
|
||||
|
||||
class BundleDeclare(BundleImport):
|
||||
install = False
|
||||
|
||||
def do_something(self, pages):
|
||||
# no installation on declare
|
||||
pass
|
||||
permission_classes = (permissions.IsAuthenticated,)
|
||||
action = 'declare_bundle'
|
||||
|
||||
|
||||
bundle_declare = BundleDeclare.as_view()
|
||||
|
@ -389,3 +344,24 @@ class BundleUnlink(GenericAPIView):
|
|||
|
||||
|
||||
bundle_unlink = BundleUnlink.as_view()
|
||||
|
||||
|
||||
class JobStatus(GenericAPIView):
|
||||
permission_classes = (permissions.IsAuthenticated,)
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
job = get_object_or_404(ApplicationAsyncJob, uuid=kwargs['job_uuid'])
|
||||
return Response(
|
||||
{
|
||||
'err': 0,
|
||||
'data': {
|
||||
'status': job.status,
|
||||
'creation_time': job.creation_timestamp,
|
||||
'completion_time': job.completion_timestamp,
|
||||
'completion_status': job.get_completion_status(),
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
job_status = JobStatus.as_view()
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
import uuid
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
import combo.apps.export_import.models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
dependencies = [
|
||||
('export_import', '0002_application'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='ApplicationAsyncJob',
|
||||
fields=[
|
||||
(
|
||||
'id',
|
||||
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
|
||||
),
|
||||
('uuid', models.UUIDField(default=uuid.uuid4, editable=False, unique=True)),
|
||||
(
|
||||
'status',
|
||||
models.CharField(
|
||||
choices=[
|
||||
('registered', 'Registered'),
|
||||
('running', 'Running'),
|
||||
('failed', 'Failed'),
|
||||
('completed', 'Completed'),
|
||||
],
|
||||
default='registered',
|
||||
max_length=100,
|
||||
),
|
||||
),
|
||||
('exception', models.TextField()),
|
||||
('action', models.CharField(max_length=100)),
|
||||
(
|
||||
'bundle',
|
||||
models.FileField(
|
||||
blank=True, null=True, upload_to=combo.apps.export_import.models.upload_to_job_uuid
|
||||
),
|
||||
),
|
||||
('total_count', models.PositiveIntegerField(default=0)),
|
||||
('current_count', models.PositiveIntegerField(default=0)),
|
||||
('creation_timestamp', models.DateTimeField(auto_now_add=True)),
|
||||
('last_update_timestamp', models.DateTimeField(auto_now=True)),
|
||||
('completion_timestamp', models.DateTimeField(default=None, null=True)),
|
||||
],
|
||||
),
|
||||
]
|
|
@ -15,10 +15,21 @@
|
|||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import collections
|
||||
import io
|
||||
import json
|
||||
import sys
|
||||
import tarfile
|
||||
import traceback
|
||||
import uuid
|
||||
|
||||
from django.contrib.contenttypes.fields import GenericForeignKey
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.db import models
|
||||
from django.urls import reverse
|
||||
from django.utils.timezone import now
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from combo.utils.misc import is_portal_agent
|
||||
|
||||
|
||||
class Application(models.Model):
|
||||
|
@ -124,3 +135,149 @@ class ApplicationElement(models.Model):
|
|||
if not created:
|
||||
element.save()
|
||||
return element
|
||||
|
||||
|
||||
STATUS_CHOICES = [
|
||||
('registered', _('Registered')),
|
||||
('running', _('Running')),
|
||||
('failed', _('Failed')),
|
||||
('completed', _('Completed')),
|
||||
]
|
||||
|
||||
|
||||
def upload_to_job_uuid(instance, filename):
|
||||
return f'applications/bundles/{instance.uuid}/{filename}'
|
||||
|
||||
|
||||
class ApplicationAsyncJob(models.Model):
|
||||
uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
|
||||
status = models.CharField(
|
||||
max_length=100,
|
||||
default='registered',
|
||||
choices=STATUS_CHOICES,
|
||||
)
|
||||
exception = models.TextField()
|
||||
action = models.CharField(max_length=100)
|
||||
bundle = models.FileField(
|
||||
upload_to=upload_to_job_uuid,
|
||||
blank=True,
|
||||
null=True,
|
||||
)
|
||||
total_count = models.PositiveIntegerField(default=0)
|
||||
current_count = models.PositiveIntegerField(default=0)
|
||||
|
||||
creation_timestamp = models.DateTimeField(auto_now_add=True)
|
||||
last_update_timestamp = models.DateTimeField(auto_now=True)
|
||||
completion_timestamp = models.DateTimeField(default=None, null=True)
|
||||
|
||||
raise_exception = True
|
||||
|
||||
def run(self, spool=False):
|
||||
if 'uwsgi' in sys.modules and spool:
|
||||
from combo.utils.spooler import run_async_job
|
||||
|
||||
run_async_job(job_id=str(self.pk))
|
||||
return
|
||||
self.status = 'running'
|
||||
self.save()
|
||||
try:
|
||||
getattr(self, self.action)()
|
||||
except Exception:
|
||||
self.status = 'failed'
|
||||
self.exception = traceback.format_exc()
|
||||
if self.raise_exception:
|
||||
raise
|
||||
finally:
|
||||
if self.status == 'running':
|
||||
self.status = 'completed'
|
||||
self.completion_timestamp = now()
|
||||
self.save()
|
||||
|
||||
def process_bundle(self, install=True):
|
||||
from combo.data.utils import import_site
|
||||
|
||||
page_type = 'portal-agent-pages' if is_portal_agent() else 'pages'
|
||||
pages = []
|
||||
tar_io = io.BytesIO(self.bundle.read())
|
||||
with tarfile.open(fileobj=tar_io) as tar:
|
||||
manifest = json.loads(tar.extractfile('manifest.json').read().decode())
|
||||
self.application = Application.update_or_create_from_manifest(
|
||||
manifest,
|
||||
tar,
|
||||
editable=not install,
|
||||
)
|
||||
|
||||
# count number of actions
|
||||
self.total_count = len([x for x in manifest.get('elements') if x.get('type') == page_type])
|
||||
|
||||
for element in manifest.get('elements'):
|
||||
if element.get('type') != page_type:
|
||||
continue
|
||||
pages.append(
|
||||
json.loads(tar.extractfile(f'{page_type}/{element["slug"]}').read().decode()).get('data')
|
||||
)
|
||||
# init cache of application elements, from manifest
|
||||
self.application_elements = set()
|
||||
# install pages
|
||||
if install and pages:
|
||||
import_site({'pages': pages}, job=self)
|
||||
# create application elements
|
||||
self.link_objects(pages, increment=not install)
|
||||
# remove obsolete application elements
|
||||
self.unlink_obsolete_objects()
|
||||
|
||||
def import_bundle(self):
|
||||
self.process_bundle()
|
||||
|
||||
def declare_bundle(self):
|
||||
self.process_bundle(install=False)
|
||||
|
||||
def link_objects(self, pages, increment=False):
|
||||
from combo.data.models import Page, PageSnapshot
|
||||
|
||||
for page in pages:
|
||||
page_uuid = page['fields']['uuid']
|
||||
try:
|
||||
existing_page = Page.objects.get(uuid=page_uuid)
|
||||
except Page.DoesNotExist:
|
||||
pass
|
||||
else:
|
||||
element = ApplicationElement.update_or_create_for_object(self.application, existing_page)
|
||||
self.application_elements.add(element.content_object)
|
||||
if self.action == 'import_bundle':
|
||||
PageSnapshot.take(
|
||||
existing_page,
|
||||
comment=_('Application (%s)') % self.application,
|
||||
application=self.application,
|
||||
)
|
||||
if increment:
|
||||
self.increment_count()
|
||||
|
||||
def unlink_obsolete_objects(self):
|
||||
known_elements = ApplicationElement.objects.filter(application=self.application)
|
||||
for element in known_elements:
|
||||
if element.content_object not in self.application_elements:
|
||||
element.delete()
|
||||
|
||||
def increment_count(self, amount=1):
|
||||
self.current_count = (self.current_count or 0) + amount
|
||||
if (now() - self.last_update_timestamp).total_seconds() > 1:
|
||||
self.save()
|
||||
|
||||
def get_api_status_url(self, request):
|
||||
return request.build_absolute_uri(reverse('api-export-import-job-status', args=[self.uuid]))
|
||||
|
||||
def get_completion_status(self):
|
||||
current_count = self.current_count or 0
|
||||
|
||||
if not current_count:
|
||||
return ''
|
||||
|
||||
if not self.total_count:
|
||||
return _('%(current_count)s (unknown total)') % {'current_count': current_count}
|
||||
|
||||
return _('%(current_count)s/%(total_count)s (%(percent)s%%)') % {
|
||||
'current_count': int(current_count),
|
||||
'total_count': self.total_count,
|
||||
'percent': int(current_count * 100 / self.total_count),
|
||||
}
|
||||
|
|
|
@ -44,4 +44,9 @@ urlpatterns = [
|
|||
api_views.component_redirect,
|
||||
name='api-export-import-component-redirect',
|
||||
),
|
||||
path(
|
||||
'api/export-import/job/<uuid:job_uuid>/status/',
|
||||
api_views.job_status,
|
||||
name='api-export-import-job-status',
|
||||
),
|
||||
]
|
||||
|
|
|
@ -677,7 +677,7 @@ class Page(models.Model):
|
|||
cell.object.import_subobjects(cell_data)
|
||||
|
||||
@classmethod
|
||||
def load_serialized_pages(cls, json_site, request=None):
|
||||
def load_serialized_pages(cls, json_site, request=None, job=None):
|
||||
cells_to_load = []
|
||||
to_load = []
|
||||
imported_pages = []
|
||||
|
@ -703,6 +703,8 @@ class Page(models.Model):
|
|||
for page, created, json_page in to_load:
|
||||
imported_pages.append(cls.load_serialized_page(json_page, page=page, request=request))
|
||||
cells_to_load.extend(json_page.get('cells'))
|
||||
if job is not None:
|
||||
job.increment_count()
|
||||
|
||||
# and cells
|
||||
cls.load_serialized_cells(cells_to_load)
|
||||
|
|
|
@ -70,7 +70,7 @@ def export_site(pages=True, cartography=True, pwa=True, assets=True, payment=Tru
|
|||
return export
|
||||
|
||||
|
||||
def import_site(data, if_empty=False, clean=False, request=None):
|
||||
def import_site(data, if_empty=False, clean=False, request=None, job=None):
|
||||
if 'combo.apps.lingo' in settings.INSTALLED_APPS:
|
||||
from combo.apps.lingo.models import PaymentBackend, Regie
|
||||
|
||||
|
@ -126,7 +126,7 @@ def import_site(data, if_empty=False, clean=False, request=None):
|
|||
if data.get('map-layers') and cartography_support:
|
||||
MapLayer.load_serialized_objects(data.get('map-layers'))
|
||||
Asset.load_serialized_objects(data.get('assets') or [])
|
||||
pages = Page.load_serialized_pages(data.get('pages') or [], request=request)
|
||||
pages = Page.load_serialized_pages(data.get('pages') or [], request=request, job=job)
|
||||
|
||||
if data.get('pwa') and pwa_support:
|
||||
PwaSettings.load_serialized_settings(data['pwa'].get('settings'))
|
||||
|
|
|
@ -21,6 +21,8 @@ from functools import wraps
|
|||
|
||||
from django.db import close_old_connections, connection
|
||||
|
||||
from combo.apps.export_import.models import ApplicationAsyncJob
|
||||
|
||||
USE_UWSGI = 'uwsgi' in sys.modules
|
||||
|
||||
|
||||
|
@ -109,3 +111,11 @@ def refresh_statistics_data(cell_pk, filter_params):
|
|||
|
||||
if cell.statistic.service_slug != 'bijoe':
|
||||
cell.update_subfilters(filter_params)
|
||||
|
||||
|
||||
@tenantspool
|
||||
def run_async_job(job_id):
|
||||
job = ApplicationAsyncJob.objects.get(pk=job_id)
|
||||
job.raise_exception = False
|
||||
job.run()
|
||||
print('got job:', job)
|
||||
|
|
|
@ -301,6 +301,10 @@ def test_bundle_import(app, john_doe):
|
|||
|
||||
# check update
|
||||
resp = app.put('/api/export-import/bundle-import/', bundles[1])
|
||||
job_url = resp.json['url']
|
||||
resp = app.get(job_url)
|
||||
assert resp.json['data']['status'] == 'completed'
|
||||
assert resp.json['data']['completion_status'] == '1/1 (100%)'
|
||||
assert Page.objects.all().count() == 1
|
||||
assert resp.json['err'] == 0
|
||||
assert Application.objects.count() == 1
|
||||
|
@ -327,6 +331,10 @@ def test_bundle_declare(app, john_doe):
|
|||
|
||||
bundle = create_bundle(app, john_doe, visible=False)
|
||||
resp = app.put('/api/export-import/bundle-declare/', bundle)
|
||||
job_url = resp.json['url']
|
||||
resp = app.get(job_url)
|
||||
assert resp.json['data']['status'] == 'completed'
|
||||
assert resp.json['data']['completion_status'] == '1/1 (100%)'
|
||||
assert Page.objects.all().count() == 1
|
||||
assert resp.json['err'] == 0
|
||||
assert Application.objects.count() == 1
|
||||
|
|
Loading…
Reference in New Issue