878 lines
32 KiB
Python
878 lines
32 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2010 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import cPickle
|
|
try:
|
|
from hashlib import md5 as md5_new
|
|
except:
|
|
from md5 import new as md5_new
|
|
|
|
import ConfigParser
|
|
import imp
|
|
import os
|
|
import fcntl
|
|
import socket
|
|
import sys
|
|
import time
|
|
import traceback
|
|
import __builtin__
|
|
__builtin__.__dict__['_'] = lambda x: x
|
|
__builtin__.__dict__['N_'] = lambda x: x
|
|
|
|
import gettext
|
|
import locale
|
|
import linecache
|
|
import formatter
|
|
from StringIO import StringIO
|
|
|
|
try:
|
|
import elementtree.ElementTree as ET
|
|
except ImportError:
|
|
try:
|
|
import xml.etree.ElementTree as ET
|
|
except ImportError:
|
|
ET = None
|
|
|
|
from quixote.publish import Publisher, get_request, get_response, get_publisher, redirect
|
|
from http_request import HTTPRequest
|
|
from http_response import HTTPResponse, AfterJob
|
|
|
|
from cron import CronJob
|
|
from substitution import Substitutions
|
|
|
|
import errors
|
|
import template
|
|
import logging
|
|
import logging.handlers
|
|
import logger
|
|
import storage
|
|
import strftime
|
|
import urllib
|
|
|
|
class ImmediateRedirectException(Exception):
    '''Raised to stop the current processing and redirect the client.

    The target URL is kept in the ``location`` attribute; it is also passed
    to the base class so that it shows up in str(exc) and in tracebacks.
    '''

    def __init__(self, location):
        Exception.__init__(self, location)
        self.location = location
|
|
|
|
class QommonPublisher(Publisher):
    # Publisher shared by the applications built on qommon.  The attributes
    # below are class-level defaults; most of them are None here and are
    # presumably set by the concrete application subclass (TODO confirm).

    # name of the application; used for the gettext domain, the session
    # cookie name, the statsd prefix and the log file name
    APP_NAME = None
    # directory under which one application directory per site is created
    APP_DIR = None
    # directory holding shared data (default configurations, vendor files)
    DATA_DIR = None
    # passed as error_log when the publisher is instantiated
    ERROR_LOG = None
    # when true, _generate_plaintext_error() produces the detailed trace
    # (source context and local variables) instead of the base class one
    USE_LONG_TRACES = True

    # NOTE(review): not used in this file; presumably URLs for the help
    # links and the news feed of the admin/backoffice pages
    admin_help_url = None
    backoffice_help_url = None
    backoffice_feed_url = None

    # root_directory_class is instantiated in create_publisher(); the two
    # others are not used in this file
    root_directory_class = None
    backoffice_directory_class = None
    admin_directory_class = None

    # session_manager_class is instantiated in create_publisher() and in
    # clean_sessions(); user_class and unpickler_class are not used here
    session_manager_class = None
    user_class = None
    unpickler_class = None

    # NOTE(review): not used in this file; names suggest theming and
    # post-login settings -- verify against callers
    after_login_url = ''
    qommon_static_dir = 'qo/'
    qommon_admin_css = 'css/dc2/admin.css'
    # fallback theme name for the 'site_theme' substitution variable
    default_theme = 'default'

    # ConfigParser object loaded lazily from site-options.cfg
    site_options = None
    # charset used to encode translated strings and imported settings
    site_charset = 'iso-8859-1'
    # XML settings file imported into a newly created application directory
    default_configuration_path = None
    # whether set_app_dir() may create a missing application directory
    auto_create_appdir = True
    # where to redirect when the directory is missing and cannot be created
    missing_appdir_redirect = None
    # NOTE(review): not used in this file
    use_sms_feature = True
    app_translations = dict()
    # statsd client, created in try_publish()
    statsd = None
|
|
|
|
def get_root_url(self):
    '''Return the root URL of the application, ending with a slash.

    It is the SCRIPT_NAME of the current request followed by '/'; when
    there is no current request, '/' is returned.
    '''
    request = self.get_request()
    if not request:
        return '/'
    return request.environ['SCRIPT_NAME'] + '/'
|
|
|
|
def get_application_static_files_root_url(self):
    '''Return the URL under which the application static files are served.

    By default this is the application root URL; the method exists so that
    static files can be hosted under some other path, or even on a totally
    different hostname.
    '''
    static_root = self.get_root_url()
    return static_root
|
|
|
|
def get_frontoffice_url(self):
    '''Return the public (frontoffice) URL of the site.

    The 'frontoffice-url' setting of the 'misc' configuration section is
    used when it is set.  Otherwise the URL is built from the current
    request (scheme, server and quoted SCRIPT_NAME) or, when there is no
    request, from the name of the application directory.
    '''
    frontoffice_url = get_cfg('misc', {}).get('frontoffice-url', None)
    if frontoffice_url:
        return frontoffice_url
    req = self.get_request()
    if req:
        # a missing SCRIPT_NAME is treated as an empty one, urllib.quote()
        # does not accept None
        script_name = req.environ.get('SCRIPT_NAME') or ''
        return '%s://%s%s' % (req.get_scheme(), req.get_server(),
                urllib.quote(script_name))
    return 'http://%s' % os.path.basename(self.app_dir)
|
|
|
|
def get_backoffice_url(self):
    '''Return the URL of the backoffice of the site.

    The 'backoffice-url' setting of the 'misc' configuration section is
    used when it is set.  Otherwise the URL is built from the current
    request (scheme, server and quoted SCRIPT_NAME) or, when there is no
    request, from the name of the application directory; '/backoffice' is
    appended in both cases.
    '''
    backoffice_url = get_cfg('misc', {}).get('backoffice-url', None)
    if backoffice_url:
        return backoffice_url
    req = self.get_request()
    if req:
        # a missing SCRIPT_NAME is treated as an empty one, urllib.quote()
        # does not accept None
        script_name = req.environ.get('SCRIPT_NAME') or ''
        return '%s://%s%s/backoffice' % (req.get_scheme(), req.get_server(),
                urllib.quote(script_name))
    return 'http://%s/backoffice' % os.path.basename(self.app_dir)
|
|
|
|
def get_global_eval_dict(self):
    '''Return a dictionary with the names 'datetime' (the module) and
    'Decimal' (the class).'''
    import datetime
    from decimal import Decimal
    eval_globals = {}
    eval_globals['datetime'] = datetime
    eval_globals['Decimal'] = Decimal
    return eval_globals
|
|
|
|
def format_publish_error(self, exc):
    '''Return the page for a publishing error.

    The response filter is reset first.  Access errors that have a
    render() method render themselves, any other error goes through
    errors.format_publish_error().
    '''
    get_response().filter = {}
    renders_itself = isinstance(exc, errors.AccessError) and hasattr(exc, 'render')
    if not renders_itself:
        return errors.format_publish_error(exc)
    return exc.render()
|
|
|
|
def finish_interrupted_request(self, exc):
    '''Build the response for a request interrupted by a publish error.

    This is the same as in the base class, but with our own HTTPResponse
    class.
    '''
    hide_details = not self.config.display_exceptions
    if hide_details and exc.private_msg:
        # the private message must not leak to the client
        exc.private_msg = None
    get_request().response = HTTPResponse(status=exc.status_code)
    page = self.format_publish_error(exc)
    self.session_manager.finish_successful_request()
    return page
|
|
|
|
def _generate_plaintext_error(self, request, original_response,
|
|
exc_type, exc_value, tb, limit = None):
|
|
debug_cfg = self.cfg.get('debug', {})
|
|
if not self.USE_LONG_TRACES:
|
|
if not request:
|
|
# this happens when an exception is raised by an afterjob
|
|
request = HTTPRequest(None, {})
|
|
if not request.form:
|
|
request.form = {}
|
|
return Publisher._generate_plaintext_error(self, request,
|
|
original_response, exc_type, exc_value, tb)
|
|
|
|
error_file = StringIO()
|
|
|
|
if limit is None:
|
|
if hasattr(sys, 'tracebacklimit'):
|
|
limit = sys.tracebacklimit
|
|
print >>error_file, "Exception:"
|
|
print >>error_file, " type = '%s', value = '%s'" % (exc_type, exc_value)
|
|
print >>error_file
|
|
|
|
# format the traceback
|
|
print >>error_file, 'Stack trace (most recent call first):'
|
|
while tb.tb_next:
|
|
tb = tb.tb_next
|
|
frame = tb.tb_frame
|
|
n = 0
|
|
while frame and (limit is None or n < limit):
|
|
function = frame.f_code.co_name
|
|
filename = frame.f_code.co_filename
|
|
exclineno = frame.f_lineno
|
|
locals = frame.f_locals.items()
|
|
|
|
print >>error_file, ' File "%s", line %s, in %s' % (filename, exclineno, function)
|
|
linecache.checkcache(filename)
|
|
for lineno in range(exclineno-2,exclineno+3):
|
|
line = linecache.getline(filename, lineno, frame.f_globals)
|
|
if line:
|
|
if lineno == exclineno:
|
|
print >>error_file, '>%5s %s' % (lineno, line.rstrip())
|
|
else:
|
|
print >>error_file, ' %5s %s' % (lineno, line.rstrip())
|
|
print >>error_file
|
|
if locals:
|
|
print >>error_file, " locals: "
|
|
for key, value in locals:
|
|
print >>error_file, " %s =" % key,
|
|
try:
|
|
print >>error_file, repr(value),
|
|
except:
|
|
print >>error_file, "<ERROR WHILE PRINTING VALUE>",
|
|
print >>error_file
|
|
print >>error_file
|
|
frame = frame.f_back
|
|
n = n + 1
|
|
|
|
# include request and response dumps
|
|
if request:
|
|
error_file.write('\n')
|
|
error_file.write(request.dump())
|
|
error_file.write('\n')
|
|
|
|
return error_file.getvalue()
|
|
|
|
def notify_of_exception(self, exc_tuple, context=None):
    '''Log an exception as an internal error.

    exc_tuple is a (type, value, traceback) tuple; context, if given, is
    put in front of the error summary.  Failures to send or write the
    report (socket.error, OSError) are ignored.
    '''
    exc_type, exc_value, tb = exc_tuple
    summary_lines = traceback.format_exception_only(exc_type, exc_value)
    # keep the first line only, without its newline
    error_summary = summary_lines[0][:-1]
    if context:
        error_summary = '%s %s' % (context, error_summary)

    plain_error_msg = self._generate_plaintext_error(
            get_request(), self, exc_type, exc_value, tb)

    try:
        self.logger.log_internal_error(error_summary, plain_error_msg)
    except (socket.error, OSError):
        # socket.error will happen if there is no mail server available
        # and exceptions were configured to be mailed; OSError could
        # happen on file descriptor exhaustion.
        pass
|
|
|
|
def finish_successful_request(self):
    '''Finish the request as the base class does, then count it in statsd.'''
    Publisher.finish_successful_request(self)
    if self.statsd:
        # statsd is only created in try_publish() after the application
        # directory has been set, it is still None for early redirects
        self.statsd.increment('successful-request')
|
|
|
|
def finish_failed_request(self):
    '''Build the response for a request that failed with an exception.

    This duplicates a lot from the parent class, just to use our own
    HTTPResponse.  NotImplementedError gets a dedicated "not yet
    implemented" page; any other exception is logged and the page returned
    depends on the display_exceptions setting (generic error page, cgitb
    HTML, plain text trace in an HTML page, or plain text).
    '''
    request = get_request()
    original_response = request.response
    request.response = HTTPResponse()

    if self.statsd: # maybe unset if very early failure
        self.statsd.increment('failed-request')

    (exc_type, exc_value, tb) = sys.exc_info()

    if exc_type is NotImplementedError:
        get_response().set_header('Content-Type', 'text/html') # set back content-type
        return template.error_page(
                _('This feature is not yet implemented.'),
                error_title = _('Sorry'))

    error_summary = traceback.format_exception_only(exc_type, exc_value)
    error_summary = error_summary[0][0:-1] # de-listify and strip newline

    plain_error_msg = self._generate_plaintext_error(request,
            original_response, exc_type, exc_value, tb)

    request.response.set_header('Content-Type', 'text/html')
    if not self.config.display_exceptions:
        # DISPLAY_EXCEPTIONS is false, so return the most
        # secure (and cryptic) page.
        error_page = self._generate_internal_error(request)
    elif self.config.display_exceptions == 'html':
        # Generate a spiffy HTML display using cgitb
        error_page = self._generate_cgitb_error(request,
                original_response, exc_type, exc_value, tb)
    elif self.config.display_exceptions == 'text-in-html':
        error_page = template.error_page(
                _('The server encountered an internal error and was unable to complete your request.'),
                error_title = _('Internal Server Error'),
                exception = plain_error_msg)
    else:
        # Generate a plaintext page containing the traceback
        request.response.set_header('Content-Type', 'text/plain')
        error_page = plain_error_msg

    try:
        self.logger.log_internal_error(error_summary, plain_error_msg)
    except (socket.error, OSError):
        # socket.error will happen if there is no mail server available
        # and exceptions were configured to be mailed; OSError could
        # happen on file descriptor exhaustion (same handling as in
        # notify_of_exception).
        pass

    try:
        self.get_app_logger().error('internal server error')
    except Exception:
        # logging is best effort, it must not hide the error page
        pass

    if exc_type is SystemExit:
        # NOTE(review): under Python 2 a bare raise re-raises the most
        # recently handled exception, which is not SystemExit if one of
        # the logging calls above failed -- confirm this is acceptable
        raise

    request.response.set_status(500)
    self.session_manager.finish_failed_request()

    return error_page
|
|
|
|
def filter_output(self, request, output):
    '''Return the content to send for the current response.

    Nothing is sent with a 304 status.  HTML output is decorated with
    template.decorate() when the response has a non-empty filter; any
    other output is returned unchanged.
    '''
    response = get_response()
    if response.status_code == 304:
        # clients don't like to receive content with a 304
        return ''
    if response.content_type != 'text/html' or not getattr(response, 'filter', None):
        return output
    return template.decorate(output, response)
|
|
|
|
def get_translation(self, lang):
    '''Retrieve a translation object for the given language and
       the current theme site.

    When the current theme (the 'theme' setting of the 'branding' section,
    'default' if unset) has a <lang>.mo catalog in the application
    directory, that catalog is returned with the application translation
    for the language as fallback; otherwise the application translation
    itself is returned.

    Raises KeyError if the language is not in self.translations.
    '''
    current_theme = self.cfg.get('branding', {}).get('theme', 'default')
    app_translation_path = os.path.join(self.app_dir, 'themes',
            current_theme, '%s.mo' % lang)
    if not os.path.exists(app_translation_path):
        return self.translations[lang]

    # .mo catalogs are binary files; the catalog is parsed when the
    # GNUTranslations object is created so the file can be closed right away
    fd = open(app_translation_path, 'rb')
    try:
        trans = gettext.GNUTranslations(fd)
    finally:
        fd.close()
    trans.add_fallback(self.translations[lang])
    return trans
|
|
|
|
def install_lang(self, lang = None):
    '''Install the translation function for lang as the builtin _().

    When lang is None or not one of the loaded translations, gettext is
    installed for the application domain.  The installed _() is then
    wrapped so that it returns strings encoded in the site charset (the
    result of the original function is decoded as UTF-8); the original
    function stays available as the builtin _1().
    '''
    if lang is not None and lang in self.translations:
        self.get_translation(lang).install()
    else:
        gettext.install(self.APP_NAME) # will use environment variable to get language
    _1 = _
    def translate_to_site_charset(x):
        return unicode(_1(str(x)), 'utf-8').encode(self.site_charset)
    __builtin__.__dict__['_'] = translate_to_site_charset
    __builtin__.__dict__['_1'] = _1
|
|
|
|
def load_site_options(self):
    '''Load the site-options.cfg file of the application directory.

    self.site_options is always set to a ConfigParser object; it stays
    empty when the file does not exist.  A failure to read the file is
    logged and otherwise ignored.
    '''
    self.site_options = ConfigParser.ConfigParser()
    site_options_filename = os.path.join(self.app_dir, 'site-options.cfg')
    if os.path.exists(site_options_filename):
        try:
            self.site_options.read(site_options_filename)
        except:
            self.get_app_logger().error('failed to read site options file')
|
|
|
|
def has_site_option(self, option):
    '''Return True if the option is set to 'true' in the [options]
    section of the site options, False in any other case (including a
    missing section or option).'''
    if self.site_options is None:
        self.load_site_options()
    try:
        value = self.site_options.get('options', option)
    except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
        return False
    return (value == 'true')
|
|
|
|
def get_site_option(self, option):
    '''Return the value of the option in the [options] section of the
    site options, or None if the section or the option is missing.'''
    if self.site_options is None:
        self.load_site_options()
    try:
        value = self.site_options.get('options', option)
    except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
        value = None
    return value
|
|
|
|
def set_config(self, request = None):
    '''Reload the configuration and set up what depends on it.

    This sets the error email and the display_exceptions setting from the
    'debug' section, enables form tokens, computes the session cookie
    name, path and (when a request is given and the host name allows it)
    domain, and installs either the real application logger or a logger
    that ignores everything, depending on the 'logger' debug setting.
    '''
    self.reload_cfg()
    self.site_options = None # reset at the beginning of a request
    debug_cfg = self.cfg.get('debug', {})
    self.logger.error_email = debug_cfg.get('error_email')
    self.config.display_exceptions = debug_cfg.get('display_exceptions')
    self.config.form_tokens = True

    if request:
        canonical_hostname = request.get_server(clean = False).lower().split(':')[0].rstrip('.')
        if canonical_hostname.count('.') >= 2 and self.etld:
            try:
                socket.inet_aton(canonical_hostname)
            except socket.error:
                # not an IP address
                try:
                    base_name = self.etld.parse(canonical_hostname)[1]
                except:
                    pass
                else:
                    # keep as many trailing labels as the parsed base
                    # name has, plus one
                    labels = canonical_hostname.split('.')
                    kept = labels[-2 - base_name.count('.'):]
                    self.config.session_cookie_domain = '.'.join(kept)

    digest = md5_new(self.app_dir).hexdigest()
    self.config.session_cookie_name = self.APP_NAME + '-' + digest[:6]
    self.config.session_cookie_path = '/'

    if self.cfg.get('debug', {}).get('logger', False):
        self._app_logger = self.get_app_logger(force = True)
    else:
        class NullLogger(object):
            def error(*args):
                pass
            def warn(*args):
                pass
            def info(*args):
                pass
            def debug(*args):
                pass
        self._app_logger = NullLogger()
|
|
|
|
def set_app_dir(self, request):
    '''
    Set the application directory, creating it if possible and authorized.

    The directory is located under APP_DIR and named after the lowercased
    host name of the request (without port, trailing dots and 'iframe.' or
    'iframe-' prefix), followed by '+' and the script name (without its
    'iframe.' prefix and with slashes replaced by '+') when there is one.

    When the directory is missing and auto_create_appdir is false,
    ImmediateRedirectException is raised if missing_appdir_redirect is
    set, errors.TraversalError otherwise.
    '''
    # NOTE(review): the host name comes from the request and is used as a
    # directory name without further checks -- confirm it is validated
    # upstream
    hostname = request.get_server(clean = False).lower().split(':')[0].rstrip('.')
    for prefix in ('iframe.', 'iframe-'):
        if hostname.startswith(prefix):
            hostname = hostname[len(prefix):]
            break
    script_name = request.get_header('SCRIPT_NAME', '').strip('/')

    app_dir = os.path.join(self.APP_DIR, hostname)
    self.app_dir = app_dir
    if script_name:
        if script_name.startswith('iframe.'):
            script_name = script_name[7:]
        app_dir = app_dir + '+' + script_name.replace('/', '+')
        self.app_dir = app_dir

    if os.path.exists(app_dir):
        return

    if not self.auto_create_appdir:
        if self.missing_appdir_redirect:
            raise ImmediateRedirectException(self.missing_appdir_redirect)
        raise errors.TraversalError()

    try:
        os.makedirs(app_dir)
    except OSError:
        # creation failures are ignored here
        pass
|
|
|
|
def initialize_app_dir(self):
    '''If empty initialize the application directory with default
       configuration. Returns True if initialization has been done.

    The default configuration is read from default_configuration_path,
    taken relative to DATA_DIR when it is not absolute.  Nothing is
    written, and False is returned, when that file is not a settings file
    (import_cfg() returned None): storing None as configuration would
    break every later access to self.cfg.
    '''
    if not self.default_configuration_path or os.listdir(self.app_dir):
        return False

    # directory just got created, we should import some configuration...
    if os.path.isabs(self.default_configuration_path):
        path = self.default_configuration_path
    else:
        path = os.path.join(self.DATA_DIR, self.default_configuration_path)
    cfg = self.import_cfg(path)
    if cfg is None:
        return False
    self.cfg = cfg
    self.write_cfg()
    return True
|
|
|
|
def try_publish(self, request):
    '''Prepare the publisher for the request, then publish it.

    The application directory is selected (and initialized if it was just
    created), the statsd client is created, the iframe mode and the page
    template key of the response are set, the configuration and the
    language are loaded and the substitution variables are fed, before
    handing over to the base class.
    '''
    try:
        self.set_app_dir(request)
    except ImmediateRedirectException:
        return redirect(sys.exc_info()[1].location)

    from vendor import pystatsd
    site_key = os.path.basename(self.app_dir).replace('+', '-').replace('.', '-')
    self.statsd = pystatsd.Client(
            host=request.get_environ('QOMMON_STATSD_HOSTNAME', 'localhost'),
            prefix='%s.%s' % (self.APP_NAME, site_key))

    self.initialize_app_dir()

    # the iframe mode is asked with a prefix on the host name or on the
    # script name
    canonical_hostname = request.get_server(False).lower().split(':')[0].rstrip('.')
    script_name = request.get_header('SCRIPT_NAME', '').strip('/')
    if canonical_hostname.startswith('iframe.') or \
            canonical_hostname.startswith('iframe-') or \
            script_name.startswith('iframe.'):
        request.response.iframe_mode = True

    if request.response.iframe_mode:
        request.response.page_template_key = 'iframe'
    template_key = request.get_environ('QOMMON_PAGE_TEMPLATE_KEY')
    if template_key:
        request.response.page_template_key = template_key

    self.set_config(request)
    request.language = self.get_site_language()
    self.install_lang(request.language)

    self.substitutions.reset()
    for source in (self, request):
        self.substitutions.feed(source)
    for extra_source in self.extra_sources:
        self.substitutions.feed(extra_source(self, request))
    return Publisher.try_publish(self, request)
|
|
|
|
def get_site_language(self):
    '''Return the language code to use, or None.

    This is the 'language' setting of the 'language' configuration
    section.  When it is set to 'HTTP', the language is taken from the
    Accept-Language header of the current request: the first listed
    language that has a translation is returned (quality values and
    subtags are ignored, the order of the header is kept), None if there
    is no request, no header or no matching language.
    '''
    lang = self.cfg.get('language', {}).get('language', None)
    if lang != 'HTTP':
        return lang

    request = self.get_request()
    if not request:
        return None
    accepted_languages = request.get_header('Accept-Language')
    if not accepted_languages:
        return None
    for language_range in accepted_languages.split(','):
        # forget about quality value ("fr;q=0.8") and subtag ("fr-FR")
        code = language_range.split(';')[0].strip().split('-')[0]
        if code in self.translations:
            return code
    return None
|
|
|
|
@classmethod
def get_admin_module(cls):
    '''Return the admin module; None in this class.'''
    return None
|
|
|
|
@classmethod
def get_backoffice_module(cls):
    '''Return the backoffice module; None in this class.'''
    return None
|
|
|
|
def get_admin_root(self):
    '''Return the 'admin' attribute of the root directory.'''
    root_directory = self.root_directory
    return root_directory.admin
|
|
|
|
def get_backoffice_root(self):
    '''Return the 'backoffice' attribute of the root directory, or None
    if there is no such attribute.'''
    try:
        root_directory = self.root_directory
    except AttributeError:
        return None
    return getattr(root_directory, 'backoffice', None)
|
|
|
|
ident_methods = None

def register_ident_methods(self):
    '''Register the available identification methods.

    The password method is always registered; the identity provider
    method is registered before it when the lasso module can be imported.
    self.ident_methods maps the key of every method to its class.
    '''
    try:
        import lasso
    except ImportError:
        lasso = None

    classes = []
    if lasso:
        import qommon.ident.idp
        classes.append(qommon.ident.idp.IdPAuthMethod)
    import qommon.ident.password
    classes.append(qommon.ident.password.PasswordAuthMethod)

    registry = {}
    self.ident_methods = registry
    for method_class in classes:
        registry[method_class.key] = method_class
        method_class.register()
|
|
|
|
cronjobs = None

@classmethod
def register_cronjob(cls, cronjob):
    '''Add a job to the list of cron jobs of the class; the list is
    created on first use.'''
    if not cls.cronjobs:
        cls.cronjobs = []
    cls.cronjobs.append(cronjob)
|
|
|
|
def clean_sessions(self):
|
|
cleaning_lock_file = os.path.join(self.app_dir, 'cleaning_sessions.lock')
|
|
fd = os.open(cleaning_lock_file, os.O_RDONLY | os.O_CREAT, 0666)
|
|
try:
|
|
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
except IOError:
|
|
# lock is currently held, that is fine.
|
|
return
|
|
try:
|
|
manager = self.session_manager_class()
|
|
now = time.time()
|
|
one_week_ago = now - 7*86400
|
|
one_month_ago = now - 30*86400
|
|
for session_key in manager.keys():
|
|
try:
|
|
session = manager.get(session_key)
|
|
if session._access_time < one_week_ago or session._creation_time < one_month_ago:
|
|
del manager[session.id]
|
|
except (AttributeError, KeyError):
|
|
# the key is impossible to load or the session is malformed
|
|
# delete it anyway, but only catch KeyError (other errors could be useful)
|
|
try:
|
|
del manager[session_key]
|
|
except KeyError:
|
|
pass
|
|
continue
|
|
finally:
|
|
os.close(fd)
|
|
|
|
def clean_afterjobs(self):
    '''Remove the after jobs that are no longer useful: the ones
    completed for more than one hour, and the ones created more than two
    days ago (probably aborted jobs).'''
    now = time.time()
    one_hour = 3600
    two_days = 2 * 86400
    for job in AfterJob.select():
        completed_long_ago = (job.status == 'completed' and
                (now - job.completion_time) > one_hour)
        if completed_long_ago or (now - job.creation_time) > two_days:
            job.remove_self()
|
|
|
|
@classmethod
def load_effective_tld_names(cls):
    '''Set cls.etld from the vendor/effective_tld_names.dat file of
    DATA_DIR; cls.etld is None when the file does not exist.'''
    filename = os.path.join(cls.DATA_DIR, 'vendor', 'effective_tld_names.dat')
    if os.path.exists(filename):
        from vendor import etld
        cls.etld = etld.etld(filename)
    else:
        cls.etld = None
|
|
|
|
@classmethod
def create_publisher(cls):
    '''Create and return a publisher object ready to be used.

    Extra modules, translations and effective TLD names are loaded, the
    session and after job cleaning cron jobs are registered, then the
    publisher is instantiated and given its substitutions, directories,
    identification methods, session manager and configuration.  APP_DIR
    is created if it does not exist.
    '''
    cls.load_extra_dirs()
    cls.load_translations()
    # sessions are cleaned every five minutes, after jobs every hour
    for method, minutes in ((cls.clean_sessions, range(0, 60, 5)),
                            (cls.clean_afterjobs, [0])):
        cls.register_cronjob(CronJob(method, minutes = minutes))
    cls.load_effective_tld_names()

    options = {
        'session_cookie_name': cls.APP_NAME,
        'session_cookie_path': '/',
        'error_log': cls.ERROR_LOG,
    }
    publisher = cls(cls.root_directory_class(), **options)
    publisher.substitutions = Substitutions()
    publisher.app_dir = cls.APP_DIR
    publisher.data_dir = cls.DATA_DIR
    if not os.path.exists(publisher.app_dir):
        os.mkdir(publisher.app_dir)

    publisher.register_ident_methods()
    publisher.set_session_manager(cls.session_manager_class())
    publisher.set_config()
    return publisher
|
|
|
|
extra_dirs = None

@classmethod
def register_extra_dir(cls, dir):
    '''Add a directory to the list of extra directories of the class;
    the list is created on first use.'''
    if not cls.extra_dirs:
        cls.extra_dirs = []
    cls.extra_dirs.append(dir)
|
|
|
|
@classmethod
def load_extra_dirs(cls):
    '''Import the Python modules found in the registered extra directories.

    Every existing extra directory is appended to sys.path and every
    '*.py' file it contains is loaded as a module.  An error while
    loading a module is propagated to the caller; the module file is
    closed in every case.
    '''
    for extra_dir in cls.extra_dirs or []:
        if not os.path.exists(extra_dir):
            continue
        sys.path.append(extra_dir)
        for filename in os.listdir(extra_dir):
            if not filename.endswith('.py'):
                continue
            modulename = filename[:-3]
            fp, pathname, description = imp.find_module(modulename, [extra_dir])
            try:
                imp.load_module(modulename, fp, pathname, description)
            finally:
                # load_module() does not close the file, even on error
                if fp:
                    fp.close()
|
|
|
|
|
|
translation_domains = None

@classmethod
def register_translation_domain(cls, name):
    '''Add a gettext domain to the list of translation domains of the
    class; the list is created on first use.'''
    if not cls.translation_domains:
        cls.translation_domains = []
    cls.translation_domains.append(name)
|
|
|
|
supported_languages = None

@classmethod
def load_translations(cls):
    '''Load the translations of the supported languages.

    cls.translations maps language codes to translation objects; 'en' is
    always there, as a null translation.  Languages without a catalog for
    the application domain are skipped.  The catalogs of the registered
    translation domains are then merged into the loaded translations.
    '''
    languages = cls.supported_languages or []
    cls.translations = {'en': gettext.NullTranslations()}

    for lang in languages:
        try:
            cls.translations[lang] = gettext.translation(cls.APP_NAME, languages=[lang])
        except IOError:
            # no catalog for this language
            pass

    for domain in cls.translation_domains or []:
        for lang in languages:
            if lang not in cls.translations:
                continue
            try:
                domain_translation = gettext.translation(domain, languages=[lang])
            except IOError:
                continue
            cls.translations[lang]._catalog.update(domain_translation._catalog)
|
|
|
|
cfg = None

def write_cfg(self):
    '''Save the configuration, pickled, in the config.pck file of the
    application directory.'''
    filename = os.path.join(self.app_dir, 'config.pck')
    storage.atomic_write(filename, cPickle.dumps(self.cfg))
|
|
|
|
def reload_cfg(self):
    '''Load the configuration from the config.pck file of the application
    directory; an empty configuration is used when the file is missing or
    cannot be loaded.'''
    filename = os.path.join(self.app_dir, 'config.pck')
    try:
        fd = open(filename, 'rb')
        try:
            # NOTE(review): unpickling executes code from the file, it
            # has to be writable by the application only
            self.cfg = cPickle.load(fd)
        finally:
            fd.close()
    except Exception:
        self.cfg = {}
|
|
|
|
def export_cfg(self):
    '''Return the configuration serialized as XML.

    The root element is <settings>; it has an element for every section
    of the configuration, with an element for every setting.  None gives
    an empty element, a dictionary gives an element for every key, a list
    gives a sequence of <item> elements and any other value gives its
    string representation.

    Raises RuntimeError if ElementTree is not available.
    '''
    if not ET:
        # string exceptions cannot be raised, use a real exception
        raise RuntimeError('missing ElementTree')
    root = ET.Element('settings')
    for k in self.cfg:
        part = ET.SubElement(root, k)
        for k2 in self.cfg[k]:
            elem = ET.SubElement(part, k2)
            val = self.cfg[k][k2]
            if val is None:
                pass

            elif type(val) is dict:
                for k3, v3 in val.items():
                    ET.SubElement(elem, k3).text = str(v3)

            elif type(val) is list:
                for v in val:
                    # the text of an element has to be a string; booleans
                    # are written as 'True'/'False', as import_cfg()
                    # expects them
                    if not isinstance(v, (str, unicode)):
                        v = str(v)
                    ET.SubElement(elem, 'item').text = v

            elif type(val) in (str, unicode):
                elem.text = val

            else:
                elem.text = str(val)

    return ET.tostring(root)
|
|
|
|
def import_cfg(self, filename):
    '''Read a configuration from an XML file and return it.

    The file must have the structure written by export_cfg().  The result
    is a dictionary of sections, every section being a dictionary of
    settings.  A setting with <item> children is read as a list, a
    setting with other children as a dictionary and any other setting as
    a single value.  Values are converted this way: no text gives None,
    'True' and 'False' give booleans, integers are converted for single
    values, and any other text is encoded in the site charset.

    Returns None (after logging a warning) if the root element of the
    file is not <settings>.  Raises RuntimeError if ElementTree is not
    available; parsing errors are logged and propagated.
    '''
    if not ET:
        self.get_app_logger().warn('failed to import config from; ElementTree is missing')
        # string exceptions cannot be raised, use a real exception
        raise RuntimeError('missing ElementTree')

    def convert(text):
        # conversion of the text of an element (see docstring)
        if text is None:
            return None
        if text in ('True', 'False'):
            return (text == 'True')
        return text.encode(self.site_charset)

    try:
        fd = open(filename, 'rb')
        try:
            tree = ET.parse(fd)
        finally:
            fd.close()
    except:
        self.get_app_logger().warn('failed to import config from; failed to parse: %s' % filename)
        raise

    root = tree.getroot()
    if root.tag != 'settings':
        self.get_app_logger().warn('failed to import config; not a settings file: %s' % filename)
        return

    cfg = {}
    for elem in root:
        sub_cfg = {}
        cfg[elem.tag] = sub_cfg
        for child in elem:
            items = list(child)
            if items:
                if items[0].tag == 'item':
                    # list
                    sub_cfg[child.tag] = [convert(c.text) for c in items]
                else:
                    # dict
                    sub_cfg[child.tag] = dict(
                            [(c.tag, convert(c.text)) for c in items])
                continue

            text = child.text
            if text is None:
                sub_cfg[child.tag] = None
                continue
            try:
                sub_cfg[child.tag] = int(text)
            except (ValueError, TypeError):
                sub_cfg[child.tag] = convert(text)

    return cfg
|
|
|
|
def process(self, stdin, env):
    '''Process a request built from the given input stream and
    environment; the response is kept in self.response and returned.'''
    self.response = self.process_request(HTTPRequest(stdin, env))
    return self.response
|
|
|
|
_app_logger = None

def get_app_logger_filename(self):
    '''Return the path of the log file: <APP_NAME>.log in the
    application directory.'''
    log_name = '%s.log' % self.APP_NAME
    return os.path.join(self.app_dir, log_name)
|
|
|
|
def get_app_logger(self, force=False):
    '''Return the logger of the application.

    The current logger is returned if there is one, unless force is true.
    Otherwise the logger named after the application and its directory is
    set up (bot filter, rotating log file of one megabyte with 100
    backups), its level is set to DEBUG or INFO according to the
    'debug_mode' setting of the 'debug' section, and it becomes the
    current logger.
    '''
    debug = self.cfg.get('debug', {}).get('debug_mode', False)
    if self._app_logger and not force:
        return self._app_logger

    app_logger = logging.getLogger(self.APP_NAME + self.app_dir)
    self._app_logger = app_logger
    if not app_logger.filters:
        app_logger.addFilter(logger.BotFilter())
    logfile = self.get_app_logger_filename()

    if not app_logger.handlers:
        # max size = 1M
        handler = logging.handlers.RotatingFileHandler(logfile, 'a', 2**20, 100)
        line_format = ('%(asctime)s %(levelname)s %(address)s '
                       '%(session_id)s %(path)s %(user_id)s - %(message)s')
        handler.setFormatter(logger.Formatter(line_format))
        app_logger.addHandler(handler)

    level = logging.INFO
    if debug:
        level = logging.DEBUG
    app_logger.setLevel(level)
    return app_logger
|
|
def sitecharset2utf8(self, str):
    '''Return the given string, encoded in the site charset, as UTF-8.'''
    decoded = str.decode(self.site_charset)
    return decoded.encode('utf-8')
|
|
def utf82sitecharset(self, str):
    '''Return the given UTF-8 string encoded in the site charset.'''
    decoded = str.decode('utf-8')
    return decoded.encode(self.site_charset)
|
|
|
|
def get_substitution_variables(self):
    '''Return the substitution variables provided by the publisher: site
    name, theme, URLs and language, and the current date and time.'''
    import misc
    variables = {}
    variables['site_name'] = get_cfg('misc', {}).get('sitename', None)
    variables['site_theme'] = get_cfg('branding', {}).get('theme', self.default_theme)
    variables['site_url'] = self.get_frontoffice_url()
    variables['site_url_backoffice'] = self.get_backoffice_url()
    # language of the current request, 'en' if there is none
    variables['site_lang'] = (get_request() and get_request().language) or 'en'
    variables['today'] = strftime.strftime(misc.date_format(), time.localtime())
    variables['now'] = misc.localstrftime(time.localtime())
    return variables
|
|
|
|
extra_sources = []

@classmethod
def register_extra_source(cls, source):
    '''Register a new static source of substitution variables for this
    publisher class.

    source must be a callable taking two arguments, a publisher and a
    request, and returning an object with a get_substitution_variables()
    method.
    '''
    if not cls.extra_sources:
        cls.extra_sources = []
    cls.extra_sources.append(source)
|
|
|
|
|
|
def get_cfg(key, default = None):
    '''Return the given section of the configuration of the current
    publisher.

    default is returned when the section is missing; an empty dictionary
    is returned instead of any false value.
    '''
    return get_publisher().cfg.get(key, default) or {}
|
|
|
|
def get_logger():
    '''Return the application logger of the current publisher.'''
    publisher = get_publisher()
    return publisher.get_app_logger()
|
|
|
|
# publisher class of the application, see set_publisher_class()
_publisher_class = None

def set_publisher_class(klass):
    '''Record the publisher class of the application.'''
    global _publisher_class
    _publisher_class = klass
|
|
|
|
def get_publisher_class():
    '''Return the publisher class recorded with set_publisher_class().'''
    klass = _publisher_class
    return klass
|
|
|
|
def sitecharset2utf8(str):
    '''Return the given string, encoded in the charset of the current
    site, as UTF-8.'''
    publisher = get_publisher()
    return publisher.sitecharset2utf8(str)
|
|
|
|
def utf82sitecharset(str):
    '''Return the given UTF-8 string encoded in the charset of the
    current site.'''
    publisher = get_publisher()
    return publisher.utf82sitecharset(str)
|
|
|
|
|
|
# declare the substitution variables provided by the publisher, all in
# the 'General' category
for _variable, _comment in (
        ('site_name', N_('Site Name')),
        ('site_theme', N_('Current Theme Name')),
        ('site_url', N_('Site URL')),
        ('site_url_backoffice', N_('Site URL (backoffice)')),
        ('today', N_('Current Date')),
        ('now', N_('Current Date & Time'))):
    Substitutions.register(_variable, category=N_('General'), comment=_comment)
# do not leave the loop variables in the module namespace
del _variable, _comment
|