
# 233 lines
# 9.1 KiB

import datetime
import hashlib
import json
import logging
import os
import threading
import urllib.parse
import requests
from django.conf import settings
from django.core.cache import cache
from django.template import Template
from django.utils.encoding import smart_bytes
from django.utils.http import urlencode
from hobo.scrutiny.wsgi.middleware import VersionMiddleware
# Module-level logger registered under the 'hobo' name; used below to report
# theme retrieval failures.
logger = logging.getLogger('hobo')
# Context processor returning {'statics_hash': <md5 hex digest>}.
# The digest is seeded with the repr of the sorted items of the mapping
# returned by VersionMiddleware.get_packages_version().
# NOTE(review): this block arrived with its indentation stripped and with
# lines missing; the notes below flag what is visibly absent.
def statics_hash(request):
versions = VersionMiddleware.get_packages_version()
statics_hash = hashlib.md5(smart_bytes(repr(sorted(versions.items()))))
# Optional setting; None when STATICS_HASH_COUNTER is not configured.
counter_filename = getattr(settings, 'STATICS_HASH_COUNTER', None)
if counter_filename:
# NOTE(review): the `try:` line and its body are missing here. Presumably the
# file named by counter_filename is read and folded into the hash -- confirm
# against the canonical file before relying on this.
except OSError:
# NOTE(review): the handler body is missing too; presumably a best-effort
# `pass` so an unreadable counter file does not break rendering -- confirm.
return {'statics_hash': statics_hash.hexdigest()}
def template_vars(request):
    """Context processor exposing ``settings.TEMPLATE_VARS`` to templates.

    The ``theme_base_filename`` entry is withheld from the returned mapping.

    :param request: the current request (unused).
    :returns: the configured template variables without
        ``theme_base_filename``; an empty dict when ``TEMPLATE_VARS`` is not
        set.
    """
    variables = getattr(settings, 'TEMPLATE_VARS', {})
    if 'theme_base_filename' in variables:
        # Build a filtered copy instead of deleting in place: `variables` is
        # the very object stored in settings, so a `del` would strip the key
        # from the shared configuration for every later reader.
        variables = {
            key: value for key, value in variables.items() if key != 'theme_base_filename'
        }
    return variables
# Fetches a theme "skeleton" page over HTTP for a given source URL and keeps
# the result in the Django cache, either per source URL or in a shared
# per-page cache stored under PAGE_CACHE_KEY.
# NOTE(review): this class arrived with its indentation stripped and with
# several lines missing; the NOTE(review) comments below flag every spot where
# a statement is visibly absent. Confirm each against the canonical file.
# NOTE(review): CACHE_REFRESH_TIMEOUT is used below but is not defined anywhere
# in the visible source.
class RemoteTemplate(object):
# Cache key under which the (page_cache, expiry_time) tuple is stored.
PAGE_CACHE_KEY = 'page-cache'
# source: the URL the skeleton is requested for, or the literal '404'.
def __init__(self, source):
self.source = source
# Returns the cached item for this source, or a falsy value when nothing is
# cached. Within these methods the bare name `cache` is the Django cache
# imported at module level, not the `cache` method defined on this class.
def get_cached_item(self):
item = cache.get(self.PAGE_CACHE_KEY)
if self.source != '404' and item:
# page_cache is a dict redirect_url -> page content, get the best
# match.
page_cache, expiry_time = item
selected_cache_page = None
# Candidates are visited from shortest to longest URL.
for page_redirect_url in sorted(page_cache.keys(), key=lambda x: len(x)):
if selected_cache_page is None:
selected_cache_page = page_redirect_url
# NOTE(review): a `continue` presumably followed the assignment above.
if not self.source.startswith(page_redirect_url):
# NOTE(review): the body of the test above is missing (presumably `continue`,
# so only URLs that prefix the source can be selected) -- confirm.
if len(page_redirect_url) > len(selected_cache_page):
selected_cache_page = page_redirect_url
item = (page_cache[selected_cache_page], expiry_time)
# NOTE(review): an `else:` appears to be missing here; as written the lookup
# below would always replace the page-cache match chosen above.
item = cache.get(self.cache_key)
return item
# NOTE(review): this is read as an attribute (`self.cache_key`) elsewhere in
# the class, so a `@property` decorator is presumably missing above the def.
def cache_key(self):
# Keeps scheme, netloc and path of the source URL; params, query and fragment
# are blanked before hashing.
return hashlib.md5(
urllib.parse.urlunparse(urllib.parse.urlparse(self.source)[:3] + ('', '', '')).encode('ascii')
# NOTE(review): the closing `)` of the md5 call is missing, presumably along
# with a digest call that turns the hash object into a usable key.
# Returns a django Template for the skeleton; raises Exception when nothing is
# cached and the skeleton cannot be retrieved.
def get_template(self):
item = self.get_cached_item()
if 'hobo.environment' in settings.INSTALLED_APPS:
# Imported lazily, only when the hobo.environment app is installed.
from hobo.deploy.utils import get_hobo_json
from hobo.multitenant.settings_loaders import TemplateVars
context = TemplateVars.get_hobo_json_variables(get_hobo_json())
if 'portal_url' not in context:
# serve a minimalistic template if no portal has been
# deployed.
return Template('<html><body>{% block content %}{% endblock %}</body></html>')
self.theme_skeleton_url = context['portal_url'] + '__skeleton__/'
# NOTE(review): an `else:` appears to be missing here; as written the setting
# below would always override the portal-derived URL.
self.theme_skeleton_url = settings.THEME_SKELETON_URL
if item is None:
# Nothing cached: fetch now (in_thread=False).
template_body = self.update_content(in_thread=False)
if template_body is None:
raise Exception('Failed to retrieve theme')
# NOTE(review): an `else:` appears to be missing here; unpacking `item` when
# it is None would fail.
template_body, expiry_time = item
if expiry_time < datetime.datetime.now():
# stale value, put it back into the cache for other consumers and
# update the content in a different thread
# NOTE(review): the statements implementing the comment above are missing.
return Template(template_body)
# Requests the skeleton for self.source from self.theme_skeleton_url. Returns
# the response text, or None (after logging an error) on a non-200 status.
def update_content(self, in_thread=True):
r = requests.get(self.theme_skeleton_url, params={'source': self.source})
if r.status_code != 200:
logger.error('failed to retrieve theme (status code: %s)', r.status_code)
return None
# NOTE(review): no visible statement stores r.text in the cache; presumably a
# line is missing around here.
if r.headers.get('X-Combo-Skeleton-Pages'):
# X-Combo-Skeleton-Pages header is a JSON dict (page_id -> redirect_url),
# it is used to build the page cache.
self.combo_skeleton_pages = json.loads(r.headers.get('X-Combo-Skeleton-Pages'))
if in_thread:
# NOTE(review): the body of `if in_thread:` (and any else branch) is missing;
# presumably it triggers update_all_pages_cache -- confirm.
return r.text
# Fetches the skeleton of every page listed in self.combo_skeleton_pages and
# stores the whole mapping under PAGE_CACHE_KEY.
def update_all_pages_cache(self):
# always cache root
root_url = urllib.parse.urlunparse(urllib.parse.urlparse(self.source)[:2] + ('/', '', '', ''))
if not root_url in self.combo_skeleton_pages.values():
self.combo_skeleton_pages['__root'] = root_url
page_cache = {}
for page_id, page_redirect_url in self.combo_skeleton_pages.items():
r = requests.get(self.theme_skeleton_url, params={'source': page_redirect_url})
if r.status_code != 200:
# abort
# NOTE(review): the statement performing the abort is missing.
page_cache[page_redirect_url] = r.text
expiry_time = datetime.datetime.now() + datetime.timedelta(seconds=CACHE_REFRESH_TIMEOUT)
# NOTE(review): the opening of this call is missing; presumably `cache.set(`,
# mirroring the `cache` method below.
self.PAGE_CACHE_KEY, (page_cache, expiry_time), 2592000
) # bypass cache level expiration time
# Stores (template_body, expiry_time) under this source's cache key. The
# backend timeout is 2592000 seconds (30 days); freshness is tracked through
# the stored expiry_time instead.
def cache(self, template_body):
expiry_time = datetime.datetime.now() + datetime.timedelta(seconds=CACHE_REFRESH_TIMEOUT)
cache.set(self.cache_key, (template_body, expiry_time), 2592000) # bypass cache level expiration time
def theme_base(request):
    """Context processor adding three theme-related variables to the context.

    * ``theme_base`` is the bound ``get_template`` of a ``RemoteTemplate``
      built for the current absolute URI; it is only evaluated when
      encountered in a template and yields the skeleton downloaded from the
      theme skeleton URL.
    * ``theme_404`` is identical but dedicated to 404 error pages.
      Both variables are meant for satellite sites (authentic, w.c.s., etc.).
    * ``theme_base_filename`` is the filename to be used as base template by
      combo; it is looked up in the template variables and defaults to
      ``theme.html``.

    :param request: the current request.
    :returns: a dict holding the three variables above.
    """
    source = request.build_absolute_uri()
    return {
        'theme_base': RemoteTemplate(source).get_template,
        'theme_404': RemoteTemplate('404').get_template,
        # NOTE(review): template_vars() in this file removes
        # 'theme_base_filename' from what it returns, so this lookup appears
        # to always fall back to 'theme.html' -- confirm this is intended.
        'theme_base_filename': template_vars(request).get('theme_base_filename') or 'theme.html',
    }
def portal_agent_url(request):
    """Context processor exposing a lazily computed ``manager_homepage_url``.

    The value placed in the context is a callable, so the lookup only runs if
    a template actually uses the variable.

    :param request: the current request.
    :returns: ``{'manager_homepage_url': <callable>}``.
    """

    def get_portal_agent_url():
        # Portal agents are only looked up when authentic2 is installed.
        portal_agents = []
        if 'authentic2' in settings.INSTALLED_APPS:
            portal_agents = [
                x for x in settings.KNOWN_SERVICES.get('combo', {}).values() if x.get('is-portal-agent')
            ]
        # With several portal agents, pick the one whose 'ou-slug' variable
        # matches the slug of the authenticated user's ou.
        if len(portal_agents) > 1 and request.user and request.user.is_authenticated and request.user.ou_id:
            ou_slug = request.user.ou.slug
            for portal_agent in portal_agents:
                variables = portal_agent.get('variables') or {}
                if variables.get('ou-slug') == ou_slug:
                    return portal_agent['url']
        # Fallback: the template variable named by HOBO_MANAGER_HOMEPAGE_URL_VAR.
        variables = getattr(settings, 'TEMPLATE_VARS', {})
        return variables.get(settings.HOBO_MANAGER_HOMEPAGE_URL_VAR)

    return {'manager_homepage_url': get_portal_agent_url}
def hobo_json(request):
    """Context processor giving Hobo itself the hobo.json-derived variables.

    Other services get these variables from settings loaders based on
    hobo.json; here they are computed directly, and ``manager_homepage_url``
    is added from the variable named by ``HOBO_MANAGER_HOMEPAGE_URL_VAR``.

    :param request: the current request (unused).
    :returns: the variables mapping, including ``manager_homepage_url``.
    """
    # Imported lazily, inside the function, as in the rest of this module.
    from hobo.deploy.utils import get_hobo_json
    from hobo.multitenant.settings_loaders import TemplateVars

    hobo_variables = TemplateVars.get_hobo_json_variables(get_hobo_json())
    homepage_var_name = settings.HOBO_MANAGER_HOMEPAGE_URL_VAR
    hobo_variables['manager_homepage_url'] = hobo_variables.get(homepage_var_name)
    return hobo_variables
def _authentic2_get_next_url(request):
    """Return what authentic2's ``get_next_url`` yields for the request's GET data.

    The import is local so that authentic2 is only required when this helper
    is actually called.
    """
    from authentic2.utils.misc import get_next_url

    query_params = request.GET
    return get_next_url(query_params)
def user_urls(request):
    """Context processor providing login, logout, account and registration URLs.

    :param request: the current request.
    :returns: a dict with ``login_url`` and ``logout_url``, plus
        ``account_url`` / ``registration_url`` when the matching ``idp_*``
        template variables are available (or when running inside authentic2
        on the login or registration page).
    """
    # ugly, but necessary: under w.c.s. the variables come from the Quixote
    # publisher and the logout path has no trailing slash.
    if 'wcs.qommon' in settings.INSTALLED_APPS:
        from quixote import get_publisher

        pub = get_publisher()
        template_vars = (pub and pub.get_substitution_variables()) or {}
        logout_url = '/logout?'
    else:
        template_vars = getattr(settings, 'TEMPLATE_VARS', {})
        logout_url = '/logout/?'

    full_path = request.get_full_path()
    if 'authentic2' in settings.INSTALLED_APPS:
        # Prefer the next URL already carried by the request, if any.
        full_path = _authentic2_get_next_url(request) or full_path
    query_string = urlencode({'next': full_path})
    context = {
        'login_url': '/login/?' + query_string,
        'logout_url': logout_url + urlencode({'next': '/'}),
    }

    absolute_uri = request.build_absolute_uri()
    if 'idp_account_url' in template_vars:
        context['account_url'] = template_vars['idp_account_url'] + '?' + urlencode({'next': absolute_uri})
    if 'idp_registration_url' in template_vars:
        context['registration_url'] = (
            template_vars['idp_registration_url'] + '?' + urlencode({'next': absolute_uri})
        )

    if 'authentic2' in settings.INSTALLED_APPS:
        # On the login and registration pages themselves, the link to the
        # current page becomes '#' and the other one keeps the query string.
        if request.path == '/login/':
            context['login_url'] = '#'
            context['registration_url'] = '/register/?' + request.GET.urlencode()
        if request.path == '/register/':
            context['login_url'] = '/login/?' + request.GET.urlencode()
            context['registration_url'] = '#'
    return context