wcs/wcs/qommon/publisher.py

1032 lines
38 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import builtins
import codecs
import collections
import configparser
import datetime
import hashlib
import importlib
import inspect
import io
import json
import linecache
import locale
import logging
import os
import pickle
import random
import re
import sys
import time
import traceback
import urllib.parse
from decimal import Decimal
from django.conf import settings
from django.http import Http404
from django.utils import translation
from django.utils.encoding import force_bytes, force_text
from quixote.publish import Publisher, get_publisher, get_request, get_response
from wcs.qommon.storage import Less
from . import _, errors, force_str, logger, storage, template
from .cron import CronJob
from .http_request import HTTPRequest
from .http_response import AfterJob, HTTPResponse
from .substitution import CompatibilityNamesDict, Substitutions
from .vendor import locket
try:
import sentry_sdk
except ImportError:
sentry_sdk = None
class ImmediateRedirectException(Exception):
    """Raised to stop processing and redirect the client immediately.

    Attributes:
        location: URL the client must be redirected to.
    """

    def __init__(self, location):
        # Hand the location over to Exception so that args, str() and
        # pickling carry it; without this the exception has empty args and
        # cannot be re-created from its pickled form.
        super().__init__(location)
        self.location = location
class UnknownTenantError(Exception):
    """Raised when no tenant directory matches the requested hostname."""
class Tenant:
    """A tenant, described by its directory.

    Attributes:
        directory: path of the tenant directory, as given.
        hostname: last path component of ``directory``.
    """

    def __init__(self, directory):
        _parent, last_component = os.path.split(directory)
        self.directory = directory
        self.hostname = last_component
class SiteOptionsBoolean:
    """Value from the site-options [variables] section that can be used
    both as a string and as a boolean.

    A string is true when its lowercase form is one of ``true_strings``;
    any other kind of value goes through bool().  str() gives back the
    original text (or str() of a non-string value).
    """

    true_strings = ('yes', 'true', 'on')
    false_strings = ('no', 'false', 'off')

    def __init__(self, value):
        if not isinstance(value, str):
            self.value = bool(value)
            self.as_str = str(value)
            return
        self.as_str = value
        self.value = value.lower() in self.true_strings

    def __bool__(self):
        return self.value

    def __eq__(self, other):
        # compare truth values; the other operand is interpreted with the
        # same rules as the constructor
        mine = bool(self)
        theirs = bool(SiteOptionsBoolean(other))
        return mine is theirs

    def __str__(self):
        return self.as_str
class QommonPublisher(Publisher):
# noqa pylint: disable=too-many-public-methods
# Publisher subclass gathering configuration and site options handling,
# tenant selection, error reporting and periodic cleaning jobs.
#
# Application identity and base directories.  APP_DIR is where tenant
# directories are looked up (set_tenant_by_hostname, get_tenants) and is
# copied, with DATA_DIR, onto the instance by create_publisher().  They are
# None here and presumably set by the concrete application -- TODO confirm.
APP_NAME = None
APP_DIR = None
DATA_DIR = None
ERROR_LOG = None
# When true, _generate_plaintext_error() builds its own report (source
# context and locals of each frame) instead of deferring to quixote's.
USE_LONG_TRACES = True
# Pluggable classes; root_directory_class is instantiated by
# create_publisher() and session_manager_class by clean_sessions().
root_directory_class = None
backoffice_directory_class = None
admin_directory_class = None
session_manager_class = None
user_class = None
unpickler_class = None
after_login_url = ''
qommon_static_dir = 'static/'
qommon_admin_css = 'css/dc2/admin.css'
# Fallback theme name used by get_substitution_variables().
default_theme = 'django'
# ConfigParser with the content of site-options.cfg; None until
# load_site_options() is called.
site_options = None
# Charset used by sitecharset2utf8() / utf82sitecharset().
site_charset = 'utf-8'
# URL to redirect to (instead of answering 404) when set_app_dir() finds no
# tenant for the requested host.
missing_appdir_redirect = None
use_sms_feature = True
# Identity placeholders; install_lang() replaces them on the instance with
# Django's translation functions.
gettext = lambda self, message: message
ngettext = lambda self, msgid1, msgid2, n: msgid1
pgettext = lambda self, context, message: message
# Directory of the current tenant/application; set by create_publisher()
# and set_tenant().
app_dir = None
@property
def form_tokens_dir(self):
    """Path of the 'form_tokens' directory inside the application directory."""
    base_dir = self.app_dir
    return os.path.join(base_dir, 'form_tokens')
def get_root_url(self):
    """Return the root URL path of the application, with a trailing slash.

    Without a current request the root is '/'.  With one, it is the
    request's SCRIPT_NAME followed by '/'.  A missing (or None) SCRIPT_NAME
    is treated as empty, as WSGI servers are allowed to omit the variable
    when its value is the empty string.
    """
    request = self.get_request()
    if not request:
        return '/'
    script_name = request.environ.get('SCRIPT_NAME') or ''
    return script_name + '/'
def get_application_static_files_root_url(self):
    """Return the root URL under which application static files live.

    By default this is the application root URL; the method exists so that
    applications can host their static files under another path, or even on
    a totally different hostname.
    """
    root_url = self.get_root_url()
    return root_url
def get_frontoffice_url(self, without_script_name=False):
    """Return the base URL of the frontoffice.

    Resolution order:
      1. the 'frontoffice-url' entry of the 'misc' configuration, if set;
      2. scheme and server of the current request, followed by the quoted
         SCRIPT_NAME unless ``without_script_name`` is true;
      3. 'https://' followed by the basename of the publisher's app_dir.
    """
    frontoffice_url = get_cfg('misc', {}).get('frontoffice-url', None)
    if frontoffice_url:
        return frontoffice_url
    req = self.get_request()
    if req:
        base_url = '%s://%s' % (req.get_scheme(), req.get_server())
        if without_script_name:
            return base_url
        # SCRIPT_NAME may be absent from the environ when it is empty;
        # quote(None) would raise TypeError.
        script_name = req.environ.get('SCRIPT_NAME') or ''
        return base_url + urllib.parse.quote(script_name)
    return 'https://%s' % os.path.basename(get_publisher().app_dir)
def get_backoffice_url(self):
    """Return the backoffice URL: '/backoffice' joined onto the frontoffice URL."""
    frontoffice_url = self.get_frontoffice_url()
    return urllib.parse.urljoin(frontoffice_url, '/backoffice')
def get_global_eval_dict(self):
    """Return a dictionary of modules and helpers, keyed by the name they
    are exposed under (judging by the method name, the globals of evaluated
    expressions -- TODO confirm against callers).

    'locals' and 'vars' are replaced by a function returning the caller's
    local variables wrapped in a CompatibilityNamesDict.
    """
    from . import evalutils as utils

    def compat_locals():
        # record 1 of the outer frames is the caller of compat_locals();
        # its first element is the frame object
        caller_record = inspect.getouterframes(inspect.currentframe())[1]
        return CompatibilityNamesDict(caller_record[0].f_locals)

    eval_globals = {
        'datetime': datetime,
        'Decimal': Decimal,
        'codecs': codecs,
        'force_str': force_str,
        'force_bytes': force_bytes,
        'force_text': force_text,
        'locals': compat_locals,
        'vars': compat_locals,
        'random': random.SystemRandom(),
        're': re,
        'date': utils.date,
        'days': utils.days,
        'utils': utils,
    }
    return eval_globals
def format_publish_error(self, exc):
    """Return the rendering of a publish error.

    The response filter is reset first.  PublishError instances that have a
    render() method render themselves; anything else goes through
    errors.format_publish_error().
    """
    get_response().filter = {}
    renders_itself = isinstance(exc, errors.PublishError) and hasattr(exc, 'render')
    if not renders_itself:
        return errors.format_publish_error(exc)
    return exc.render()
def finish_interrupted_request(self, exc):
    """Build the response for a request interrupted by a publish error.

    Same logic as the base class, but using our own HTTPResponse class.
    JSON requests get a JSON error document; a TraversalError is turned
    into Django's Http404; other errors are formatted with
    format_publish_error() and the session is finished as for a successful
    request.
    """
    hide_private_msg = not self.config.display_exceptions and exc.private_msg
    if hide_private_msg:
        exc.private_msg = None  # hide it

    request = get_request()
    request.response = HTTPResponse(status=exc.status_code)
    if exc.status_code == 401:
        # include WWW-Authenticate header
        challenge = 'Basic realm="%s"' % exc.realm
        request.response.headers['WWW-Authenticate'] = challenge

    if request.is_json():
        request.response.set_content_type('application/json')
        payload = {
            'err': 1,
            'err_class': str(exc.title),
            'err_desc': str(exc.public_msg) if exc.public_msg else None,
        }
        return json.dumps(payload)

    request.response.set_robots_no_index()
    if isinstance(exc, errors.TraversalError):
        raise Http404()
    output = self.format_publish_error(exc)
    self.session_manager.finish_successful_request()
    return output
# Build the plain-text report describing an exception.
#
# When USE_LONG_TRACES is false this defers to quixote's
# Publisher._generate_plaintext_error(), after substituting an empty
# HTTPRequest when there is no request and an empty form when the request has
# none.  Otherwise it writes the exception type and value, then walks the
# stack from the innermost traceback frame outwards through f_back (at most
# `limit` frames; `limit` defaults to sys.tracebacklimit when that attribute
# exists), printing for each frame its location, the source lines around the
# current one (two before, two after, the current one marked with '>') and
# its local variables sorted by name (repr cut at 10000 characters).  The
# request dump is appended when there is a request.  Returns the report as a
# string.
def _generate_plaintext_error(self, request, original_response, exc_type, exc_value, tb, limit=None):
if not self.USE_LONG_TRACES:
if not request:
# this happens when an exception is raised by an afterjob
request = HTTPRequest(None, {})
if not request.form:
request.form = {}
return Publisher._generate_plaintext_error(
self, request, original_response, exc_type, exc_value, tb
)
error_file = io.StringIO()
if limit is None:
if hasattr(sys, 'tracebacklimit'):
limit = sys.tracebacklimit
print("Exception:", file=error_file)
print(" type = '%s', value = '%s'" % (exc_type, exc_value), file=error_file)
print('', file=error_file)
# format the traceback
print('Stack trace (most recent call first):', file=error_file)
# go down to the last traceback entry, i.e. the frame where the exception
# was raised
while tb.tb_next:
tb = tb.tb_next
frame = tb.tb_frame
n = 0
while frame and (limit is None or n < limit):
function = frame.f_code.co_name
filename = frame.f_code.co_filename
exclineno = frame.f_lineno
# NOTE: this local name shadows the locals() builtin within the method
locals = sorted(frame.f_locals.items(), key=lambda item: item[0])
print(' File "%s", line %s, in %s' % (filename, exclineno, function), file=error_file)
# make sure linecache does not serve stale content for this file
linecache.checkcache(filename)
for lineno in range(exclineno - 2, exclineno + 3):
line = linecache.getline(filename, lineno, frame.f_globals)
if line:
if lineno == exclineno:
print('>%5s %s' % (lineno, line.rstrip()), file=error_file)
else:
print(' %5s %s' % (lineno, line.rstrip()), file=error_file)
print('', file=error_file)
if locals:
print(" locals: ", file=error_file)
for key, value in locals:
print(" %s =" % key, end=' ', file=error_file)
try:
repr_value = repr(value)
# keep the report at a reasonable size
if len(repr_value) > 10000:
repr_value = repr_value[:10000] + ' [...]'
print(repr_value, file=error_file)
except Exception:
# a failing repr() must not prevent the report from being produced
print("<ERROR WHILE PRINTING VALUE>", file=error_file)
print('', file=error_file)
frame = frame.f_back
n = n + 1
# include request and response dumps
if request:
error_file.write('\n')
error_file.write(request.dump())
error_file.write('\n')
return error_file.getvalue()
def finish_successful_request(self):
    """Let the session manager finish a successful request, unless the
    current request is flagged as ignoring the session."""
    if self.get_request().ignore_session:
        return
    self.session_manager.finish_successful_request()
def can_sentry(self):
    """Tell whether sentry_sdk could be imported and its current hub has a client."""
    if sentry_sdk is None:
        return False
    return sentry_sdk.Hub.current.client is not None
# Build the response for a request that failed with an unexpected exception.
#
# A NotImplementedError gets a dedicated "not yet implemented" HTML page.
# Any other exception is sent to sentry when available, logged through
# self.logger, and answered with status 500: a JSON document for JSON
# requests, otherwise either the plain-text traceback or the generic internal
# error page depending on self.config.display_exceptions.
def finish_failed_request(self):
# duplicates a lot from the parent class, just to use our own HTTPResponse
request = get_request()
# kept to be handed to _generate_plaintext_error()
original_response = request.response
request.response = HTTPResponse()
request.response.set_robots_no_index()
(exc_type, exc_value, tb) = sys.exc_info()
if exc_type is NotImplementedError:
get_response().set_header('Content-Type', 'text/html') # set back content-type
return template.error_page(_('This feature is not yet implemented.'), error_title=_('Sorry'))
if self.can_sentry():
sentry_sdk.capture_exception(sys.exc_info())
error_summary = traceback.format_exception_only(exc_type, exc_value)
error_summary = error_summary[0][0:-1] # de-listify and strip newline
plain_error_msg = self._generate_plaintext_error(request, original_response, exc_type, exc_value, tb)
if request.is_json():
request.response.set_content_type('application/json')
d = {'err': 1}
# error details are only disclosed when exceptions are to be displayed
if self.config.display_exceptions:
d['err_class'] = exc_type.__name__
d['err_desc'] = error_summary
error_page = json.dumps(d)
else:
request.response.set_header('Content-Type', 'text/html')
if not self.config.display_exceptions:
# DISPLAY_EXCEPTIONS is false, so return the most
# secure (and cryptic) page.
error_page = self._generate_internal_error(request)
else:
# Generate a plaintext page containing the traceback
request.response.set_header('Content-Type', 'text/plain')
error_page = plain_error_msg
try:
self.logger.log_internal_error(error_summary, plain_error_msg)
except OSError:
# will happen if there is no mail server available and exceptions
# were configured to be mailed.
pass
if exc_type is SystemExit:
raise exc_type
request.response.set_status(500)
self.session_manager.finish_failed_request()
return error_page
def install_lang(self, request=None):
    """Activate the translation for the request (or site) language.

    The language is taken from ``request.language`` when a request is
    given, from get_site_language() otherwise; 'en' is used when there is no
    language or when it is not part of settings.LANGUAGES.  The publisher's
    gettext/ngettext/pgettext are bound to Django's translation functions,
    and the retained code is stored as ``request.LANGUAGE_CODE``.
    """
    lang = request.language if request else self.get_site_language(request)
    unsupported = lang is None or lang not in [entry[0] for entry in settings.LANGUAGES]
    if unsupported:
        lang = 'en'
    translation.activate(lang)
    self.gettext, self.ngettext, self.pgettext = (
        translation.gettext,
        translation.ngettext,
        translation.pgettext,
    )
    if request:
        request.LANGUAGE_CODE = lang
def load_site_options(self):
    """Load site-options.cfg from the application directory.

    ``self.site_options`` is always replaced by a fresh ConfigParser; it is
    left without content when the file does not exist.  A failure while
    reading the file is logged and otherwise ignored.
    """
    parser = configparser.ConfigParser()
    self.site_options = parser
    options_path = os.path.join(self.app_dir, 'site-options.cfg')
    if os.path.exists(options_path):
        try:
            parser.read(options_path, encoding='utf-8')
        except Exception:
            self.get_app_logger().error('failed to read site options file')
def has_site_option(self, option, default=False):
    """Tell whether ``option`` of the [options] section is exactly 'true'.

    ``default`` is returned when the section or the option is missing.
    Site options are loaded first if they have not been yet.
    """
    if self.site_options is None:
        self.load_site_options()
    try:
        raw_value = self.site_options.get('options', option)
    except (configparser.NoSectionError, configparser.NoOptionError):
        return default
    return raw_value == 'true'
def get_site_option(self, option, section='options'):
    """Return the value of ``option`` in ``section`` of the site options.

    When the section or the option is missing, the built-in default for
    that option is returned, or None if there is none.  Site options are
    loaded first if they have not been yet.
    """
    if self.site_options is None:
        self.load_site_options()
    try:
        return self.site_options.get(section, option)
    except (configparser.NoSectionError, configparser.NoOptionError):
        fallbacks = {
            'options': {
                'unused-files-behaviour': 'remove',
                'relatable-hosts': '',
            },
        }
        return fallbacks.get(section, {}).get(option)
def get_site_storages(self):
    """Return the storages declared in the site options.

    Every section named 'storage-<id>' that has both a 'label' and a
    'class' option gives an entry keyed by <id>, holding the options of the
    section plus an 'id' key.
    """
    if self.site_options is None:
        self.load_site_options()
    prefix = 'storage-'
    storages = {}
    for section, definition in self.site_options.items():
        if not section.startswith(prefix):
            continue
        if 'label' not in definition or 'class' not in definition:
            continue
        storage_id = section[len(prefix):]
        storages[storage_id] = dict(definition, id=storage_id)
    return storages
# (Re)configure the publisher from the stored configuration: reload self.cfg,
# drop cached site options, set error email and exception display settings
# from the 'debug' and 'emails' sections, set quixote session/form options,
# and refresh the application logger.  `request`, when given, is only used
# to mark the session cookie as secure over https.
def set_config(self, request=None):
self.reload_cfg()
self.site_options = None # reset at the beginning of a request
self._cached_user_fields_formdef = None
debug_cfg = self.cfg.get('debug', {})
self.logger.error_email = debug_cfg.get('error_email')
self.logger.error_email_from = self.cfg.get('emails', {}).get('from')
self.config.display_exceptions = debug_cfg.get('debug_mode')
self.config.form_tokens = True
self.config.session_cookie_httponly = True
self.config.allowed_methods = ['GET', 'HEAD', 'POST', 'PUT']
if request:
if request.get_scheme() == 'https':
self.config.session_cookie_secure = True
# the cookie name carries the first 6 hex digits of the md5 of app_dir, so it
# differs from one application directory to another
md5_hash = hashlib.md5()
md5_hash.update(force_bytes(self.app_dir))
self.config.session_cookie_name = 'sessionid-%s-%s' % (self.APP_NAME, md5_hash.hexdigest()[:6])
self.config.session_cookie_path = '/'
# force=True: rebuild the logger instead of returning the cached one
self._app_logger = self.get_app_logger(force=True)
def set_app_dir(self, request):
    """
    Select the tenant matching the host of the request, then make sure its
    form tokens directory exists.

    When no tenant matches, raise ImmediateRedirectException to
    missing_appdir_redirect if it is set, Http404 otherwise.
    """
    self.site_options = None  # reset at the beginning of a request

    # lowercase, without port and without trailing dots
    server = request.get_server(clean=False)
    host_without_port = server.lower().split(':')[0]
    canonical_hostname = host_without_port.rstrip('.')

    try:
        self.set_tenant_by_hostname(canonical_hostname, request=request)
    except UnknownTenantError:
        if not self.missing_appdir_redirect:
            raise Http404()
        raise ImmediateRedirectException(self.missing_appdir_redirect)

    try:
        os.mkdir(self.form_tokens_dir)
    except OSError:  # already exists
        pass
def init_publish(self, request):
    """Prepare the publisher for a request: select the application
    directory, reset the HTTP adapter, set and install the request language,
    then initialise the substitution variables."""
    self.set_app_dir(request)
    self._http_adapter = None
    site_language = self.get_site_language(request)
    request.language = site_language
    self.install_lang(request)
    self.init_publisher_substitutions(request)
def init_publisher_substitutions(self, request):
    """Create a new Substitutions object fed with the publisher, the
    request, and one object per registered extra source (each source is
    called with the publisher and the request)."""
    substitutions = Substitutions()
    self.substitutions = substitutions
    substitutions.feed(self)
    substitutions.feed(request)
    for make_source in self.extra_sources:
        substitutions.feed(make_source(self, request))
def get_site_language(self, request=None):
    """Return the language code to use for the site, or None.

    The configured language (``cfg['language']['language']``) is returned
    as is, except for the special value 'HTTP', which means "negotiate with
    the browser": the first language of the Accept-Language header that is
    part of settings.LANGUAGES is returned (None when there is no request to
    negotiate with).  When no language comes out of this, the language part
    of the system default locale is used, if any.

    Args:
        request: request to negotiate with; defaults to the current request.
    """
    if request is None:
        request = self.get_request()
    lang = self.cfg.get('language', {}).get('language', None)
    if lang == 'HTTP':
        if request is None:
            return None
        lang = None
        accepted_languages = request.get_header('Accept-Language')
        if accepted_languages:
            known_languages = [x[0] for x in settings.LANGUAGES]
            for language_range in accepted_languages.split(','):
                # "fr-BE;q=0.8" -> "fr": drop the quality value first, then
                # the subtag.  Splitting on '-' alone leaves the quality
                # value attached to ranges without subtag ("fr;q=0.8").
                candidate = language_range.split(';')[0].split('-')[0].strip()
                if candidate in known_languages:
                    return candidate
    if lang is None:
        # NOTE(review): locale.getdefaultlocale() is deprecated since
        # Python 3.11; kept as the file already relies on it.
        default_locale = locale.getdefaultlocale()
        if default_locale and default_locale[0]:
            lang = default_locale[0].split('_')[0]
    return lang
def get_backoffice_root(self):
    """Return the ``backoffice`` attribute of the root directory, or None
    when it (or the root directory itself) is not available."""
    try:
        backoffice_root = self.root_directory.backoffice
    except AttributeError:
        backoffice_root = None
    return backoffice_root
# identification method classes, by key; filled by register_ident_methods()
ident_methods = None


def register_ident_methods(self):
    """Register the available identification methods.

    The IdP method is only offered when the lasso module can be imported;
    FranceConnect and password methods always are.  Each class is stored in
    ``self.ident_methods`` under its ``key`` and its register() is called.
    """
    try:
        import lasso
    except ImportError:
        lasso = None

    classes = []
    if lasso:
        from .ident import idp

        classes.append(idp.IdPAuthMethod)

    from .ident import franceconnect
    from .ident import password

    classes += [franceconnect.FCAuthMethod, password.PasswordAuthMethod]

    self.ident_methods = {}
    for klass in classes:
        self.ident_methods[klass.key] = klass
        klass.register()
# registered cron jobs; created on first registration
cronjobs = None


@classmethod
def register_cronjob(cls, cronjob):
    """Register a cron job on the publisher class.

    A job with a name is registered only once: it is ignored when a job of
    the same name is already registered.  Jobs without a name are always
    appended.
    """
    if not cls.cronjobs:
        cls.cronjobs = []
    # noqa pylint: disable=not-an-iterable
    # compare names explicitly; do not rely on the truthiness of job objects
    if cronjob.name and any(job.name == cronjob.name for job in cls.cronjobs):
        # already registered
        return
    cls.cronjobs.append(cronjob)
def clean_nonces(self, delta=60, now=None, **kwargs):
    """Remove the nonce files that are more than ``delta`` seconds old.

    Nothing is done when the 'nonces' directory does not exist, or when
    another cleaning is in progress (the lock file cannot be acquired).

    Args:
        delta: minimum age, in seconds, for a nonce to be removed.
        now: reference timestamp; defaults to the current time.
        **kwargs: ignored.
    """
    nonce_dir = os.path.join(get_publisher().app_dir, 'nonces')
    if not os.path.exists(nonce_dir):
        return
    cleaning_lock_file = os.path.join(self.app_dir, 'cleaning_nonces.lock')
    try:
        now = now or time.time()
        with locket.lock_file(cleaning_lock_file, timeout=0):
            for nonce in os.listdir(nonce_dir):
                nonce_path = os.path.join(nonce_dir, nonce)
                try:
                    # we need delta so that os.utime in is_url_signed has time to update file timestamp
                    # (index 8 of the stat result is the modification time)
                    if delta < now - os.stat(nonce_path)[8]:
                        os.unlink(nonce_path)
                except FileNotFoundError:
                    # the nonce disappeared since listdir(); nothing to clean
                    pass
    except locket.LockError:
        pass
# Remove expired sessions and obsolete form tokens.
#
# A session is deleted when it was last accessed more than 3 days ago or
# created more than 30 days ago; sessions that cannot be loaded or lack the
# expected attributes are deleted too.  Files of the form tokens directory
# last modified more than 30 days ago are removed.  Nothing is done when the
# cleaning lock file cannot be acquired.  **kwargs is not used.
def clean_sessions(self, **kwargs):
cleaning_lock_file = os.path.join(self.app_dir, 'cleaning_sessions.lock')
try:
# timeout=0: do not wait for the lock, LockError is raised right away
with locket.lock_file(cleaning_lock_file, timeout=0):
manager = self.session_manager_class(session_class=self.session_class)
now = time.time()
last_usage_limit = now - 3 * 86400
creation_limit = now - 30 * 86400
for session_key in manager.keys():
try:
session = manager.get(session_key)
if session._access_time < last_usage_limit or session._creation_time < creation_limit:
del manager[session.id]
except (AttributeError, KeyError):
# the key is impossible to load or the session is malformed
# delete it anyway, but only catch KeyError (other errors could be useful)
try:
del manager[session_key]
except KeyError:
pass
continue
# also delete obsolete form_tokens that would have been missed when
# cleaning sessions.
form_tokens_dir = self.form_tokens_dir
if os.path.exists(form_tokens_dir):
for filename in os.listdir(form_tokens_dir):
try:
if os.stat(os.path.join(form_tokens_dir, filename)).st_mtime < creation_limit:
os.unlink(os.path.join(form_tokens_dir, filename))
except FileNotFoundError:
# the token file was removed in the meantime
pass
except locket.LockError:
pass
def clean_afterjobs(self, **kwargs):
    """Remove after jobs completed for more than one hour, as well as jobs
    created more than two days ago (probably aborted).  **kwargs is not
    used."""
    now = time.time()
    one_hour = 3600
    two_days = 2 * 86400
    for job_id in AfterJob.keys():
        job = AfterJob.get(job_id)
        completed_long_ago = job.status == 'completed' and (now - job.completion_time) > one_hour
        if completed_long_ago or (now - job.creation_time) > two_days:
            job.remove_self()
def clean_tokens(self, **kwargs):
    """Call clean() on the publisher's token class, when it has one.
    **kwargs is not used."""
    token_class = getattr(self, 'token_class', None)
    if not token_class:
        return
    token_class.clean()
def _clean_files(self, limit, dirname, check_method=None):
    """Delete the entries of ``dirname`` last modified before ``limit``.

    Args:
        limit: timestamp; only entries with an older modification time are
            considered.
        dirname: directory to clean; nothing is done if it does not exist.
        check_method: optional callable receiving the file name; the file is
            only deleted when it returns a true value.

    Entries that cannot be examined or deleted are skipped, so that one
    problematic entry does not abort the cleaning of the others.
    """
    if not os.path.exists(dirname):
        return
    for filename in os.listdir(dirname):
        path = os.path.join(dirname, filename)
        try:
            # index 8 of the stat result is the modification time
            if os.stat(path)[8] >= limit:
                continue
        except OSError:
            # removed since listdir(), dangling symlink, ...
            continue
        if check_method is not None and not check_method(filename):
            continue
        try:
            os.unlink(path)
        except OSError:
            pass
def clean_tempfiles(self, **kwargs):
    """Remove the files of the 'tempfiles' directory that are more than 30
    days old.  **kwargs is not used."""
    thirty_days = 30 * 86400
    limit = time.time() - thirty_days
    self._clean_files(limit, os.path.join(self.app_dir, 'tempfiles'))
def clean_models(self, **kwargs):
    """Remove the files of the 'models' directory that are more than two
    days old and not referenced as model file by an 'export_to_model'
    workflow item.  **kwargs is not used."""
    from wcs.workflows import Workflow

    limit = time.time() - 2 * 86400

    filenames_used = set()
    for workflow in Workflow.select(ignore_errors=True):
        for item in workflow.get_all_items():
            if item.key == 'export_to_model' and item.model_file:
                filenames_used.add(item.model_file.filename)

    def is_unused(filename):
        return filename not in filenames_used

    self._clean_files(limit, os.path.join(self.app_dir, 'models'), check_method=is_unused)
def clean_thumbnails(self, **kwargs):
    """Remove the files of the 'thumbs' directory that are more than 30
    days old.  **kwargs is not used."""
    thirty_days = 30 * 86400
    limit = time.time() - thirty_days
    self._clean_files(limit, os.path.join(self.app_dir, 'thumbs'))
def clean_loggederrors(self, **kwargs):
    """Remove the logged errors whose latest occurrence is more than 30
    days old.

    Nothing is done when the publisher has no ``loggederror_class`` (the
    attribute is not declared by this class, so it is looked up
    defensively, like ``token_class`` in clean_tokens()).  **kwargs is not
    used.
    """
    loggederror_class = getattr(self, 'loggederror_class', None)
    if not loggederror_class:
        return
    limit = datetime.datetime.now() - datetime.timedelta(days=30)
    clauses = [Less('latest_occurence_timestamp', limit.timetuple())]
    for error in loggederror_class.select(clause=clauses):
        loggederror_class.remove_object(error.id)
@classmethod
def register_cronjobs(cls):
    """Register the cleaning jobs: sessions and nonces every five minutes,
    after jobs, tokens, temporary files, models and thumbnails at minute 0,
    logged errors at hour 3."""
    every_five_minutes = ('clean_sessions', 'clean_nonces')
    at_minute_zero = (
        'clean_afterjobs',
        'clean_tokens',
        'clean_tempfiles',
        'clean_models',
        'clean_thumbnails',
    )
    for name in every_five_minutes:
        cls.register_cronjob(CronJob(getattr(cls, name), minutes=range(0, 60, 5), name=name))
    for name in at_minute_zero:
        cls.register_cronjob(CronJob(getattr(cls, name), minutes=[0], name=name))
    cls.register_cronjob(CronJob(cls.clean_loggederrors, hours=[3], name='clean_loggederrors'))
# set once init_publisher_class() has run
_initialized = False


@classmethod
def init_publisher_class(cls):
    """Run the one-time class initialisation: load the extra directories,
    on the first call only."""
    if not cls._initialized:
        cls._initialized = True
        cls.load_extra_dirs()
@classmethod
def create_publisher(cls, **kwargs):
    """Create and configure a publisher instance.

    The instance gets a new root directory object, an application logger,
    empty substitutions, and the class APP_DIR / DATA_DIR as app_dir /
    data_dir (the application directory is created if missing); ident
    methods are then registered and the configuration loaded.  **kwargs is
    not used.
    """
    root_directory = cls.root_directory_class()
    app_logger = logger.ApplicationLogger()
    publisher = cls(
        root_directory,
        session_cookie_name=cls.APP_NAME,
        session_cookie_path='/',
        logger=app_logger,
    )
    publisher.substitutions = Substitutions()
    publisher.app_dir, publisher.data_dir = cls.APP_DIR, cls.DATA_DIR
    app_dir_exists = os.path.exists(publisher.app_dir)
    if not app_dir_exists:
        os.mkdir(publisher.app_dir)
    publisher.register_ident_methods()
    publisher.set_config()
    return publisher
def detach(self):
    """Drop the database connection and logger references and rebuild the
    substitutions for the current request."""
    # reset structures that would otherwise be shared between threads
    self.pgconn = self._app_logger = None
    current_request = self.get_request()
    self.init_publisher_substitutions(current_request)
# directories whose python files are executed by load_extra_dirs()
extra_dirs = None


@classmethod
def register_extra_dir(cls, dir):
    """Add a directory to the list of extra directories of the class."""
    registered = cls.extra_dirs
    if not registered:
        registered = cls.extra_dirs = []
    registered.append(dir)
@classmethod
def load_extra_dirs(cls):
    """Execute every ``*.py`` file of the registered extra directories.

    Directories that do not exist are skipped; existing ones are appended
    to sys.path.  Each python file is executed as a module named after the
    file; a module that fails to load is reported on stderr and does not
    prevent the others from being loaded.
    """
    # "import importlib" alone does not guarantee that the importlib.util
    # submodule is loaded; import it explicitly.
    import importlib.util

    for extra_dir in cls.extra_dirs or []:
        if not os.path.exists(extra_dir):
            continue
        sys.path.append(extra_dir)
        for filename in os.listdir(extra_dir):
            if not filename.endswith('.py'):
                continue
            modulename = filename[:-3]
            spec = importlib.util.spec_from_file_location(modulename, os.path.join(extra_dir, filename))
            module = importlib.util.module_from_spec(spec)
            try:
                spec.loader.exec_module(module)
            except Exception as e:
                print('failed to load extra module: %s (%s)' % (modulename, e), file=sys.stderr)
# names registered with register_translation_domain()
translation_domains = None


@classmethod
def register_translation_domain(cls, name):
    """Add a name to the list of translation domains of the class."""
    domains = cls.translation_domains
    if not domains:
        domains = cls.translation_domains = []
    domains.append(name)
supported_languages = None
# configuration dictionary, loaded by reload_cfg() and saved by write_cfg()
cfg = None


def write_cfg(self):
    """Pickle ``self.cfg`` (protocol 2) and write it atomically to
    'config.pck' in the application directory."""
    serialized = pickle.dumps(self.cfg, protocol=2)
    storage.atomic_write(os.path.join(self.app_dir, 'config.pck'), serialized)
def reload_cfg(self):
    """Load ``self.cfg`` from 'config.pck' in the application directory;
    an empty dictionary is used when the file cannot be read or
    unpickled."""
    config_path = os.path.join(self.app_dir, 'config.pck')
    try:
        with open(config_path, 'rb') as fd:
            loaded = pickle.load(fd, encoding='utf-8')
    except Exception:
        loaded = {}
    self.cfg = loaded
def process(self, stdin, env):
    """Build an HTTPRequest from ``stdin`` and ``env``, process it, and
    return the response (also kept as ``self.response``)."""
    request = HTTPRequest(stdin, env)
    response = self.process_request(request)
    self.response = response
    return response
# cached application logger, see get_app_logger()
_app_logger = None
# Return the application logger, named after APP_NAME and app_dir.
#
# The cached logger is returned unless `force` is true.  A handler writing to
# stderr is added when the logger has none yet, and the level is DEBUG when
# debug_mode is set in the 'debug' configuration section, INFO otherwise.
def get_app_logger(self, force=False):
if self._app_logger and not force:
return self._app_logger
self._app_logger = logging.getLogger(self.APP_NAME + self.app_dir)
# logging.getLogger() returns the same object for a given name; do not
# stack handlers on it
if not self._app_logger.handlers:
hdlr = logging.StreamHandler() # -> sys.stderr
# do not include date/time as they will be automatically added by journald
formatter = logger.Formatter('({levelname:.1s}) {tenant} {address} {path} - {message}', style='{')
hdlr.setFormatter(formatter)
self._app_logger.addHandler(hdlr)
if self.cfg.get('debug', {}).get('debug_mode', False):
self._app_logger.setLevel(logging.DEBUG)
else:
self._app_logger.setLevel(logging.INFO)
return self._app_logger
def sitecharset2utf8(self, str):
    '''Convert a byte string in site encoding to UTF-8'''
    text = str.decode(self.site_charset)
    return text.encode('utf-8')
def utf82sitecharset(self, str):
    '''Convert a byte string in UTF-8 to site encoding'''
    text = str.decode('utf-8')
    return text.encode(self.site_charset)
def get_default_position(self):
    """Return the default map position as a 'latitude;longitude' string.

    Taken from the 'default-position' entry of the 'misc' configuration,
    else from the 'default_position' site option, else '50.84;4.36'.
    """
    configured = self.cfg.get('misc', {}).get('default-position', None)
    return configured or self.get_site_option('default_position') or '50.84;4.36'
def get_default_zoom_level(self):
    """Return the 'default-zoom-level' entry of the 'misc' configuration,
    '13' when it is not set."""
    misc_cfg = self.cfg.get('misc', {})
    return misc_cfg.get('default-zoom-level', '13')
def get_map_attributes(self):
    """Return the ``data-*`` attributes describing the map configuration:
    default position, optional maximum bounds (when the
    'map-bounds-top-left' site option is set), attribution text and tile
    URL template."""
    default_lat, default_lng = self.get_default_position().split(';')
    attrs = {'data-def-lat': default_lat, 'data-def-lng': default_lng}

    if self.get_site_option('map-bounds-top-left'):
        lat1, lng1 = self.get_site_option('map-bounds-top-left').split(';')
        attrs['data-max-bounds-lat1'] = lat1
        attrs['data-max-bounds-lng1'] = lng1
        lat2, lng2 = self.get_site_option('map-bounds-bottom-right').split(';')
        attrs['data-max-bounds-lat2'] = lat2
        attrs['data-max-bounds-lng2'] = lng2

    attribution = self.get_site_option('map-attribution') or _(
        "Map data &copy; "
        "<a href='https://openstreetmap.org'>OpenStreetMap</a> contributors, "
        "<a href='http://creativecommons.org/licenses/by-sa/2.0/'>CC-BY-SA</a>"
    )
    attrs['data-map-attribution'] = attribution

    tile_urltemplate = self.get_site_option('map-tile-urltemplate')
    if not tile_urltemplate:
        tile_urltemplate = 'https://tiles.entrouvert.org/hdm/{z}/{x}/{y}.png'
    attrs['data-tile-urltemplate'] = tile_urltemplate
    return attrs
def get_reverse_geocoding_service_url(self):
    """Return the URL of the reverse geocoding service.

    The 'reverse_geocoding_service_url' site option is used as is when set.
    Otherwise the URL is built from the nominatim options: base URL
    ('nominatim_url'), zoom level ('nominatim_reverse_zoom_level', 18 by
    default) and optional key ('nominatim_key').
    """
    url = self.get_site_option('reverse_geocoding_service_url')
    if url:
        return url
    base_url = self.get_site_option('nominatim_url') or 'https://nominatim.entrouvert.org'
    reverse_zoom_level = self.get_site_option('nominatim_reverse_zoom_level') or 18
    url = '%s/reverse?zoom=%s' % (base_url, reverse_zoom_level)
    key = self.get_site_option('nominatim_key')
    if key:
        url += '&key=%s' % key
    return url
def get_geocoding_service_url(self):
    """Return the URL of the geocoding (search) service.

    The 'geocoding_service_url' site option is used as is when set.
    Otherwise the URL is built from the nominatim options ('nominatim_url',
    optional 'nominatim_key'), restricted to the map bounds when the
    'map-bounds-top-left' site option is set.
    """
    url = self.get_site_option('geocoding_service_url')
    if url:
        return url
    url = (self.get_site_option('nominatim_url') or 'https://nominatim.entrouvert.org') + '/search'
    key = self.get_site_option('nominatim_key')
    if key:
        url += '?key=%s' % key
    if self.get_site_option('map-bounds-top-left'):
        separator = '&' if '?' in url else '?'
        top, left = self.get_site_option('map-bounds-top-left').split(';')
        bottom, right = self.get_site_option('map-bounds-bottom-right').split(';')
        viewbox = 'viewbox=%s,%s,%s,%s&bounded=1' % (left, top, right, bottom)
        url = url + separator + viewbox
    return url
def get_working_day_calendar(self):
    """Return the 'working_day_calendar' site option, falling back to
    settings.WORKING_DAY_CALENDAR when it is not set."""
    calendar = self.get_site_option('working_day_calendar')
    if calendar:
        return calendar
    return settings.WORKING_DAY_CALENDAR
def get_supported_authentication_contexts(self):
    """Return an ordered mapping of the authentication contexts listed in
    the 'auth-contexts' site option (comma-separated) to their labels.

    A context without known label raises KeyError.
    """
    labels = {
        'fedict': _('Belgian eID'),
        'franceconnect': _('FranceConnect'),
    }
    contexts = collections.OrderedDict()
    if self.get_site_option('auth-contexts'):
        names = (name.strip() for name in self.get_site_option('auth-contexts').split(','))
        for name in names:
            contexts[name] = labels[name]
    return contexts
def get_authentication_saml_contexts(self, context):
    """Return the list of SAML authentication context class references for
    an authentication context ('fedict' or 'franceconnect'); any other
    value raises KeyError."""
    saml_contexts = {
        'fedict': [
            # custom context, provided by authentic fedict plugin:
            'urn:oasis:names:tc:SAML:2.0:ac:classes:SmartcardPKI',
            # native fedict contexts:
            'urn:be:fedict:iam:fas:citizen:eid',
            'urn:be:fedict:iam:fas:citizen:token',
            'urn:be:fedict:iam:fas:enterprise:eid',
            'urn:be:fedict:iam:fas:enterprise:token',
        ],
        'franceconnect': [
            'urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport',
        ],
    }
    return saml_contexts[context]
def get_lazy_variables_modes(self):
    """Return the list of modes in which lazy variables are used.

    Possible modes:
     * django-condition: used to evaluate django conditions
     * python-condition: used to evaluate python conditions
     * lazy: used to force lazy mode in tests and in context processor

    Taken from the comma-separated 'lazy-variables-modes' site option,
    ['lazy', 'django-condition'] when it is not set.
    """
    modes = self.get_site_option('lazy-variables-modes')
    if not modes:
        return ['lazy', 'django-condition']
    return [mode.strip() for mode in modes.split(',')]
def get_email_well_known_domains(self):
    """Return the 'well_known_domains' entry of the 'emails' configuration,
    or a built-in list of domains when it is not set."""
    configured = get_cfg('emails', {}).get('well_known_domains')
    if configured:
        return configured
    return [
        'gmail.com',
        'msn.com',
        'hotmail.com',
        'hotmail.fr',
        'wanadoo.fr',
        'free.fr',
        'yahoo.fr',
        'numericable.fr',
        'laposte.net',
        'orange.fr',
        'yahoo.com',
    ]
def get_email_valid_known_domains(self):
    """Return the 'valid_known_domains' entry of the 'emails'
    configuration, or ['yopmail.com', 'laposte.fr'] when it is not set."""
    configured = get_cfg('emails', {}).get('valid_known_domains')
    if configured:
        return configured
    return ['yopmail.com', 'laposte.fr']
def get_substitution_variables(self):
    """Return the substitution variables provided by the publisher.

    These are the site-level variables (name, theme, URLs, language,
    dates, constants) completed by the content of the [variables] section
    of the site options, where values spelled like booleans are wrapped in
    SiteOptionsBoolean.  'manager_homepage_url' and 'manager_homepage_title'
    mirror 'portal_agent_url' and 'portal_agent_title'.
    """
    from wcs.variables import LazyDateObject

    d = {
        'site_name': get_cfg('misc', {}).get('sitename', None),
        'site_theme': get_cfg('branding', {}).get('theme', self.default_theme),
        'site_url': self.get_frontoffice_url(),
        'site_url_backoffice': self.get_backoffice_url(),
        'site_lang': (get_request() and hasattr(get_request(), 'language') and get_request().language)
        or 'en',
        'today': LazyDateObject(datetime.date.today),
        'now': LazyDateObject(datetime.datetime.now),
        'is_in_backoffice': (self.get_request() and self.get_request().is_in_backoffice()),
        'null': None,
        'true': True,
        'false': False,
    }

    if self.site_options is None:
        self.load_site_options()
    try:
        site_options_vars = dict(self.site_options.items('variables', raw=True))
    except configparser.NoSectionError:
        site_options_vars = {}

    boolean_strings = SiteOptionsBoolean.true_strings + SiteOptionsBoolean.false_strings
    for name, raw_value in site_options_vars.items():
        if raw_value.lower() in boolean_strings:
            d[name] = SiteOptionsBoolean(raw_value)
        else:
            d[name] = raw_value

    d['manager_homepage_url'] = d.get('portal_agent_url')
    d['manager_homepage_title'] = d.get('portal_agent_title')
    return d
def is_relatable_url(self, url):
    """Tell whether ``url`` points to a host the site relates to.

    This is the case for URLs without network location, URLs on the
    frontoffice or backoffice host, on a host listed in the
    'relatable-hosts' site option (comma-separated), or on a host that has
    an entry in the [api-secrets] section of the site options.
    """
    netloc = urllib.parse.urlparse(url).netloc
    if not netloc:
        return True
    if netloc == urllib.parse.urlparse(self.get_frontoffice_url()).netloc:
        return True
    if netloc == urllib.parse.urlparse(self.get_backoffice_url()).netloc:
        return True
    relatable_hosts = [host.strip() for host in self.get_site_option('relatable-hosts').split(',')]
    if netloc in relatable_hosts:
        return True
    try:
        return netloc in self.site_options.options('api-secrets')
    except configparser.NoSectionError:
        return False
# callables registered with register_extra_source()
extra_sources = []


@classmethod
def register_extra_source(cls, source):
    """Register a new static source of substitution variables for this
    publisher class.

    source must be a callable taking two arguments, a publisher and a
    request, and returning an object with a get_substitution_variables()
    method.
    """
    sources = cls.extra_sources
    if not sources:
        sources = cls.extra_sources = []
    sources.append(source)
def set_tenant(self, tenant, **kwargs):
    """Make ``tenant`` the current tenant: its directory becomes the
    application directory and the configuration is reloaded (``kwargs`` are
    passed to set_config())."""
    self.tenant, self.app_dir = tenant, tenant.directory
    self.set_config(**kwargs)
def set_tenant_by_hostname(self, hostname, **kwargs):
    """Select the tenant whose directory is named after ``hostname``.

    The directory is looked up in APP_DIR/tenants first, then in APP_DIR.

    Args:
        hostname: name of the tenant directory.
        **kwargs: passed to set_tenant().

    Raises:
        UnknownTenantError: no such directory exists, or ``hostname``
            cannot be a tenant directory name (empty, starting with a dot,
            or containing a path separator).
    """
    # The hostname comes from the request: never let it designate the base
    # directories themselves ('' or '.') or escape from them ('..', '/').
    unusable = (
        not hostname
        or hostname.startswith('.')
        or os.sep in hostname
        or (os.altsep is not None and os.altsep in hostname)
    )
    if unusable:
        raise UnknownTenantError(hostname)
    for base_dir in (os.path.join(self.APP_DIR, 'tenants'), self.APP_DIR):
        tenant_dir = os.path.join(base_dir, hostname)
        if os.path.exists(tenant_dir):
            self.set_tenant(Tenant(tenant_dir), **kwargs)
            return
    raise UnknownTenantError(hostname)
@classmethod
def get_tenants(cls):
    """Yield a Tenant for every tenant directory found in APP_DIR/tenants
    and APP_DIR, in sorted order within each of them.

    Entries are skipped when their name starts with '.' or '_', is one of
    the reserved names, ends with '.invalid', or when they are not a
    writable directory.  A name is only yielded once.
    """
    reserved_names = ('collectstatic', 'scripts', 'skeletons', 'spooler', 'tenants')
    seen = set()
    for base_dir in (os.path.join(cls.APP_DIR, 'tenants'), cls.APP_DIR):
        if not os.path.exists(base_dir):
            continue
        for tenant in sorted(os.listdir(base_dir)):
            if tenant[0] in ('.', '_') or tenant in reserved_names or tenant.endswith('.invalid'):
                continue
            tenant_dir = os.path.join(base_dir, tenant)
            if not (os.path.isdir(tenant_dir) and os.access(tenant_dir, os.W_OK)):
                continue
            if tenant in seen:
                # avoid going twice over same tenants, in case of a tenants/ symlink to
                # /var/lib/wcs/.
                continue
            seen.add(tenant)
            yield Tenant(tenant_dir)
def get_cfg(key, default=None):
    """Return the ``key`` entry of the current publisher's configuration
    (``default`` when the key is missing); an empty dictionary is returned
    in place of any false value."""
    value = get_publisher().cfg.get(key, default)
    return value if value else {}
def get_logger():
    """Return the application logger of the current publisher."""
    publisher = get_publisher()
    return publisher.get_app_logger()
def set_publisher_class(klass):
    """Record ``klass`` as the publisher class, in the namespace of the
    builtins module (key '__publisher_class')."""
    vars(builtins)['__publisher_class'] = klass
def get_publisher_class():
    """Return the publisher class recorded in the namespace of the builtins
    module (key '__publisher_class'), None if there is none."""
    return vars(builtins).get('__publisher_class')
# Declare, in the 'General' category, substitution variables whose values are
# produced by QommonPublisher.get_substitution_variables().
Substitutions.register('site_name', category=_('General'), comment=_('Site Name'))
Substitutions.register('site_theme', category=_('General'), comment=_('Current Theme Name'))
Substitutions.register('site_url', category=_('General'), comment=_('Site URL'))
Substitutions.register('site_url_backoffice', category=_('General'), comment=_('Site URL (backoffice)'))
Substitutions.register('today', category=_('General'), comment=_('Current Date'))
Substitutions.register('now', category=_('General'), comment=_('Current Date & Time'))