269 lines
11 KiB
Python
269 lines
11 KiB
Python
import datetime
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import threading
|
|
import urllib.parse
|
|
|
|
import requests
|
|
from django.conf import settings
|
|
from django.core.cache import cache
|
|
from django.template import Template
|
|
from django.utils.encoding import smart_bytes
|
|
from django.utils.http import urlencode
|
|
from django.utils.translation import get_language, get_supported_language_variant
|
|
|
|
from hobo.scrutiny.wsgi.middleware import VersionMiddleware
|
|
|
|
logger = logging.getLogger('hobo')
|
|
|
|
CACHE_REFRESH_TIMEOUT = 300
|
|
|
|
|
|
def statics_hash(request):
    """Context processor exposing a hash usable to version static assets.

    The hash is the MD5 of the sorted package versions reported by
    ``VersionMiddleware``; when ``settings.STATICS_HASH_COUNTER`` names a
    file that can be stat'ed, its access time is mixed in as well.

    Returns a dict with a single ``statics_hash`` key (hex digest string).
    """
    versions = VersionMiddleware.get_packages_version()
    digest = hashlib.md5(smart_bytes(repr(sorted(versions.items()))))
    counter_filename = getattr(settings, 'STATICS_HASH_COUNTER', None)
    if counter_filename:
        try:
            # NOTE(review): this reads the access time (st_atime), not the
            # modification time -- confirm this is what the counter file
            # is expected to expose.
            counter_stamp = os.stat(counter_filename).st_atime
        except OSError:
            # counter file missing or unreadable: keep the versions-only hash
            pass
        else:
            # hashlib only accepts bytes; passing a str raises TypeError
            digest.update(smart_bytes(str(counter_stamp)))
    return {'statics_hash': digest.hexdigest()}
|
|
|
|
|
|
def template_vars(request):
    """Context processor exposing ``settings.TEMPLATE_VARS`` to templates.

    The ``theme_base_filename`` entry is left out of the returned mapping.
    A new dict is returned so that the settings object itself is never
    modified by rendering a page.
    """
    configured_vars = getattr(settings, 'TEMPLATE_VARS', {})
    return {name: value for name, value in configured_vars.items() if name != 'theme_base_filename'}
|
|
|
|
|
|
class RemoteTemplate:
    """Template whose body is downloaded from a remote "skeleton" URL.

    ``source`` is the URL of the page being rendered (or the literal
    string ``'404'``); it is sent to the skeleton endpoint as the
    ``source`` query parameter. Downloaded bodies are stored in the Django
    cache together with a soft expiry time, so stale content can still be
    served while it is refreshed in a background thread.
    """

    PAGE_CACHE_KEY = 'page-cache'
    # template served when no skeleton URL is available
    MINIMAL_TEMPLATE = '<html><body>{% block content %}{% endblock %}</body></html>'

    def __init__(self, source):
        self.source = source
        self.set_language()

    @staticmethod
    def _resolve_language(language=None):
        """Return the supported variant of ``language`` (default: active one).

        Falls back to the first entry of ``settings.LANGUAGES`` when no
        supported variant can be found.
        """
        try:
            return get_supported_language_variant(language or get_language())
        except LookupError:
            return settings.LANGUAGES[0][0]

    def set_language(self, language=None):
        """Set ``self.language_code`` from ``language`` or the active language."""
        self.language_code = self._resolve_language(language)

    def get_cached_item(self):
        """Return the cached ``(template_body, expiry_time)`` tuple, or None.

        When a page cache exists for the current language (and the source
        is not ``'404'``), the entry whose URL is the longest prefix of
        ``self.source`` is used, defaulting to the shortest cached URL.
        Otherwise the per-URL cache entry is returned.
        """
        item = cache.get(self.get_page_cache_key())
        if self.source == '404' or not item:
            return cache.get(self.cache_key)

        # page_cache is a dict redirect_url -> page content, get the best
        # match.
        page_cache, expiry_time = item
        if not page_cache:
            # nothing to select from; use the per-URL entry instead of
            # failing on a lookup with no key
            return cache.get(self.cache_key)

        urls_by_length = sorted(page_cache, key=len)
        selected_cache_page = urls_by_length[0]
        for page_redirect_url in urls_by_length[1:]:
            if self.source.startswith(page_redirect_url) and len(page_redirect_url) > len(
                selected_cache_page
            ):
                selected_cache_page = page_redirect_url
        return (page_cache[selected_cache_page], expiry_time)

    def get_page_cache_key(self, language_code=None):
        """Return the page cache key for ``language_code``.

        Defaults to the instance language when no code is given.
        """
        return self.PAGE_CACHE_KEY + '-' + (language_code or self.language_code)

    @property
    def cache_key(self):
        """MD5 hex digest of the source URL (scheme, host, path) and language.

        Params, query string and fragment of the source URL are ignored.
        """
        return hashlib.md5(
            urllib.parse.urlunparse(urllib.parse.urlparse(self.source)[:3] + ('', '', '')).encode('ascii')
            + self.language_code.encode('ascii')
        ).hexdigest()

    def get_template(self):
        """Return a ``Template`` built from the (possibly cached) remote body.

        A minimalistic template is returned when no skeleton URL can be
        determined. Raises ``Exception`` when nothing is cached and the
        remote content cannot be retrieved.
        """
        if 'hobo.environment' in settings.INSTALLED_APPS:
            from hobo.deploy.utils import get_hobo_json
            from hobo.multitenant.settings_loaders import TemplateVars

            context = TemplateVars.get_hobo_json_variables(get_hobo_json())
            if 'portal_url' not in context:
                # serve a minimalistic template if no portal have been
                # deployed.
                return Template(self.MINIMAL_TEMPLATE)
            self.theme_skeleton_url = context['portal_url'] + '__skeleton__/'
        elif not settings.THEME_SKELETON_URL:
            return Template(self.MINIMAL_TEMPLATE)
        else:
            self.theme_skeleton_url = settings.THEME_SKELETON_URL

        item = self.get_cached_item()
        if item is None:
            template_body = self.update_content(in_thread=False)
            if template_body is None:
                raise Exception('Failed to retrieve theme')
        else:
            template_body, expiry_time = item
            if expiry_time < datetime.datetime.now():
                # stale value, put it back into the cache for other consumers and
                # update the content in a different thread
                self.cache(template_body)
                threading.Thread(target=self.update_content).start()
        return Template(template_body)

    def update_content(self, in_thread=True):
        """Download the skeleton for ``self.source`` and cache it.

        ``in_thread`` tells whether this call is already running in a
        background thread: if so the page cache is refreshed inline,
        otherwise a new thread is started for it.

        Returns the downloaded body, or None on a non-200 response.
        """
        r = requests.get(
            self.theme_skeleton_url,
            params={'source': self.source},
            headers={'Accept-Language': self.language_code},
            timeout=10,
        )
        if r.status_code != 200:
            logger.error('failed to retrieve theme (status code: %s)', r.status_code)
            return None
        self.cache(r.text)
        if r.headers.get('X-Combo-Skeleton-Pages'):
            # X-Combo-Skeleton-Pages header is a dict (page_id -> redirect_url),
            # it is used to create the page cache.
            self.combo_skeleton_pages = json.loads(r.headers.get('X-Combo-Skeleton-Pages'))
            if in_thread:
                self.update_all_pages_cache()
            else:
                threading.Thread(target=self.update_all_pages_cache).start()
        return r.text

    def update_all_pages_cache(self):
        """Fetch every skeleton page in every language and store the page cache.

        The whole refresh is aborted as soon as one page cannot be
        retrieved. The language of the instance is left untouched, as this
        method may run in a thread while the instance is still in use.
        """
        # always cache root
        root_url = urllib.parse.urlunparse(urllib.parse.urlparse(self.source)[:2] + ('/', '', '', ''))
        if root_url not in self.combo_skeleton_pages.values():
            self.combo_skeleton_pages['__root'] = root_url
        page_urls = list(self.combo_skeleton_pages.values())

        for lang_code, _ in settings.LANGUAGES:
            page_cache = {}
            for page_redirect_url in page_urls:
                r = requests.get(
                    self.theme_skeleton_url,
                    params={'source': page_redirect_url},
                    headers={'Accept-Language': lang_code},
                    timeout=10,
                )
                if r.status_code != 200:
                    logger.warning(
                        'failed to retrieve theme page %s (status code: %s), page cache not updated',
                        page_redirect_url,
                        r.status_code,
                    )
                    # abort
                    return
                page_cache[page_redirect_url] = r.text

            expiry_time = datetime.datetime.now() + datetime.timedelta(seconds=CACHE_REFRESH_TIMEOUT)
            cache.set(
                self.get_page_cache_key(self._resolve_language(lang_code)),
                (page_cache, expiry_time),
                2592000,
            )  # bypass cache level expiration time

    def cache(self, template_body):
        """Store ``template_body`` under ``cache_key`` with a soft expiry time."""
        expiry_time = datetime.datetime.now() + datetime.timedelta(seconds=CACHE_REFRESH_TIMEOUT)
        cache.set(self.cache_key, (template_body, expiry_time), 2592000)  # bypass cache level expiration time
|
|
|
|
|
|
def theme_base(request):
    """Context processor adding theme related variables to the context.

    * theme_base will get evaluated when encountered in a template and return
      a template string downloaded from settings.THEME_SKELETON_URL.

    * theme_404 is identical but dedicated to 404 error pages.

    Both those variables are to be used by "slave" sites, authentic,
    w.c.s., etc.

    * theme_base_filename will return the filename to be used as base
      template by combo, it will be taken from settings.TEMPLATE_VARS and
      will default to theme.html.
    """
    source = request.build_absolute_uri()
    # Read the setting directly: the template_vars() context processor
    # strips 'theme_base_filename' from what it returns, so going through it
    # would always yield the default.
    configured_vars = getattr(settings, 'TEMPLATE_VARS', {})
    return {
        'theme_base': RemoteTemplate(source).get_template,
        'theme_404': RemoteTemplate('404').get_template,
        'theme_base_filename': configured_vars.get('theme_base_filename') or 'theme.html',
    }
|
|
|
|
|
|
def portal_agent_url(request):
    """Context processor exposing ``manager_homepage_url`` as a lazy callable.

    The callable returns the URL of the portal agent matching the
    organizational unit of the authenticated user when several portal
    agents are known (authentic only); otherwise it returns the template
    variable named by ``settings.HOBO_MANAGER_HOMEPAGE_URL_VAR``.
    """

    def get_portal_agent_url():
        candidates = []
        if 'authentic' in settings.PROJECT_NAME:
            combo_services = settings.KNOWN_SERVICES.get('combo', {})
            candidates = [service for service in combo_services.values() if service.get('is-portal-agent')]

        # the user is only looked at when there is a choice to make
        user = request.user if len(candidates) > 1 else None
        if user and user.is_authenticated and user.ou_id:
            wanted_slug = user.ou.slug
            for candidate in candidates:
                agent_variables = candidate.get('variables') or {}
                if agent_variables.get('ou-slug') == wanted_slug:
                    return candidate['url']

        return getattr(settings, 'TEMPLATE_VARS', {}).get(settings.HOBO_MANAGER_HOMEPAGE_URL_VAR)

    return {'manager_homepage_url': get_portal_agent_url}
|
|
|
|
|
|
def hobo_json(request):
    """Context processor giving Hobo itself the hobo.json based variables.

    These are the variables that would otherwise be defined by the
    settings loaders; ``manager_homepage_url`` is added, taken from the
    variable named by ``settings.HOBO_MANAGER_HOMEPAGE_URL_VAR``.
    """
    from hobo.deploy.utils import get_hobo_json
    from hobo.multitenant.settings_loaders import TemplateVars

    variables = TemplateVars.get_hobo_json_variables(get_hobo_json())
    homepage_url = variables.get(settings.HOBO_MANAGER_HOMEPAGE_URL_VAR)
    variables['manager_homepage_url'] = homepage_url
    return variables
|
|
|
|
|
|
def _authentic2_get_next_url(request):
    """Return authentic2's ``get_next_url()`` result for the request GET data.

    authentic2 is imported lazily, inside the function.
    """
    from authentic2.utils import misc as authentic2_misc

    return authentic2_misc.get_next_url(request.GET)
|
|
|
|
|
|
def user_urls(request):
    """Context processor building login/logout/account/registration URLs.

    Always returns ``login_url`` and ``logout_url``; ``account_url`` and
    ``registration_url`` are added when the matching ``idp_*`` template
    variables exist. On authentic, the login and registration URLs are
    adjusted for the login, registration and password reset pages.
    """
    # ugly, but necessary..
    if 'wcs.qommon' in settings.INSTALLED_APPS:
        # noqa pylint: disable=import-error
        from quixote import get_publisher

        publisher = get_publisher()
        substitutions = publisher.get_substitution_variables() if publisher else None
        template_vars = substitutions or {}
        logout_url = '/logout?'
    else:
        template_vars = getattr(settings, 'TEMPLATE_VARS', {})
        logout_url = '/logout/?'

    next_path = request.get_full_path()
    on_authentic = 'authentic' in settings.PROJECT_NAME
    if on_authentic:
        next_path = _authentic2_get_next_url(request) or next_path

    urls = {
        'login_url': '/login/?' + urlencode({'next': next_path}),
        'logout_url': logout_url + urlencode({'next': '/'}),
    }

    current_uri = request.build_absolute_uri()
    back_to_current = urlencode({'next': current_uri})

    if 'idp_account_url' in template_vars:
        account_url = template_vars['idp_account_url']
        # compare without params, query string and fragment
        current_base = urllib.parse.urlunparse(urllib.parse.urlparse(current_uri)[:3] + (None,) * 3)
        if current_base == account_url:
            urls['account_url'] = current_uri
        else:
            urls['account_url'] = account_url + '?' + back_to_current

    if 'idp_registration_url' in template_vars:
        urls['registration_url'] = template_vars['idp_registration_url'] + '?' + back_to_current

    if on_authentic:
        current_query = request.GET.urlencode()
        login_with_query = '/login/?' + current_query
        register_with_query = '/register/?' + current_query
        # path -> (login_url, registration_url)
        overrides = {
            '/login/': ('#', register_with_query),
            '/register/': (login_with_query, '#'),
            '/password/reset/': (login_with_query, register_with_query),
        }
        if request.path in overrides:
            urls['login_url'], urls['registration_url'] = overrides[request.path]

    return urls
|