hobo/hobo/environment/models.py

502 lines
16 KiB
Python

import datetime
import json
import random
import requests
import socket
from django.conf import settings
from django.core.cache import cache
from django.db import models
from django.utils.crypto import get_random_string
from django.utils.encoding import force_text
from django.utils.six.moves.urllib.parse import urlparse
from django.utils.timezone import now
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError
from django.contrib.contenttypes.models import ContentType
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
from .utils import Zone, get_installed_services
from .mandayejs_app_settings import APP_SETTINGS_CLASSES, DEFAULT_APP_SETTINGS
# Alphabet handed to get_random_string() when a service is saved without a
# secret key: lowercase letters, digits, then a set of punctuation characters.
SECRET_CHARS = (
    'abcdefghijklmnopqrstuvwxyz'
    '0123456789'
    '!@#$%^&*(-_=+)'
)
class Variable(models.Model):
    """A named value, optionally attached to a service through a generic relation.

    ``value`` is stored as text. When it starts with ``{`` or ``[``, or is
    exactly ``true`` or ``false``, it is treated as a JSON document: ``json``
    decodes it and ``clean`` rejects it if it does not parse.
    """

    name = models.CharField(max_length=100, verbose_name=_('name'))
    label = models.CharField(max_length=100, blank=True, verbose_name=_('label'))
    value = models.TextField(
        verbose_name=_('value'),
        blank=True,
        help_text=_('start with [ or { for a JSON document'))
    auto = models.BooleanField(default=False)
    # on_delete is spelled out because omitting it is deprecated since
    # Django 1.9 and an error from 2.0; CASCADE is the former implicit default.
    service_type = models.ForeignKey(ContentType, null=True, on_delete=models.CASCADE)
    service_pk = models.PositiveIntegerField(null=True)
    service = GenericForeignKey('service_type', 'service_pk')
    last_update_timestamp = models.DateTimeField(auto_now=True, null=True)

    def get_field_label(self):
        """Return the label, falling back to the name when the label is empty."""
        return self.label or self.name

    def _looks_like_json(self):
        """Tell whether ``value`` should be handled as a JSON document."""
        if not self.value:
            return False
        return self.value[0] in '{[' or self.value in ('true', 'false')

    @property
    def json(self):
        """Return the decoded JSON value, or the raw text when it is not JSON.

        Text that looks like JSON but fails to parse is returned unchanged.
        """
        if self._looks_like_json():
            try:
                return json.loads(self.value)
            except ValueError:
                # Not valid JSON after all: fall through to the raw text.
                pass
        return self.value

    def clean(self):
        """Validate ``value``.

        Raises:
            ValidationError: when the value looks like JSON but cannot be parsed.
        """
        if self._looks_like_json():
            try:
                json.loads(self.value)
            except ValueError:
                raise ValidationError('invalid JSON document')
class ServiceBase(models.Model):
    """Abstract base model holding the fields and health checks of a service.

    This class calls ``self.get_admin_zones()`` and reads ``self.Extra`` but
    defines neither; concrete subclasses are expected to provide
    ``get_admin_zones()`` and a nested ``Extra`` class with ``service_id`` and
    ``service_label``.
    """

    class Meta:
        abstract = True

    # Timeout, in seconds, applied to every outgoing HTTP check so that an
    # unresponsive service cannot block the caller indefinitely.
    REQUEST_TIMEOUT = 10

    title = models.CharField(_('Title'), max_length=50)
    slug = models.SlugField(_('Slug'))
    base_url = models.CharField(_('Base URL'), max_length=200)
    secret_key = models.CharField(_('Secret Key'), max_length=60)
    template_name = models.CharField(_('Template'), max_length=60, blank=True)
    secondary = models.BooleanField(_('Secondary Service'), default=False)
    last_operational_check_timestamp = models.DateTimeField(null=True)
    last_operational_success_timestamp = models.DateTimeField(null=True)
    last_update_timestamp = models.DateTimeField(auto_now=True, null=True)

    variables = GenericRelation(
        Variable,
        content_type_field='service_type', object_id_field='service_pk')

    @classmethod
    def is_enabled(cls):
        """Tell whether this kind of service is enabled; always True here."""
        return True

    def is_operational(self):
        """Return True when the most recent operational check succeeded.

        That is the case when a success has been recorded and it carries the
        same timestamp as the last check.
        """
        return (self.last_operational_success_timestamp is not None and
                self.last_operational_success_timestamp == self.last_operational_check_timestamp)

    def check_operational(self):
        """Request the first admin zone and record the outcome.

        The check timestamp is always updated; the success timestamp is only
        updated when the request succeeds with a non-error status. Only these
        two fields are written to the database.
        """
        once_now = now()
        self.last_operational_check_timestamp = once_now
        try:
            zone = self.get_admin_zones()[0]
            response = requests.get(
                zone.href, timeout=self.REQUEST_TIMEOUT, allow_redirects=False)
            response.raise_for_status()
            self.last_operational_success_timestamp = once_now
        except requests.RequestException:
            # Best effort: a failed request leaves the success timestamp as it
            # was, which is_operational() reports as "not operational".
            pass
        self.save(update_fields=('last_operational_check_timestamp',
                                 'last_operational_success_timestamp'))

    def wants_frequent_checks(self):
        """Decide if a "being deployed..." spinner should be displayed.

        The spinner is meant to be displayed (and automatically hidden) next
        to the service.
        """
        if self.last_operational_success_timestamp is not None:
            # if the service has been marked as operational, we don't need a
            # spinner at all.
            return False
        if self.last_operational_check_timestamp is None:
            # if the service has never been checked, sure we want a spinner.
            return True
        if self.last_update_timestamp is None:
            # the column is nullable; without an update time there is no
            # deployment window to monitor, and subtracting None would raise.
            return False
        two_minutes = datetime.timedelta(minutes=2)
        # monitor actively for two minutes max.
        return (self.last_operational_check_timestamp - self.last_update_timestamp) < two_minutes

    def as_dict(self):
        """Return a dictionary description of the service.

        It contains every instance attribute whose type is exactly ``int`` or
        a string type, plus ``base_url``, ``service-id``, ``service-label``,
        ``variables``, ``secondary`` and, when available, the SAML metadata
        and backoffice menu URLs.
        """
        # type(u'') is ``unicode`` on Python 2 and ``str`` on Python 3, which
        # avoids referencing the Python 2-only ``unicode`` builtin. An exact
        # type match (not isinstance) is kept on purpose so that booleans,
        # which subclass int, stay excluded.
        scalar_types = (int, str, type(u''))
        data = {key: value for key, value in self.__dict__.items()
                if type(value) in scalar_types}
        data['base_url'] = self.get_base_url_path()
        data['service-id'] = self.Extra.service_id
        data['service-label'] = force_text(self.Extra.service_label)
        data['variables'] = {v.name: v.json for v in self.variables.all()}
        data['secondary'] = self.secondary

        saml_sp_metadata_url = self.get_saml_sp_metadata_url()
        if saml_sp_metadata_url:
            data['saml-sp-metadata-url'] = saml_sp_metadata_url
        saml_idp_metadata_url = self.get_saml_idp_metadata_url()
        if saml_idp_metadata_url:
            data['saml-idp-metadata-url'] = saml_idp_metadata_url
        backoffice_menu_url = self.get_backoffice_menu_url()
        if backoffice_menu_url:
            data['backoffice-menu-url'] = backoffice_menu_url
        return data

    @property
    def name(self):
        """Alias for ``title``."""
        return self.title

    def clean(self, *args, **kwargs):
        """Reject a slug already used by another installed service.

        Raises:
            ValidationError: when another service has the same slug.
        """
        for service in get_installed_services():
            if service.slug == self.slug and service.id != self.id:
                raise ValidationError(_('This slug is already used. It must be unique.'))
        return super(ServiceBase, self).clean(*args, **kwargs)

    def save(self, *args, **kwargs):
        """Normalise the base URL, ensure a secret key exists, then save.

        The base URL is stripped, lowercased and given a trailing slash. On
        first save, the variables listed in ``settings.SERVICE_EXTRA_VARIABLES``
        for this service id are created and attached to the service.
        """
        self.base_url = self.base_url.strip().lower()
        if not self.base_url.endswith('/'):
            self.base_url += '/'
        if not self.secret_key:
            self.secret_key = get_random_string(50, SECRET_CHARS)
        is_new = (self.id is None)
        super(ServiceBase, self).save(*args, **kwargs)

        if not is_new:
            return
        extra_variables = getattr(settings, 'SERVICE_EXTRA_VARIABLES', None)
        if not extra_variables:
            return
        for variable in extra_variables.get(self.Extra.service_id, []):
            v = Variable()
            if isinstance(variable, dict):
                v.name = variable.get('name')
                # label is a non-null column: store '' rather than None when
                # the setting does not provide one.
                v.label = variable.get('label') or ''
            else:
                v.name = variable
            v.service = self
            v.save()

    def get_saml_sp_metadata_url(self):
        """Return the SAML service provider metadata URL; None by default."""
        return None

    def get_saml_idp_metadata_url(self):
        """Return the SAML identity provider metadata URL; None by default."""
        return None

    def get_base_url_path(self):
        """Return the base URL, with a trailing slash appended if missing."""
        base_url = self.base_url
        if not base_url.endswith('/'):
            base_url += '/'
        return base_url

    def get_backoffice_menu_url(self):
        """Return the backoffice menu URL; None by default."""
        return None

    def is_resolvable(self):
        """Return True when the host of the base URL resolves to an address.

        Always returns a boolean, so the result can be cached by
        get_health_dict(), which treats None as a cache miss.
        """
        # .hostname, unlike .netloc, excludes any port and credentials, which
        # gethostbyname() cannot handle.
        hostname = urlparse(self.base_url).hostname
        if not hostname:
            return False
        try:
            socket.gethostbyname(hostname)
        except (socket.gaierror, UnicodeError):
            # UnicodeError is raised for names the idna codec rejects, such
            # as an empty or overlong label.
            return False
        return True

    def has_valid_certificate(self):
        """Return True when a certificate-verified request to the base URL succeeds.

        Returns False when the host is not resolvable or when the request
        fails with an SSL error, a connection error or a timeout.
        """
        if not self.is_resolvable():
            return False
        try:
            requests.get(self.base_url, timeout=self.REQUEST_TIMEOUT,
                         verify=True, allow_redirects=False)
        except (requests.exceptions.SSLError,
                requests.exceptions.ConnectionError,
                requests.exceptions.Timeout):
            return False
        return True

    def is_running(self):
        """Return True when the first admin zone answers with a 2xx or 3xx status.

        Certificate verification is disabled for this check. Returns False
        when the host is not resolvable or the request fails.
        """
        if not self.is_resolvable():
            return False
        url = self.get_admin_zones()[0].href
        try:
            r = requests.get(url, timeout=self.REQUEST_TIMEOUT,
                             verify=False, allow_redirects=False)
        except requests.RequestException:
            # An unreachable or misbehaving service is simply "not running".
            return False
        return 200 <= r.status_code < 400

    def get_health_dict(self):
        """Return a dictionary mapping each health check name to its result.

        Results are cached under ``<slug>_<check name>``. Each check has a
        nominal cache duration in seconds, multiplied by a random factor
        between 0.5 and 1.5 so that entries do not all expire together.
        """
        properties = [
            ('is_resolvable', 120),
            ('has_valid_certificate', 3600),
            ('is_running', 60),
            ('is_operational', 60),
        ]
        result = {}
        for name, cache_duration in properties:
            cache_key = '%s_%s' % (self.slug, name)
            value = cache.get(cache_key)
            if value is None:
                value = getattr(self, name)()
                cache.set(cache_key, value, cache_duration * (0.5 + random.random()))
            result[name] = value
        return result
class Authentic(ServiceBase):
    """Authentic identity provider service."""

    use_as_idp_for_self = models.BooleanField(
        verbose_name=_('Use as IdP'),
        default=False,
    )

    class Meta:
        verbose_name = _('Authentic Identity Provider')
        verbose_name_plural = _('Authentic Identity Providers')
        ordering = ['title']

    class Extra:
        service_id = 'authentic'
        service_label = _('Authentic')
        service_default_slug = 'idp'

    def get_backoffice_menu_url(self):
        """Return the URL of the backoffice menu description."""
        root = self.get_base_url_path()
        return root + 'manage/menu.json'

    def get_saml_idp_metadata_url(self):
        """Return the URL of the SAML identity provider metadata."""
        root = self.get_base_url_path()
        return root + 'idp/saml2/metadata'

    def get_admin_zones(self):
        """Return the admin zones: user management, then role management."""
        users_zone = Zone(
            _('User Management'), 'users', self.get_base_url_path() + 'manage/users/')
        roles_zone = Zone(
            _('Role Management'), 'roles', self.get_base_url_path() + 'manage/roles/')
        return [users_zone, roles_zone]
class Wcs(ServiceBase):
    """w.c.s. web forms service."""

    class Meta:
        verbose_name = _('w.c.s. Web Forms')
        verbose_name_plural = _('w.c.s. Web Forms')
        ordering = ['title']

    class Extra:
        service_id = 'wcs'
        service_label = _('w.c.s.')
        service_default_slug = 'eservices'

    def get_admin_zones(self):
        """Return the single admin zone, titled after the service."""
        admin_url = self.get_base_url_path() + 'admin/'
        return [Zone(self.title, 'webforms', admin_url)]

    def get_saml_sp_metadata_url(self):
        """Return the URL of the SAML service provider metadata."""
        root = self.get_base_url_path()
        return root + 'saml/metadata'

    def get_backoffice_menu_url(self):
        """Return the URL of the backoffice menu description."""
        root = self.get_base_url_path()
        return root + 'backoffice/menu.json'
class Passerelle(ServiceBase):
    """Passerelle service."""

    class Meta:
        verbose_name = _('Passerelle')
        verbose_name_plural = _('Passerelle')
        ordering = ['title']

    class Extra:
        service_id = 'passerelle'
        service_label = _('Passerelle')
        service_default_slug = 'passerelle'

    def get_backoffice_menu_url(self):
        """Return the URL of the backoffice menu description."""
        root = self.get_base_url_path()
        return root + 'manage/menu.json'

    def get_saml_sp_metadata_url(self):
        """Return the URL of the SAML service provider metadata."""
        root = self.get_base_url_path()
        return root + 'accounts/mellon/metadata/'

    def get_admin_zones(self):
        """Return the single admin zone, titled after the service."""
        manage_url = self.get_base_url_path() + 'manage/'
        return [Zone(self.title, 'webservices', manage_url)]
class Combo(ServiceBase):
    """Combo portal service."""

    class Meta:
        verbose_name = _('Combo Portal')
        verbose_name_plural = _('Combo Portals')
        ordering = ['title']

    class Extra:
        service_id = 'combo'
        service_label = _('Combo')
        service_default_slug = 'portal'

    def get_backoffice_menu_url(self):
        """Return the URL of the backoffice menu description."""
        root = self.get_base_url_path()
        return root + 'manage/menu.json'

    def get_saml_sp_metadata_url(self):
        """Return the URL of the SAML service provider metadata."""
        root = self.get_base_url_path()
        return root + 'accounts/mellon/metadata/'

    def get_admin_zones(self):
        """Return the single admin zone, titled after the service."""
        manage_url = self.get_base_url_path() + 'manage/'
        return [Zone(self.title, 'portal', manage_url)]
class Fargo(ServiceBase):
    """Fargo document box service."""

    class Meta:
        verbose_name = _('Fargo document box')
        verbose_name_plural = _('Fargo document box')
        ordering = ['title']

    class Extra:
        service_id = 'fargo'
        service_label = _('Fargo')
        service_default_slug = 'porte-doc'

    def get_saml_sp_metadata_url(self):
        """Return the URL of the SAML service provider metadata."""
        root = self.get_base_url_path()
        return root + 'accounts/mellon/metadata/'

    def get_admin_zones(self):
        """Return the single admin zone, titled after the service."""
        admin_url = self.get_base_url_path() + 'admin/'
        return [Zone(self.title, 'document-box', admin_url)]
class Welco(ServiceBase):
    """Welco multichannel home service."""

    class Meta:
        verbose_name = _('Welco Multichannel Home')
        ordering = ['title']

    class Extra:
        service_id = 'welco'
        service_label = 'Welco'
        service_default_slug = 'accueil'

    def get_backoffice_menu_url(self):
        """Return the URL of the backoffice menu description."""
        root = self.get_base_url_path()
        return root + 'menu.json'

    def get_saml_sp_metadata_url(self):
        """Return the URL of the SAML service provider metadata."""
        root = self.get_base_url_path()
        return root + 'accounts/mellon/metadata/'

    def get_admin_zones(self):
        """Return the single admin zone, titled after the service."""
        admin_url = self.get_base_url_path() + 'admin/'
        return [Zone(self.title, 'multichannel-guichet', admin_url)]
class MandayeJS(ServiceBase):
    """Authentication reverse proxy service, enabled by MANDAYEJS_ENABLED."""

    site_app = models.CharField(
        _('Site Application'),
        max_length=128,
        choices=APP_SETTINGS_CLASSES,
        default=DEFAULT_APP_SETTINGS,
    )

    class Meta:
        verbose_name = _('Authentication Reverse Proxy')
        ordering = ['title']

    class Extra:
        service_id = 'mandayejs'
        service_label = _('mandayejs')
        service_default_slug = 'mandayejs'

    @classmethod
    def is_enabled(cls):
        """Return the MANDAYEJS_ENABLED setting, False when it is not set."""
        enabled = getattr(settings, 'MANDAYEJS_ENABLED', False)
        return enabled

    def get_saml_sp_metadata_url(self):
        """Return the URL of the SAML service provider metadata."""
        root = self.get_base_url_path()
        return root + '_mandaye/accounts/mellon/metadata/'

    def get_admin_zones(self):
        """Return the single admin zone, titled after the service."""
        admin_url = self.get_base_url_path() + '_mandaye/admin/'
        return [Zone(self.title, 'mandayejs', admin_url)]
class Chrono(ServiceBase):
    """Chrono agendas service."""

    class Meta:
        verbose_name = _('Chrono Agendas')
        verbose_name_plural = _('Chrono Agendas')
        ordering = ['title']

    class Extra:
        service_id = 'chrono'
        service_label = _('Chrono')
        service_default_slug = 'agendas'

    def get_backoffice_menu_url(self):
        """Return the URL of the backoffice menu description."""
        root = self.get_base_url_path()
        return root + 'manage/menu.json'

    def get_saml_sp_metadata_url(self):
        """Return the URL of the SAML service provider metadata."""
        root = self.get_base_url_path()
        return root + 'accounts/mellon/metadata/'

    def get_admin_zones(self):
        """Return the single admin zone, titled after the service."""
        manage_url = self.get_base_url_path() + 'manage/'
        return [Zone(self.title, 'calendar', manage_url)]
class Hobo(ServiceBase):
    """Hobo deployment server service."""

    class Meta:
        verbose_name = _('Hobo Deployment Server')
        verbose_name_plural = _('Hobo Deployment Servers')
        ordering = ['title']

    class Extra:
        service_id = 'hobo'
        service_label = _('Deployment Server')
        service_default_slug = 'hobo'

    def get_backoffice_menu_url(self):
        """Return None: no backoffice menu URL is given for this service."""
        return None

    def get_saml_sp_metadata_url(self):
        """Return the URL of the SAML service provider metadata."""
        root = self.get_base_url_path()
        return root + 'accounts/mellon/metadata/'

    def get_admin_zones(self):
        """Return the single admin zone, pointing at the base URL itself."""
        root_url = self.get_base_url_path()
        return [Zone(self.title, 'hobo', root_url)]
class Piwik(ServiceBase):
    """Piwik service, enabled by the PIWIK_ENABLED setting."""

    admin_emails = models.TextField(_('Admins Emails'))

    class Meta:
        verbose_name = _('Piwik')
        verbose_name_plural = _('Piwik')
        ordering = ['title']

    class Extra:
        service_id = 'piwik'
        service_label = _('piwik')
        service_default_slug = 'piwik'

    @classmethod
    def is_enabled(cls):
        """Return the PIWIK_ENABLED setting, False when it is not set."""
        enabled = getattr(settings, 'PIWIK_ENABLED', False)
        return enabled

    def get_admin_zones(self):
        """Return the single admin zone, pointing at the base URL itself."""
        root_url = self.get_base_url_path()
        return [Zone(self.title, 'piwik', root_url)]
class Corbo(ServiceBase):
    """Announces management service, enabled by the CORBO_ENABLED setting."""

    class Meta:
        verbose_name = _('Announces Management')
        verbose_name_plural = _('Announces Management')
        ordering = ['title']

    class Extra:
        service_id = 'corbo'
        service_label = _('Corbo')
        service_default_slug = 'announces'

    @classmethod
    def is_enabled(cls):
        """Return the CORBO_ENABLED setting, False when it is not set."""
        enabled = getattr(settings, 'CORBO_ENABLED', False)
        return enabled

    def get_backoffice_menu_url(self):
        """Return the URL of the backoffice menu description."""
        root = self.get_base_url_path()
        return root + 'manage/menu.json'

    def get_saml_sp_metadata_url(self):
        """Return the URL of the SAML service provider metadata."""
        root = self.get_base_url_path()
        return root + 'accounts/mellon/metadata/'

    def get_admin_zones(self):
        """Return the single admin zone, titled after the service."""
        admin_url = self.get_base_url_path() + 'admin/'
        return [Zone(self.title, 'corbo', admin_url)]
class BiJoe(ServiceBase):
    """Statistics service."""

    class Meta:
        verbose_name = _('Statistics')
        verbose_name_plural = _('Statistics')
        ordering = ['title']

    class Extra:
        service_id = 'bijoe'
        service_label = _('Statistics')
        service_default_slug = 'statistics'

    def get_backoffice_menu_url(self):
        """Return the URL of the backoffice menu description."""
        root = self.get_base_url_path()
        return root + 'manage/menu.json'

    def get_saml_sp_metadata_url(self):
        """Return the URL of the SAML service provider metadata."""
        root = self.get_base_url_path()
        return root + 'accounts/mellon/metadata/'

    def get_admin_zones(self):
        """Return the single admin zone, pointing at the base URL itself."""
        root_url = self.get_base_url_path()
        return [Zone(self.title, 'bijoe', root_url)]
# Concrete service model classes defined in this module, in a fixed order.
AVAILABLE_SERVICES = [
    Authentic,
    Wcs,
    Passerelle,
    Combo,
    Fargo,
    Welco,
    MandayeJS,
    Chrono,
    Piwik,
    Corbo,
    BiJoe,
    Hobo,
]