hobo/hobo/applications/models.py

450 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# hobo - portal to configure and deploy applications
# Copyright (C) 2015-2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import io
import json
import os
import sys
import tarfile
import traceback
import urllib.parse
from django.conf import settings
from django.core.files.base import ContentFile
from django.db import connection, models
from django.db.models import JSONField
from django.utils.text import slugify
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from hobo.environment.utils import get_installed_services
from .utils import Requests
requests = Requests()
def get_object_types():
object_types = []
for service_id, services in getattr(settings, 'KNOWN_SERVICES', {}).items():
if service_id not in Application.SUPPORTED_MODULES:
continue
service_objects = {x.get_base_url_path(): x for x in get_installed_services(types=[service_id])}
for service in services.values():
if service['url'] not in service_objects:
continue
if service_objects[service['url']].secondary:
continue
url = urllib.parse.urljoin(service['url'], 'api/export-import/')
response = requests.get(url)
if not response.ok:
continue
for object_type in response.json()['data']:
object_type['service'] = service
object_types.append(object_type)
return object_types
class ApplicationError(Exception):
def __init__(self, msg):
self.msg = msg
class ScanError(ApplicationError):
pass
class DeploymentError(ApplicationError):
pass
class UnlinkError(ApplicationError):
pass
class Application(models.Model):
SUPPORTED_MODULES = ('wcs',)
name = models.CharField(max_length=100, verbose_name=_('Name'))
slug = models.SlugField(max_length=100)
icon = models.FileField(
verbose_name=_('Icon'),
help_text=_(
'Icon file must be in JPEG or PNG format, and should be a square of at least 512×512 pixels.'
),
upload_to='applications/icons/',
blank=True,
null=True,
)
description = models.TextField(verbose_name=_('Description'), blank=True)
documentation_url = models.URLField(_('Documentation URL'), blank=True)
editable = models.BooleanField(default=True)
elements = models.ManyToManyField('Element', blank=True, through='Relation')
creation_timestamp = models.DateTimeField(default=now)
last_update_timestamp = models.DateTimeField(auto_now=True)
def __repr__(self):
return '<Application %s>' % self.slug
def save(self, *args, **kwargs):
if not self.slug:
base_slug = slugify(self.name)[:95]
slug = base_slug
i = 1
while Application.objects.filter(slug=slug).exists():
slug = '%s-%s' % (base_slug, i)
i += 1
self.slug = slug
super().save(*args, **kwargs)
def refresh_elements(self, cache_only=False):
if not cache_only:
self.relation_set.filter(auto_dependency=True).delete()
remote_elements = {}
relations = self.relation_set.select_related('element')
elements = {(x.element.type, x.element.slug): (x.element, x) for x in relations}
current_object_types = {t for t, s in elements}
for object_type in get_object_types():
if object_type['id'] not in current_object_types:
continue
if not cache_only and object_type.get('minor'):
continue
url = object_type['urls']['list']
response = requests.get(url)
if not response.ok:
raise ScanError(
_('Failed to get elements of type %s (%s)' % (object_type['id'], response.status_code))
)
remote_elements[object_type['id']] = {x['id']: x for x in response.json()['data']}
for element, relation in elements.values():
if not remote_elements[element.type].get(element.slug):
continue
remote_element = remote_elements[element.type][element.slug]
if cache_only:
if element.cache == remote_element:
continue
element.cache = remote_element
element.save()
elements[(element.type, element.slug)] = (element, relation)
continue
if element.name == remote_element['text'] and element.cache == remote_element:
continue
element.name = remote_element['text']
element.cache = remote_element
element.save()
elements[(element.type, element.slug)] = (element, relation)
return elements
def scandeps(self):
elements = self.refresh_elements()
finished = False
while not finished:
finished = True
for el, rel in list(elements.values()):
dependencies_url = el.cache['urls'].get('dependencies')
if not dependencies_url:
continue
response = requests.get(dependencies_url)
if not response.ok:
rel.set_error(response.status_code)
raise ScanError(
_(
'Failed to scan "%s" (type %s, slug %s) dependencies (%s)'
% (el.name, el.type, el.slug, response.status_code)
)
)
rel.reset_error()
for dependency in response.json()['data']:
if (dependency['type'], dependency['id']) in elements:
continue
finished = False
element, created = Element.objects.get_or_create(
type=dependency['type'], slug=dependency['id'], defaults={'name': dependency['text']}
)
element.name = dependency['text']
element.cache = dependency
element.save()
relation, created = Relation.objects.get_or_create(application=self, element=element)
if created:
relation.auto_dependency = True
relation.save()
elements[(element.type, element.slug)] = (element, relation)
return elements
def unlink(self):
for service_id, services in getattr(settings, 'KNOWN_SERVICES', {}).items():
if service_id not in Application.SUPPORTED_MODULES:
continue
service_objects = {x.get_base_url_path(): x for x in get_installed_services(types=[service_id])}
for service in services.values():
if service['url'] not in service_objects:
continue
if service_objects[service['url']].secondary:
continue
url = urllib.parse.urljoin(service['url'], 'api/export-import/unlink/')
response = requests.post(url, data={'application': self.slug})
if not response.ok:
raise UnlinkError(
_('Failed to unlink application in module %s (%s)')
% (service_id, response.status_code)
)
class Element(models.Model):
type = models.CharField(max_length=100, verbose_name=_('Type'))
slug = models.SlugField(max_length=500, verbose_name=_('Slug'))
name = models.CharField(max_length=500, verbose_name=_('Name'))
cache = JSONField(blank=True, default=dict)
def __repr__(self):
return '<Element %s/%s>' % (self.type, self.slug)
def get_redirect_url(self):
if self.type == 'roles':
return
if not self.cache.get('urls'):
return
if self.cache['urls'].get('redirect'):
return self.cache['urls']['redirect']
if self.cache['urls'].get('export'):
return '%sredirect/' % self.cache['urls']['export']
class Relation(models.Model):
application = models.ForeignKey(Application, on_delete=models.CASCADE)
element = models.ForeignKey(Element, on_delete=models.CASCADE)
auto_dependency = models.BooleanField(default=False)
error = models.BooleanField(default=False)
error_status = models.CharField(
max_length=100,
choices=[
('notfound', _('Not Found')),
('error', _('Error')),
],
null=True,
)
def __repr__(self):
return '<Relation %s - %s/%s>' % (self.application.slug, self.element.type, self.element.slug)
def set_error(self, http_status_code):
self.error = True
self.error_status = 'notfound' if http_status_code == 404 else 'error'
self.save()
def reset_error(self):
self.error = False
self.error_status = None
self.save()
class Version(models.Model):
application = models.ForeignKey(Application, on_delete=models.CASCADE)
number = models.CharField(max_length=100, verbose_name=_('Number'))
notes = models.TextField(verbose_name=_('Notes'), blank=True)
bundle = models.FileField(upload_to='applications', blank=True, null=True)
creation_timestamp = models.DateTimeField(default=now)
last_update_timestamp = models.DateTimeField(auto_now=True)
deployment_status = JSONField(blank=True, default=dict)
def __repr__(self):
return '<Version %s>' % self.application.slug
def create_bundle(self, job=None):
app = self.application
elements = app.scandeps()
tar_io = io.BytesIO()
with tarfile.open(mode='w', fileobj=tar_io) as tar:
manifest_json = {
'application': app.name,
'slug': app.slug,
'description': app.description,
'documentation_url': app.documentation_url,
'icon': os.path.basename(app.icon.name) if app.icon.name else None,
'version_number': self.number,
'version_notes': self.notes,
'elements': [],
}
for element, relation in elements.values():
manifest_json['elements'].append(
{
'type': element.type,
'slug': element.slug,
'name': element.name,
'auto-dependency': relation.auto_dependency,
}
)
response = requests.get(element.cache['urls']['export'])
tarinfo = tarfile.TarInfo('%s/%s' % (element.type, element.slug))
tarinfo.mtime = self.last_update_timestamp.timestamp()
tarinfo.size = int(response.headers['content-length'])
tar.addfile(tarinfo, fileobj=io.BytesIO(response.content))
manifest_fd = io.BytesIO(json.dumps(manifest_json, indent=2).encode())
tarinfo = tarfile.TarInfo('manifest.json')
tarinfo.size = len(manifest_fd.getvalue())
tarinfo.mtime = self.last_update_timestamp.timestamp()
tar.addfile(tarinfo, fileobj=manifest_fd)
if app.icon.name:
icon_fd = app.icon.file
tarinfo = tarfile.TarInfo(manifest_json['icon'])
tarinfo.size = icon_fd.size
tarinfo.mtime = self.last_update_timestamp.timestamp()
tar.addfile(tarinfo, fileobj=icon_fd)
self.bundle.save('%s.tar' % app.slug, content=ContentFile(tar_io.getvalue()))
self.save()
bundle_content = self.bundle.read()
self.do_something_with_bundle(bundle_content, 'declare', job=job)
def deploy(self, job=None):
bundle_content = self.bundle.read()
self.deploy_roles(bundle_content)
self.do_something_with_bundle(bundle_content, 'deploy', job=job)
self.application.refresh_elements(cache_only=True)
def do_something_with_bundle(self, bundle_content, action, job=None):
if action == 'deploy':
target_url = 'api/export-import/bundle-import/'
exception_message = _('Failed to deploy module %s (%s)')
elif action == 'declare':
target_url = 'api/export-import/bundle-declare/'
exception_message = _('Failed to declare elements for module %s (%s)')
for service_id, services in getattr(settings, 'KNOWN_SERVICES', {}).items():
if service_id not in Application.SUPPORTED_MODULES:
continue
service_objects = {x.get_base_url_path(): x for x in get_installed_services(types=[service_id])}
for service in services.values():
if service['url'] not in service_objects:
continue
if service_objects[service['url']].secondary:
continue
url = urllib.parse.urljoin(service['url'], target_url)
response = requests.put(url, data=bundle_content)
if not response.ok:
raise DeploymentError(exception_message % (service_id, response.status_code))
if not job:
continue
try:
response_json = response.json()
except json.JSONDecodeError:
continue
if not response_json.get('url'):
continue
if service_id not in job.progression_urls:
job.progression_urls[service_id] = {}
job.progression_urls[service_id][service['title']] = response_json['url']
job.save()
def get_authentic_service(self):
for service_id, services in getattr(settings, 'KNOWN_SERVICES', {}).items():
if service_id == 'authentic':
for service in services.values():
return service
return None
def deploy_roles(self, bundle):
tar_io = io.BytesIO(bundle)
service = self.get_authentic_service()
if not service:
return
roles_api_url = urllib.parse.urljoin(service['url'], 'api/roles/?update_or_create=slug')
provision_api_url = urllib.parse.urljoin(service['url'], 'api/provision/')
with tarfile.open(fileobj=tar_io) as tar:
manifest = json.loads(tar.extractfile('manifest.json').read().decode())
for element in manifest.get('elements'):
if element.get('type') != 'roles':
continue
role_info = json.loads(tar.extractfile('%s/%s' % (element['type'], element['slug'])).read())
# create or update
response = requests.post(roles_api_url, json=role_info)
if not response.ok:
raise DeploymentError(
_('Failed to create role %s (%s)') % (element['slug'], response.status_code)
)
# then force provisionning
response = requests.post(provision_api_url, json={'role_uuid': response.json()['uuid']})
if not response.ok:
raise DeploymentError(
_('Failed to provision role %s (%s)') % (element['slug'], response.status_code)
)
STATUS_CHOICES = [
('registered', _('Registered')),
('running', _('Running')),
('failed', _('Failed')),
('completed', _('Completed')),
]
class AsyncJob(models.Model):
label = models.CharField(max_length=100)
status = models.CharField(
max_length=100,
default='registered',
choices=STATUS_CHOICES,
)
creation_timestamp = models.DateTimeField(default=now)
last_update_timestamp = models.DateTimeField(auto_now=True)
completion_timestamp = models.DateTimeField(default=None, null=True)
exception = models.TextField()
application = models.ForeignKey(Application, on_delete=models.CASCADE)
version = models.ForeignKey(Version, on_delete=models.CASCADE, null=True)
action = models.CharField(max_length=100)
progression_urls = JSONField(blank=True, default=dict)
raise_exception = True
def run(self, spool=False):
if 'uwsgi' in sys.modules and spool:
from hobo.applications.spooler import run_job
tenant = getattr(connection, 'tenant', None)
domain = getattr(tenant, 'domain_url', '')
run_job.spool(domain=domain.encode(), job_id=str(self.pk).encode())
return
self.status = 'running'
self.save()
try:
if self.action == 'scandeps':
self.application.scandeps()
elif self.action == 'create_bundle':
self.version.create_bundle(self)
elif self.action == 'deploy':
self.version.deploy(self)
except ApplicationError as e:
self.status = 'failed'
self.exception = e.msg
if self.raise_exception:
raise
except Exception:
self.status = 'failed'
self.exception = traceback.format_exc()
if self.raise_exception:
raise
finally:
if self.status == 'running':
self.status = 'completed'
self.completion_timestamp = now()
self.save()