wcs/wcs/qommon/publisher.py

990 lines
37 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
import collections
from django.utils.six.moves import builtins
from django.utils.six.moves import configparser as ConfigParser
from django.utils.six.moves import cPickle
from django.utils.six.moves.urllib import parse as urllib
import datetime
from decimal import Decimal
import imp
import inspect
import os
import fcntl
import hashlib
import json
import locale
import random
import re
import socket
import sys
import time
import traceback
builtins.__dict__['N_'] = lambda x: x
import linecache
import requests
import xml.etree.ElementTree as ET
from django.conf import settings
from django.http import Http404
from django.utils import six
from django.utils import translation
from django.utils.encoding import force_text, force_bytes
from django.utils.six import StringIO
from django.utils.translation import gettext, ngettext
from quixote.publish import Publisher, get_request, get_response, get_publisher, redirect
from .http_request import HTTPRequest
from .http_response import HTTPResponse, AfterJob
from .cron import CronJob
from .substitution import Substitutions, CompatibilityNamesDict
from . import force_str
from . import errors
from . import template
import logging
import logging.handlers
from . import logger
from . import storage
class ImmediateRedirectException(Exception):
def __init__(self, location):
self.location = location
class QommonPublisher(Publisher, object):
APP_NAME = None
APP_DIR = None
DATA_DIR = None
ERROR_LOG = None
USE_LONG_TRACES = True
root_directory_class = None
backoffice_directory_class = None
admin_directory_class = None
session_manager_class = None
user_class = None
unpickler_class = None
after_login_url = ''
qommon_static_dir = 'static/'
qommon_admin_css = 'css/dc2/admin.css'
default_theme = 'default'
site_options = None
site_charset = 'utf-8'
default_configuration_path = None
auto_create_appdir = True
missing_appdir_redirect = None
use_sms_feature = True
gettext = lambda self, message: message
ngettext = lambda self, msgid1, msgid2, n: msgid1
app_dir = None
@property
def form_tokens_dir(self):
return os.path.join(self.app_dir, 'form_tokens')
def get_root_url(self):
if self.get_request():
return self.get_request().environ['SCRIPT_NAME'] + '/'
else:
return '/'
def get_application_static_files_root_url(self):
# Typical applications will have their static files under the same root
# directory as themselves; this method allows others to host them under
# some other path, or even on some totally different hostname.
return self.get_root_url()
def get_frontoffice_url(self, without_script_name=False):
frontoffice_url = get_cfg('misc', {}).get('frontoffice-url', None)
if frontoffice_url:
return frontoffice_url
req = self.get_request()
if req:
if without_script_name:
return '%s://%s' % (req.get_scheme(), req.get_server())
else:
return '%s://%s%s' % (req.get_scheme(), req.get_server(),
urllib.quote(req.environ.get('SCRIPT_NAME')))
return 'https://%s' % os.path.basename(get_publisher().app_dir)
def get_backoffice_url(self):
backoffice_url = get_cfg('misc', {}).get('backoffice-url', None)
if backoffice_url:
return backoffice_url
req = self.get_request()
if req:
return '%s://%s%s/backoffice' % (req.get_scheme(), req.get_server(),
urllib.quote(req.environ.get('SCRIPT_NAME')))
return 'https://%s/backoffice' % os.path.basename(self.app_dir)
def get_global_eval_dict(self):
from . import evalutils as utils
def compat_locals():
frame = inspect.getouterframes(inspect.currentframe())[1][0]
x = CompatibilityNamesDict(frame.f_locals)
return x
return {'datetime': datetime,
'Decimal': Decimal,
'locals': compat_locals,
'vars': compat_locals,
'random': random.SystemRandom(),
're': re,
'date': utils.date,
'days': utils.days,
'utils': utils,}
def format_publish_error(self, exc):
get_response().filter = {}
if isinstance(exc, errors.AccessError) and hasattr(exc, 'render'):
return exc.render()
return errors.format_publish_error(exc)
def finish_interrupted_request(self, exc):
# it is exactly the same as in the base class, but using our own
# HTTPResponse class
if not self.config.display_exceptions and exc.private_msg:
exc.private_msg = None # hide it
request = get_request()
original_response = request.response
request.response = HTTPResponse(status=exc.status_code)
if request.is_json():
request.response.set_content_type('application/json')
return json.dumps({'err': 1, 'err_class': exc.title, 'err_desc': exc.public_msg})
if isinstance(exc, errors.TraversalError):
raise Http404()
output = self.format_publish_error(exc)
self.session_manager.finish_successful_request()
return output
def _generate_plaintext_error(self, request, original_response,
exc_type, exc_value, tb, limit = None):
if exc_value and six.PY2:
# do not fail on exception strings using non-ascii chars
exc_value = force_text(str(exc_value), errors='ignore').encode('ascii')
if not self.USE_LONG_TRACES:
if not request:
# this happens when an exception is raised by an afterjob
request = HTTPRequest(None, {})
if not request.form:
request.form = {}
return Publisher._generate_plaintext_error(self, request,
original_response, exc_type, exc_value, tb)
error_file = StringIO()
if limit is None:
if hasattr(sys, 'tracebacklimit'):
limit = sys.tracebacklimit
print("Exception:", file=error_file)
print(" type = '%s', value = '%s'" % (exc_type, exc_value), file=error_file)
print('', file=error_file)
# format the traceback
print('Stack trace (most recent call first):', file=error_file)
while tb.tb_next:
tb = tb.tb_next
frame = tb.tb_frame
n = 0
while frame and (limit is None or n < limit):
function = frame.f_code.co_name
filename = frame.f_code.co_filename
exclineno = frame.f_lineno
locals = sorted(frame.f_locals.items(), key=lambda item: item[0])
print(' File "%s", line %s, in %s' % (filename, exclineno, function), file=error_file)
linecache.checkcache(filename)
for lineno in range(exclineno-2,exclineno+3):
line = linecache.getline(filename, lineno, frame.f_globals)
if line:
if lineno == exclineno:
print('>%5s %s' % (lineno, line.rstrip()), file=error_file)
else:
print(' %5s %s' % (lineno, line.rstrip()), file=error_file)
print('', file=error_file)
if locals:
print(" locals: ", file=error_file)
for key, value in locals:
print(" %s =" % key, end=' ', file=error_file)
try:
repr_value = repr(value)
if len(repr_value) > 10000:
repr_value = repr_value[:10000] + ' [...]'
print(repr_value, file=error_file)
except:
print("<ERROR WHILE PRINTING VALUE>", file=error_file)
print('', file=error_file)
frame = frame.f_back
n = n + 1
# include request and response dumps
if request:
error_file.write('\n')
error_file.write(request.dump())
error_file.write('\n')
return error_file.getvalue()
def finish_successful_request(self):
if not self.get_request().ignore_session:
self.session_manager.finish_successful_request()
def finish_failed_request(self):
# duplicate at lot from parent class, just to use our own HTTPResponse
request = get_request()
original_response = request.response
request.response = HTTPResponse()
(exc_type, exc_value, tb) = sys.exc_info()
if exc_type is NotImplementedError:
get_response().set_header('Content-Type', 'text/html') # set back content-type
return template.error_page(
_('This feature is not yet implemented.'),
error_title = _('Sorry'))
error_summary = traceback.format_exception_only(exc_type, exc_value)
error_summary = error_summary[0][0:-1] # de-listify and strip newline
error_summary = force_text(str(error_summary), errors='ignore').encode('ascii')
plain_error_msg = self._generate_plaintext_error(request,
original_response,
exc_type, exc_value,
tb)
if request.is_json():
request.response.set_content_type('application/json')
d = {'err': 1}
if self.config.display_exceptions:
d['err_class'] = exc_type.__name__
d['err_desc'] = error_summary
error_page = json.dumps(d)
else:
request.response.set_header('Content-Type', 'text/html')
if not self.config.display_exceptions:
# DISPLAY_EXCEPTIONS is false, so return the most
# secure (and cryptic) page.
error_page = self._generate_internal_error(request)
elif self.config.display_exceptions == 'html':
# Generate a spiffy HTML display using cgitb
error_page = self._generate_cgitb_error(request,
original_response, exc_type, exc_value, tb)
elif self.config.display_exceptions == 'text-in-html':
error_page = template.error_page(
_('The server encountered an internal error and was unable to complete your request.'),
error_title = _('Internal Server Error'),
exception = plain_error_msg)
else:
# Generate a plaintext page containing the traceback
request.response.set_header('Content-Type', 'text/plain')
error_page = plain_error_msg
try:
self.logger.log_internal_error(error_summary, plain_error_msg)
except socket.error:
# wilr happen if there is no mail server available and exceptions
# were configured to be mailed.
pass
try:
self.get_app_logger().error('internal server error')
except:
pass
if exc_type is SystemExit:
raise exc_type
request.response.set_status(500)
self.session_manager.finish_failed_request()
return error_page
def filter_output(self, request, output):
response = get_response()
if response.status_code == 304:
# clients don't like to receive content with a 304
return ''
if response.content_type != 'text/html':
return output
if not hasattr(response, 'filter') or not response.filter:
return output
try:
return self.render_response(output)
except:
# Something went wrong applying the template, maybe an error in it;
# fail the request properly, to get the error into the logs, the
# trace sent by email, etc.
return self.finish_failed_request()
def render_response(self, content):
if isinstance(content, template.QommonTemplateResponse):
content = template.render(content.templates, content.context)
return template.decorate(content, self.get_request().response)
def install_lang(self, request):
lang = request.language
if lang is None or not lang in [x[0] for x in settings.LANGUAGES]:
lang = 'en'
translation.activate(lang)
request.LANGUAGE_CODE = lang
self.gettext = translation.gettext
self.ngettext = translation.ngettext
def load_site_options(self):
self.site_options = ConfigParser.ConfigParser()
site_options_filename = os.path.join(self.app_dir, 'site-options.cfg')
if not os.path.exists(site_options_filename):
return
try:
self.site_options.read(site_options_filename)
except:
self.get_app_logger().error('failed to read site options file')
return
def has_site_option(self, option):
if self.site_options is None:
self.load_site_options()
try:
return (self.site_options.get('options', option) == 'true')
except ConfigParser.NoSectionError:
return False
except ConfigParser.NoOptionError:
return False
def get_site_option(self, option, section='options'):
if self.site_options is None:
self.load_site_options()
try:
return self.site_options.get(section, option)
except ConfigParser.NoSectionError:
return None
except ConfigParser.NoOptionError:
return None
def set_config(self, request = None):
self.reload_cfg()
self.site_options = None # reset at the beginning of a request
debug_cfg = self.cfg.get('debug', {})
self.logger.error_email = debug_cfg.get('error_email')
self.config.display_exceptions = debug_cfg.get('display_exceptions')
self.config.form_tokens = True
self.config.session_cookie_httponly = True
if request:
if request.get_scheme() == 'https':
self.config.session_cookie_secure = True
md5_hash = hashlib.md5()
md5_hash.update(force_bytes(self.app_dir))
self.config.session_cookie_name = self.APP_NAME + '-' + md5_hash.hexdigest()[:6]
self.config.session_cookie_path = '/'
if debug_cfg.get('logger', True):
self._app_logger = self.get_app_logger(force = True)
else:
class NullLogger(object):
def error(self, *args):
pass
def warn(self, *args):
pass
def info(self, *args):
pass
def debug(self, *args):
pass
self._app_logger = NullLogger()
def set_app_dir(self, request):
'''
Set the application directory, creating it if possible and authorized.
'''
self.site_options = None # reset at the beginning of a request
canonical_hostname = request.get_server(clean = False).lower().split(':')[0].rstrip('.')
script_name = request.get_header('SCRIPT_NAME', '').strip('/')
self.app_dir = os.path.join(self.APP_DIR, canonical_hostname)
if script_name:
script_name = script_name.replace('/', '+')
self.app_dir += '+' + script_name
if not os.path.exists(self.app_dir):
if not self.auto_create_appdir:
if self.missing_appdir_redirect:
raise ImmediateRedirectException(self.missing_appdir_redirect)
else:
raise Http404()
try:
os.makedirs(self.app_dir)
except OSError as e:
pass
try:
os.mkdir(self.form_tokens_dir)
except OSError: # already exists
pass
def initialize_app_dir(self):
'''If empty initialize the application directory with default
configuration. Returns True if initialization has been done.'''
if self.default_configuration_path and len(os.listdir(self.app_dir)) == 0:
# directory just got created, we should import some configuration...
if os.path.isabs(self.default_configuration_path):
path = self.default_configuration_path
else:
path = os.path.join(self.DATA_DIR, self.default_configuration_path)
self.cfg = self.import_cfg(path)
self.write_cfg()
return True
return False
def init_publish(self, request):
self.set_app_dir(request)
self.initialize_app_dir()
self.set_config(request)
request.language = self.get_site_language()
self.install_lang(request)
self.init_publisher_substitutions(request)
def init_publisher_substitutions(self, request):
self.substitutions = Substitutions()
self.substitutions.feed(self)
self.substitutions.feed(request)
for extra_source in self.extra_sources:
self.substitutions.feed(extra_source(self, request))
def get_site_language(self):
lang = self.cfg.get('language', {}).get('language', None)
if lang == 'HTTP':
request = self.get_request()
if not request:
return None
lang = None
accepted_languages = request.get_header('Accept-Language')
if accepted_languages:
accepted_languages = [x.strip() for x in accepted_languages.split(',')]
# forget about subtag and quality value
accepted_languages = [x.split('-')[0] for x in accepted_languages]
for l in accepted_languages:
if l in self.translations.keys():
return l
elif lang is None:
default_locale = locale.getdefaultlocale()
if default_locale and default_locale[0]:
lang = default_locale[0].split('_')[0]
return lang
@classmethod
def get_admin_module(cls):
return None
@classmethod
def get_backoffice_module(cls):
return None
def get_admin_root(self):
return self.root_directory.admin
def get_backoffice_root(self):
try:
return self.root_directory.backoffice
except AttributeError:
return None
ident_methods = None
def register_ident_methods(self):
try:
import lasso
except ImportError:
lasso = None
classes = []
if lasso:
from .ident import idp
classes.append(idp.IdPAuthMethod)
from .ident import franceconnect
classes.append(franceconnect.FCAuthMethod)
from .ident import password
classes.append(password.PasswordAuthMethod)
self.ident_methods = {}
for klass in classes:
self.ident_methods[klass.key] = klass
klass.register()
cronjobs = None
@classmethod
def register_cronjob(cls, cronjob):
if not cls.cronjobs:
cls.cronjobs = []
cls.cronjobs.append(cronjob)
def clean_nonces(self, delta=60, now=None):
nonce_dir = os.path.join(get_publisher().app_dir, 'nonces')
now = now or time.time()
if not os.path.exists(nonce_dir):
return
cleaning_lock_file = os.path.join(self.app_dir, 'cleaning_nonces.lock')
fd = os.open(cleaning_lock_file, os.O_RDONLY | os.O_CREAT, 0o666)
try:
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
# lock is currently held, that is fine.
return
try:
for nonce in os.listdir(nonce_dir):
nonce_path = os.path.join(nonce_dir, nonce)
# we need delta so that os.utime in is_url_signed has time to update file timestamp
if delta < now - os.stat(nonce_path)[8]:
os.unlink(nonce_path)
finally:
os.close(fd)
def clean_sessions(self):
cleaning_lock_file = os.path.join(self.app_dir, 'cleaning_sessions.lock')
fd = os.open(cleaning_lock_file, os.O_RDONLY | os.O_CREAT, 0o666)
try:
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
# lock is currently held, that is fine.
return
try:
manager = self.session_manager_class(session_class=self.session_class)
now = time.time()
last_usage_limit = now - 3*86400
creation_limit = now - 30*86400
for session_key in manager.keys():
try:
session = manager.get(session_key)
if session._access_time < last_usage_limit or session._creation_time < creation_limit:
del manager[session.id]
except (AttributeError, KeyError):
# the key is impossible to load or the session is malformed
# delete it anyway, but only catch KeyError (other errors could be useful)
try:
del manager[session_key]
except KeyError:
pass
continue
# also delete obsolete form_tokens that would have be missed when
# cleaning sessions.
form_tokens_dir = self.form_tokens_dir
if os.path.exists(form_tokens_dir):
for filename in os.listdir(form_tokens_dir):
if os.stat(os.path.join(form_tokens_dir, filename)).st_mtime < creation_limit:
try:
os.unlink(os.path.join(form_tokens_dir, filename))
except OSError:
pass
finally:
os.close(fd)
def clean_afterjobs(self):
now = time.time()
for job in AfterJob.select():
if job.status == 'completed' and (now - job.completion_time) > 3600:
# completed for more than one hour
job.remove_self()
elif (now - job.creation_time) > 2 * 86400:
# started more than two days ago, probably aborted job
job.remove_self()
def clean_tempfiles(self):
now = time.time()
one_month_ago = now - 30*86400
dirname = os.path.join(self.app_dir, 'tempfiles')
if not os.path.exists(dirname):
return
for filename in os.listdir(dirname):
if os.stat(os.path.join(dirname, filename))[8] < one_month_ago:
try:
os.unlink(os.path.join(dirname, filename))
except OSError:
pass
@classmethod
def register_cronjobs(cls):
cls.register_cronjob(CronJob(cls.clean_sessions, minutes=range(0, 60, 5)))
cls.register_cronjob(CronJob(cls.clean_nonces, minutes=range(0, 60, 5)))
cls.register_cronjob(CronJob(cls.clean_afterjobs, hourly=True))
cls.register_cronjob(CronJob(cls.clean_tempfiles, hourly=True))
_initialized = False
@classmethod
def init_publisher_class(cls):
if cls._initialized:
return
cls._initialized = True
cls.load_extra_dirs()
@classmethod
def create_publisher(cls, **kwargs):
publisher = cls(cls.root_directory_class(),
session_cookie_name=cls.APP_NAME,
session_cookie_path='/',
logger=logger.ApplicationLogger(error_log=cls.ERROR_LOG))
publisher.substitutions = Substitutions()
publisher.app_dir = cls.APP_DIR
publisher.data_dir = cls.DATA_DIR
if not os.path.exists(publisher.app_dir):
os.mkdir(publisher.app_dir)
publisher.register_ident_methods()
publisher.set_config()
return publisher
def detach(self):
# reset structures that would otherwise be shared between threads
self.pgconn = None
self._app_logger = None
self.init_publisher_substitutions(self.get_request())
extra_dirs = None
@classmethod
def register_extra_dir(cls, dir):
if not cls.extra_dirs:
cls.extra_dirs = []
cls.extra_dirs.append(dir)
@classmethod
def load_extra_dirs(cls):
for extra_dir in cls.extra_dirs or []:
if not os.path.exists(extra_dir):
continue
sys.path.append(extra_dir)
for filename in os.listdir(extra_dir):
if not filename.endswith('.py'):
continue
modulename = filename[:-3]
fp, pathname, description = imp.find_module(modulename, [extra_dir])
try:
imp.load_module(modulename, fp, pathname, description)
except Exception as e:
print('failed to load extra module: %s (%s)' % (modulename, e), file=sys.stderr)
if fp:
fp.close()
translation_domains = None
@classmethod
def register_translation_domain(cls, name):
if not cls.translation_domains:
cls.translation_domains = []
cls.translation_domains.append(name)
supported_languages = None
cfg = None
def write_cfg(self):
s = cPickle.dumps(self.cfg, protocol=2)
filename = os.path.join(self.app_dir, 'config.pck')
storage.atomic_write(filename, s)
def reload_cfg(self):
filename = os.path.join(self.app_dir, 'config.pck')
try:
self.cfg = cPickle.load(open(filename, 'rb'))
except:
self.cfg = {}
def export_cfg(self):
root = ET.Element('settings')
for k in self.cfg:
part = ET.SubElement(root, k)
for k2 in self.cfg[k]:
elem = ET.SubElement(part, k2)
val = self.cfg[k][k2]
if val is None:
pass
elif type(val) is dict:
for k3, v3 in val.items():
ET.SubElement(elem, k3).text = str(v3)
elif type(val) is list:
for v in val:
ET.SubElement(elem, 'item').text = v
elif isinstance(val, six.string_types):
elem.text = val
else:
elem.text = str(val)
return ET.tostring(root)
def import_cfg(self, filename):
try:
tree = ET.parse(open(filename))
except:
self.get_app_logger().warn('failed to import config from; failed to parse: %s', filename)
raise
if tree.getroot().tag != 'settings':
self.get_app_logger().warn('failed to import config; not a settings file: %s', filename)
return
cfg = {}
for elem in tree.getroot():
sub_cfg = {}
cfg[elem.tag] = sub_cfg
for child in elem:
sub_cfg[child.tag] = None
if list(child):
if list(child)[0].tag == 'item':
# list
sub_cfg[child.tag] = []
for c in child:
value = force_str(c.text)
if value in ('True', 'False'):
value = (value == 'True')
sub_cfg[child.tag].append(value)
else:
# dict
sub_cfg[child.tag] = {}
for c in child:
value = force_str(c.text)
if value in ('True', 'False'):
value = (value == 'True')
sub_cfg[child.tag][c.tag] = c
else:
text = child.text
if text is None:
sub_cfg[child.tag] = None
continue
text = force_str(text)
try:
sub_cfg[child.tag] = int(text)
except (ValueError, TypeError):
pass
else:
continue
if text in ('False', 'True'):
sub_cfg[child.tag] = bool(text == 'True')
else:
sub_cfg[child.tag] = text
return cfg
def process(self, stdin, env):
request = HTTPRequest(stdin, env)
self.response = self.process_request(request)
return self.response
_app_logger = None
def get_app_logger_filename(self):
return os.path.join(self.app_dir, '%s.log' % self.APP_NAME)
def get_app_logger(self, force=False):
debug = self.cfg.get('debug', {}).get('debug_mode', False)
if self._app_logger and not force:
return self._app_logger
self._app_logger = logging.getLogger(self.APP_NAME + self.app_dir)
if not self._app_logger.filters:
self._app_logger.addFilter(logger.BotFilter())
logfile = self.get_app_logger_filename()
if not self._app_logger.handlers:
hdlr = logging.handlers.RotatingFileHandler(logfile, 'a', 2**20, 100)
# max size = 1M
formatter = logger.Formatter(
'%(asctime)s %(levelname)s %(address)s '\
'%(session_id)s %(path)s %(user_id)s - %(message)s')
hdlr.setFormatter(formatter)
self._app_logger.addHandler(hdlr)
if debug:
self._app_logger.setLevel(logging.DEBUG)
else:
self._app_logger.setLevel(logging.INFO)
return self._app_logger
def sitecharset2utf8(self, str):
'''Convert a string in site encoding to UTF-8'''
return str.decode(self.site_charset).encode('utf-8')
def utf82sitecharset(self, str):
return str.decode('utf-8').encode(self.site_charset)
def get_default_position(self):
default_position = self.cfg.get('misc', {}).get('default-position', None)
if not default_position:
default_position = self.get_site_option('default_position')
if not default_position:
default_position = '50.84;4.36'
return default_position
def get_default_templatetags_libraries(self):
libraries = self.get_site_option('default_templatetags_libraries') or ''
return 'qommon l10n %s' % libraries
def get_map_attributes(self):
attrs = {}
attrs['data-def-lat'], attrs['data-def-lng'] = self.get_default_position().split(';')
if self.get_site_option('map-bounds-top-left'):
attrs['data-max-bounds-lat1'], attrs['data-max-bounds-lng1'] = \
self.get_site_option('map-bounds-top-left').split(';')
attrs['data-max-bounds-lat2'], attrs['data-max-bounds-lng2'] = \
self.get_site_option('map-bounds-bottom-right').split(';')
attrs['data-map-attribution'] = self.get_site_option('map-attribution') or \
_("Map data &copy; "\
"<a href='https://openstreetmap.org'>OpenStreetMap</a> contributors, "\
"<a href='http://creativecommons.org/licenses/by-sa/2.0/'>CC-BY-SA</a>")
attrs['data-tile-urltemplate'] = self.get_site_option('map-tile-urltemplate') or \
'https://tiles.entrouvert.org/hdm/{z}/{x}/{y}.png'
return attrs
def get_reverse_geocoding_service_url(self):
url = self.get_site_option('reverse_geocoding_service_url')
if url:
return url
url = self.get_site_option('nominatim_url') or 'https://nominatim.entrouvert.org'
url += '/reverse'
reverse_zoom_level = self.get_site_option('nominatim_reverse_zoom_level') or 18
url += '?zoom=%s' % reverse_zoom_level
key = self.get_site_option('nominatim_key')
if key:
url += '&key=%s' % key
return url
def get_geocoding_service_url(self):
url = self.get_site_option('geocoding_service_url')
if url:
return url
url = self.get_site_option('nominatim_url') or 'https://nominatim.entrouvert.org'
url += '/search'
key = self.get_site_option('nominatim_key')
if key:
url += '?key=%s' % key
return url
def get_supported_authentication_contexts(self):
contexts = collections.OrderedDict()
labels = {
'fedict': _('Belgian eID'),
'franceconnect': _('FranceConnect'),
}
if self.get_site_option('auth-contexts'):
for context in self.get_site_option('auth-contexts').split(','):
context = context.strip()
contexts[context] = labels[context]
return contexts
def get_authentication_saml_contexts(self, context):
return {
'fedict': [
# custom context, provided by authentic fedict plugin:
'urn:oasis:names:tc:SAML:2.0:ac:classes:SmartcardPKI',
# native fedict contexts:
'urn:be:fedict:iam:fas:citizen:eid',
'urn:be:fedict:iam:fas:citizen:token',
'urn:be:fedict:iam:fas:enterprise:eid',
'urn:be:fedict:iam:fas:enterprise:token'],
'franceconnect': [
'urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport',
]
}[context]
def get_lazy_variables_modes(self):
# possible modes:
# * django-condition: used to evaluate django conditions
# * python-condition: used to evaluate python conditions
# * lazy: used to force lazy mode in tests and in context processor
modes = self.get_site_option('lazy-variables-modes')
if modes:
return [x.strip() for x in modes.split(',')]
return ['lazy', 'django-condition']
def get_substitution_variables(self):
from . import misc
from wcs.variables import LazyDateObject
d = {
'site_name': get_cfg('misc', {}).get('sitename', None),
'site_theme': get_cfg('branding', {}).get('theme', self.default_theme),
'site_url': self.get_frontoffice_url(),
'site_url_backoffice': self.get_backoffice_url(),
'site_lang': (get_request() and hasattr(get_request(), 'language') and get_request().language) or 'en',
'today': LazyDateObject(datetime.date.today),
'now': LazyDateObject(datetime.datetime.now),
'is_in_backoffice': (self.get_request() and self.get_request().is_in_backoffice()),
}
if self.site_options is None:
self.load_site_options()
try:
d.update(dict(self.site_options.items('variables')))
except ConfigParser.NoSectionError:
pass
d['manager_homepage_url'] = d.get('portal_agent_url')
d['manager_homepage_title'] = d.get('portal_agent_title')
return d
extra_sources = []
@classmethod
def register_extra_source(cls, source):
'''Register a new static source of substitution variable for this publisher class.
source must be a callable taking two arguments: a publisher and a
request, and returning an object with a get_substitution_variables()
method.
'''
if not cls.extra_sources:
cls.extra_sources = []
cls.extra_sources.append(source)
@classmethod
def get_tenants(cls):
for tenant in sorted(os.listdir(cls.APP_DIR)):
if tenant in ('collectstatic', 'scripts', 'skeletons'):
continue
if tenant.endswith('.invalid'):
continue
if not os.path.isdir(os.path.join(cls.APP_DIR, tenant)):
continue
if not os.access(os.path.join(cls.APP_DIR, tenant), os.W_OK):
continue
yield tenant
def get_cfg(key, default = None):
r = get_publisher().cfg.get(key, default)
if not r:
return {}
return r
def get_logger():
return get_publisher().get_app_logger()
def set_publisher_class(klass):
builtins.__dict__['__publisher_class'] = klass
def get_publisher_class():
return builtins.__dict__.get('__publisher_class')
Substitutions.register('site_name', category=N_('General'), comment=N_('Site Name'))
Substitutions.register('site_theme', category=N_('General'), comment=N_('Current Theme Name'))
Substitutions.register('site_url', category=N_('General'), comment=N_('Site URL'))
Substitutions.register('site_url_backoffice', category=N_('General'), comment=N_('Site URL (backoffice)'))
Substitutions.register('today', category=N_('General'), comment=N_('Current Date'))
Substitutions.register('now', category=N_('General'), comment=N_('Current Date & Time'))