hobo/hobo/multitenant/settings_loaders.py

414 lines
16 KiB
Python

import os
import json
import hashlib
from importlib import import_module
from django.conf import settings
from django.utils.encoding import smart_bytes
from django.utils.six.moves.urllib import parse as urlparse
from hobo.theme.utils import get_theme
class FileBaseSettingsLoader(object):
"""
Base middleware class for loading settings from FILENAME.
Child classes MUST override update_settings_from_path().
"""
FILENAME = None
def get_new_time(self, tenant):
tenant_dir = os.path.join(settings.TENANT_BASE, tenant.domain_url)
path = os.path.join(tenant_dir, self.FILENAME)
try:
return os.stat(path).st_mtime
except OSError:
return None
def update_settings(self, tenant_settings, tenant):
tenant_dir = os.path.join(settings.TENANT_BASE, tenant.domain_url)
path = os.path.join(tenant_dir, self.FILENAME)
if os.path.exists(path):
self.update_settings_from_path(tenant_settings, path)
def update_settings_from_path(self, tenant_settings, path):
raise NotImplemented
class SettingsDictUpdateMixin(object):
def do_update(self, tenant_settings, key, value):
old_value = getattr(tenant_settings, key, {})
new_value = old_value.copy()
new_value.update(value)
setattr(tenant_settings, key, new_value)
def do_extend(self, tenant_settings, key, value):
new_value = list(getattr(tenant_settings, key, []))
new_value.extend(value)
setattr(tenant_settings, key, new_value)
def handle_settings(self, tenant_settings, new_settings):
for key in new_settings:
if '.' in key:
real_key, op = key.rsplit('.', 1)
method = getattr(self, 'do_%s' % op)
method(tenant_settings, real_key, new_settings[key])
else:
setattr(tenant_settings, key, new_settings[key])
class KnownServices(FileBaseSettingsLoader):
FILENAME = 'hobo.json'
@classmethod
def shared_secret(cls, secret1, secret2):
secret1 = hashlib.sha256(secret1.encode('ascii')).hexdigest()
secret2 = hashlib.sha256(secret2.encode('ascii')).hexdigest()
# rstrip('L') for py2/3 compatibility, as py2 formats number as 0x...L, and py3 as 0x...
return hex(int(secret1, 16) ^ int(secret2, 16))[2:].rstrip('L')
def update_settings_from_path(self, tenant_settings, path):
known_services = {}
with open(path) as f:
hobo_json = json.load(f)
services = hobo_json.get('services')
this = [s for s in services if s.get('this')][0]
base_url = this['base_url']
orig = urlparse.urlparse(base_url).netloc.split(':')[0]
secret = this['secret_key']
for service in services:
service_id = service.get('service-id')
url = service.get('base_url')
verif_orig = urlparse.urlparse(url).netloc.split(':')[0]
service_data = {
'url': url,
'backoffice-menu-url': service.get('backoffice-menu-url'),
'title': service.get('title'),
'orig': orig,
'verif_orig': verif_orig,
'variables': service.get('variables'),
'secondary': service.get('secondary'),
'template_name': service.get('template_name'),
}
if service.get('secondary') and (service.get('variables') and
service.get('variables').get('ou-label')):
# for secondary services include collectivity in label
service_data['title'] = '%s (%s)' % (
service.get('title'),
service.get('variables').get('ou-label'))
if service.get('service-id') == 'combo' and service.get('template_name') == 'portal-agent':
service_data['is-portal-agent'] = True
# compute a symmetric shared secret using XOR
# secrets MUST be hexadecimal numbers of the same even length
if not service.get('this'):
service_data['secret'] = (self.shared_secret(secret, service['secret_key']) if
'secret_key' in service else None)
if service_id in known_services:
known_services[service_id][service.get('slug')] = service_data
else:
known_services[service_id] = {
service.get('slug'): service_data
}
tenant_settings.KNOWN_SERVICES = known_services
SETTING_PREFIX = 'SETTING_'
class TemplateVars(FileBaseSettingsLoader):
FILENAME = 'hobo.json'
@classmethod
def get_hobo_json_variables(cls, hobo_json):
variables = hobo_json.get('variables', {})
variables['is_portal_agent'] = False
for service in hobo_json.get('services'):
if not service.get('slug'):
continue
variables['%s_url' % service.get('slug').replace('-','_')] = service.get('base_url')
if service.get('service-id') == 'combo' and not service.get('secondary'):
if service.get('template_name') == 'portal-agent':
variables['portal_agent_url'] = service.get('base_url')
variables['portal_agent_title'] = service.get('title')
if service.get('template_name') == 'portal-user':
variables['portal_user_url'] = service.get('base_url')
variables['portal_user_title'] = service.get('title')
if service.get('service-id') == 'authentic':
variables['idp_url'] = service.get('base_url')
variables['idp_api_url'] = service.get('base_url') + 'api/'
variables['idp_account_url'] = service.get('base_url') + 'accounts/'
variables['idp_registration_url'] = service.get('base_url') + 'accounts/register/'
if not service.get('this'):
continue
variables.update(service.get('variables') or {})
variables['site_title'] = service.get('title')
if service.get('template_name') == 'portal-agent':
variables['is_portal_agent'] = True
if getattr(settings, 'HOBO_MANAGER_HOMEPAGE_TITLE_VAR', None):
variables['manager_homepage_title'] = \
variables.get(settings.HOBO_MANAGER_HOMEPAGE_TITLE_VAR)
return variables
def update_settings_from_path(self, tenant_settings, path):
with open(path) as f:
hobo_json = json.load(f)
variables = self.get_hobo_json_variables(hobo_json)
if hasattr(tenant_settings.default_settings, 'TEMPLATE_VARS') and \
tenant_settings.default_settings.TEMPLATE_VARS:
tenant_settings.TEMPLATE_VARS = tenant_settings.default_settings.TEMPLATE_VARS.copy()
else:
tenant_settings.TEMPLATE_VARS = {}
for key, value in variables.items():
# prevent mixing settings and template vars
if not key.startswith(SETTING_PREFIX):
tenant_settings.TEMPLATE_VARS[key] = value
if variables.get('default_from_email'):
if variables.get('global_title'):
tenant_settings.DEFAULT_FROM_EMAIL = '"%s" <%s>' % (
variables['global_title'].replace('"', ' '), variables['default_from_email'])
else:
tenant_settings.DEFAULT_FROM_EMAIL = variables['default_from_email']
tenant_settings.USER_PROFILE_CONFIG = hobo_json.get('profile')
class SettingsVars(SettingsDictUpdateMixin, FileBaseSettingsLoader):
FILENAME = 'hobo.json'
@classmethod
def get_hobo_json_variables(cls, hobo_json):
variables = hobo_json.get('variables', {})
for service in hobo_json.get('services'):
if service.get('this'):
variables.update(service.get('variables') or {})
break
return variables
def update_settings_from_path(self, tenant_settings, path):
with open(path) as f:
hobo_json = json.load(f)
variables = self.get_hobo_json_variables(hobo_json)
settings_dict = {}
for key, value in variables.items():
if key.startswith(SETTING_PREFIX):
settings_dict[key[len(SETTING_PREFIX):]] = value
self.handle_settings(tenant_settings, settings_dict)
class CORSSettings(FileBaseSettingsLoader):
FILENAME = 'hobo.json'
def update_settings_from_path(self, tenant_settings, path):
whitelist = getattr(tenant_settings, 'CORS_ORIGIN_WHITELIST', [])
with open(path) as f:
hobo_json = json.load(f)
for service in hobo_json.get('services', []):
base_url = service.get('base_url')
if not base_url:
continue
base_url = urlparse.urlparse(base_url)
origin = '%s://%s' % (base_url.scheme, base_url.netloc)
if not origin in whitelist:
whitelist.append(origin)
tenant_settings.CORS_ORIGIN_WHITELIST = whitelist
class ThemeSettings(SettingsDictUpdateMixin):
def get_new_time(self, tenant):
return 0
def update_settings(self, tenant_settings, tenant):
if not hasattr(tenant_settings, 'TEMPLATE_VARS'):
return
theme_id = tenant_settings.TEMPLATE_VARS.get('theme')
if not theme_id:
return
theme = get_theme(theme_id)
if not theme:
return
tenant_settings.THEME_INFO = theme
module_name = os.environ.get('DJANGO_SETTINGS_MODULE', '').split('.')[0]
module_settings = theme.get('settings', {}).get(module_name)
if not module_settings:
return
self.handle_settings(tenant_settings, module_settings)
class SharedThemeSettings(FileBaseSettingsLoader):
FILENAME = 'hobo.json'
def update_settings_from_path(self, tenant_settings, path):
with open(path) as f:
hobo_json = json.load(f)
for service in hobo_json.get('services', []):
if service.get('service-id') != 'combo':
continue
if service.get('template_name') != 'portal-user':
continue
if service.get('secondary'):
continue
tenant_settings.THEME_SKELETON_URL = '%s__skeleton__/' % (
service.get('base_url'))
break
class CookieNames(object):
def get_new_time(self, tenant):
return 0
def update_settings(self, tenant_settings, tenant):
domain_hash = hashlib.md5(smart_bytes(tenant.domain_url)).hexdigest()[:6]
tenant_settings.CSRF_COOKIE_NAME = 'csrftoken-%s' % domain_hash
tenant_settings.SESSION_COOKIE_NAME = 'sessionid-%s' % domain_hash
# unique but common name for authentic opened session cookie name
if getattr(tenant_settings, 'TEMPLATE_VARS', None):
idp_url = tenant_settings.TEMPLATE_VARS.get('idp_url')
if idp_url:
idp_hash = hashlib.md5(smart_bytes(idp_url)).hexdigest()[:6]
cookie_name = 'a2-opened-session-%s' % idp_hash
tenant_settings.A2_OPENED_SESSION_COOKIE_NAME = cookie_name
tenant_settings.MELLON_OPENED_SESSION_COOKIE_NAME = cookie_name
#
# Specific loaders
#
class Authentic(FileBaseSettingsLoader):
FILENAME = 'hobo.json'
def update_settings(self, tenant_settings, tenant):
# update SAML certicates and keys
tenant_dir = os.path.join(settings.TENANT_BASE, tenant.domain_url)
saml_crt = os.path.join(tenant_dir, 'saml.crt')
saml_key = os.path.join(tenant_dir, 'saml.key')
if os.path.exists(saml_crt) and os.path.exists(saml_key):
tenant_settings.A2_IDP_SAML2_ENABLE = True
tenant_settings.A2_IDP_SAML2_SIGNATURE_PUBLIC_KEY = open(saml_crt).read()
tenant_settings.A2_IDP_SAML2_SIGNATURE_PRIVATE_KEY = open(saml_key).read()
if not getattr(tenant_settings, 'A2_IDP_OIDC_JWKSET', None):
from jwcrypto import jwk
jwkkey = jwk.JWK.from_pem(
tenant_settings.A2_IDP_SAML2_SIGNATURE_PRIVATE_KEY)
jwkset = jwk.JWKSet()
jwkset['keys'].add(jwkkey)
tenant_settings.A2_IDP_OIDC_JWKSET = json.loads(jwkset.export())
else:
tenant_settings.A2_IDP_SAML2_ENABLE = False
# then other things
tenant_settings.A2_OPENED_SESSION_COOKIE_DOMAIN = 'parent'
tenant_settings.A2_EMAIL_IS_UNIQUE = True
tenant_settings.A2_REGISTRATION_EMAIL_IS_UNIQUE = True
tenant_settings.A2_SET_RANDOM_PASSWORD_ON_RESET = False
path = os.path.join(tenant_dir, self.FILENAME)
if os.path.exists(path):
self.update_settings_from_path(tenant_settings, path)
def update_settings_from_path(self, tenant_settings, path):
# profile fields
with open(path) as f:
hobo_json = json.load(f)
fields = hobo_json.get('profile', {}).get('fields')
if fields:
fields.sort(key=lambda x: x.get('order'))
fields = [x for x in fields if not x['disabled']]
tenant_settings.A2_PROFILE_FIELDS = [x['name'] for x in fields]
tenant_settings.A2_REQUIRED_FIELDS = [
x['name'] for x in fields if x['required']]
tenant_settings.A2_REGISTRATION_FIELDS = [
x['name'] for x in fields if x['asked_on_registration']]
for service in hobo_json.get('services', []):
if service.get('service-id') != 'combo':
continue
if service.get('template_name') != 'portal-user':
continue
tenant_settings.A2_HOMEPAGE_URL = service.get('base_url')
class Mellon(FileBaseSettingsLoader):
FILENAME = 'sp-saml.crt'
def update_settings(self, tenant_settings, tenant):
# set SAML certicates and keys
tenant_dir = os.path.join(settings.TENANT_BASE, tenant.domain_url)
saml_crt = os.path.join(tenant_dir, 'sp-saml.crt')
saml_key = os.path.join(tenant_dir, 'sp-saml.key')
if os.path.exists(saml_crt) and os.path.exists(saml_key):
tenant_settings.MELLON_PUBLIC_KEYS = [saml_crt]
tenant_settings.MELLON_PRIVATE_KEY = saml_key
class MandayeJS(FileBaseSettingsLoader):
FILENAME = 'hobo.json'
def update_settings(self, tenant_settings, tenant):
service = tenant.get_service()
if service.get('site_app', None):
tenant_settings.SITE_APP = service['site_app']
class SiteBaseUrl(object):
def get_new_time(self, tenant):
tenant_dir = os.path.join(settings.TENANT_BASE, tenant.domain_url)
for filename in ['unsecure', 'base_url']:
path = os.path.join(tenant_dir, filename)
try:
return os.stat(path).st_mtime
except OSError:
pass
return os.stat(tenant_dir).st_mtime
def update_settings(self, tenant_settings, tenant):
tenant_settings.SITE_BASE_URL = tenant.get_base_url()
#
# Generic loaders (not recommended)
#
class SettingsJSON(FileBaseSettingsLoader, SettingsDictUpdateMixin):
FILENAME = 'settings.json'
def update_settings_from_path(self, tenant_settings, path):
with open(path) as f:
self.handle_settings(tenant_settings, json.load(f))
class DictAdapter(dict):
'''Give dict interface to plain objects'''
def __init__(self, wrapped):
self.wrapped = wrapped
def __setitem__(self, key, value):
setattr(self.wrapped, key, value)
def __getitem__(self, key):
try:
return getattr(self.wrapped, key)
except AttributeError:
raise KeyError
class SettingsPy(FileBaseSettingsLoader):
FILENAME = 'settings.py'
def update_settings_from_path(self, tenant_settings, path):
execfile(path, DictAdapter(tenant_settings)) # yep, it's dangerous