163 lines
5.3 KiB
Python
163 lines
5.3 KiB
Python
from Defaults import *
|
|
|
|
import lasso
|
|
if not hasattr(lasso, 'SAML2_SUPPORT'):
|
|
lasso.SAML2_SUPPORT = False
|
|
|
|
try:
|
|
from authentic_cfg import *
|
|
except ImportError:
|
|
pass
|
|
|
|
import __builtin__
|
|
import os
|
|
import sys
|
|
|
|
from quixote import enable_ptl
|
|
from quixote.publish import Publisher, get_response
|
|
import quixote.util
|
|
enable_ptl()
|
|
|
|
import gettext, locale
|
|
|
|
translations = {
|
|
'en': gettext.NullTranslations(),
|
|
}
|
|
for lang in ['fr']:
|
|
try:
|
|
translations[lang] = gettext.translation('authentic', languages=[lang])
|
|
except IOError:
|
|
pass
|
|
__builtin__.__dict__['N_'] = lambda x: x
|
|
|
|
|
|
import errors
|
|
import logger
|
|
import sessions
|
|
import misc
|
|
import identities
|
|
|
|
from root import RootDirectory
|
|
|
|
class AuthenticPublisher(Publisher):
|
|
def format_publish_error(self, exc):
|
|
get_response().filter = {}
|
|
if isinstance(exc, errors.AccessError) and hasattr(exc, 'render'):
|
|
return exc.render()
|
|
return errors.format_publish_error(exc)
|
|
|
|
def finish_failed_request(self):
|
|
result = Publisher.finish_failed_request(self)
|
|
(exc_type, exc_value, tb) = sys.exc_info()
|
|
if exc_type is NotImplementedError:
|
|
get_response().set_header('Content-Type', 'text/html') # set back content-type
|
|
return template.error_page(
|
|
_('This feature is not yet implemented.'),
|
|
error_title = _('Sorry'))
|
|
try:
|
|
logger.error('internal server error')
|
|
except:
|
|
pass
|
|
if not self.config.display_exceptions == 'text-in-html':
|
|
if not self.config.display_exceptions:
|
|
get_response().filter = {}
|
|
return errors.InternalServerError().render()
|
|
return result
|
|
# result is here the text output
|
|
get_response().set_header('Content-Type', 'text/html') # set back content-type
|
|
return template.error_page(
|
|
_('The server encountered an internal error and was unable to complete your request.'),
|
|
error_title = _('Internal Server Error'),
|
|
exception = result)
|
|
|
|
def set_config(self):
|
|
misc.reload_cfg()
|
|
debug_cfg = misc.cfg.get('debug', {})
|
|
self.logger.error_email = debug_cfg.get('error_email')
|
|
self.config.display_exceptions = debug_cfg.get('display_exceptions')
|
|
self.config.form_tokens = True
|
|
if debug_cfg.get('logger', True):
|
|
logger.enable()
|
|
else:
|
|
logger.disable()
|
|
|
|
def filter_output(self, request, output):
|
|
response = get_response()
|
|
if response.content_type != 'text/html':
|
|
return output
|
|
if not hasattr(response, 'filter') or not response.filter:
|
|
return output
|
|
return template.decorate(output, response)
|
|
|
|
def install_lang(self, lang = None):
|
|
if lang is None:
|
|
gettext.install('authentic') # will use environment variable to get language
|
|
else:
|
|
translations[lang].install()
|
|
_1 = _
|
|
__builtin__.__dict__['_'] = lambda x: unicode(_1(str(x)), 'utf-8').encode('iso-8859-1')
|
|
__builtin__.__dict__['_1'] = _1
|
|
|
|
def try_publish(self, request):
|
|
lang = misc.cfg.get('language', {}).get('language', None)
|
|
get_response().breadcrumb = []
|
|
self.install_lang(lang)
|
|
return Publisher.try_publish(self, request)
|
|
|
|
|
|
extra_dirs = None
|
|
def register_extra_dir(cls, dir):
|
|
if not cls.extra_dirs:
|
|
cls.extra_dirs = []
|
|
cls.extra_dirs.append(dir)
|
|
register_extra_dir = classmethod(register_extra_dir)
|
|
|
|
def load_extra_dirs(cls):
|
|
for extra_dir in cls.extra_dirs or []:
|
|
if not os.path.exists(extra_dir):
|
|
continue
|
|
sys.path.append(extra_dir)
|
|
for filename in os.listdir(extra_dir):
|
|
if not filename.endswith('.py'):
|
|
continue
|
|
modulename = filename[:-3]
|
|
fp, pathname, description = imp.find_module(modulename, [extra_dir])
|
|
try:
|
|
imp.load_module(modulename, fp, pathname, description)
|
|
except:
|
|
raise
|
|
logger.warn('failed to load extra module: %s' % modulename)
|
|
if fp:
|
|
fp.close()
|
|
load_extra_dirs = classmethod(load_extra_dirs)
|
|
|
|
|
|
class AuthenticVHostPublisher(AuthenticPublisher):
|
|
def try_publish(self, request):
|
|
self.app_dir = os.path.join(APP_DIR, request.get_server().split(':')[0])
|
|
if not os.path.exists(self.app_dir):
|
|
os.mkdir(self.app_dir)
|
|
self.set_config()
|
|
identities.load_store()
|
|
return AuthenticPublisher.try_publish(self, request)
|
|
|
|
|
|
def create_publisher(publisher_class = AuthenticPublisher):
|
|
publisher_class.load_extra_dirs()
|
|
publisher = publisher_class(RootDirectory(),
|
|
session_cookie_name = 'authentic',
|
|
session_cookie_path = '/',
|
|
error_log = ERROR_LOG)
|
|
publisher.app_dir = APP_DIR
|
|
if not os.path.exists(publisher.app_dir):
|
|
os.mkdir(publisher.app_dir)
|
|
publisher.data_dir = DATA_DIR
|
|
publisher.set_session_manager(sessions.StorageSessionManager())
|
|
publisher.set_config()
|
|
identities.load_store()
|
|
return publisher
|
|
|
|
def create_vhost_publisher():
|
|
return create_publisher(AuthenticVHostPublisher)
|
|
|