wcs/wcs/qommon/http_response.py

157 lines
6.1 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import time
import traceback
import sys
from quixote.util import randbytes
import quixote.http_response
from quixote import get_publisher, get_request
from qommon.storage import StorableObject
class AfterJob(StorableObject):
    """Stored record describing a job queued on an HTTP response.

    Instances are created by HTTPResponse.add_after_job() and updated by
    HTTPResponse.process_after_jobs() as the job goes through its states.
    """

    # Read by StorableObject; presumably the name of the storage area used
    # for these objects -- TODO confirm against qommon.storage.
    _names = 'afterjobs'

    # Human readable description given when the job is registered.
    label = None
    # One of 'registered', 'running', 'failed' or 'completed' (the values
    # assigned in this module).
    status = None
    # time.time() value taken when the job is registered.
    creation_time = None
    # time.time() value taken once the job has run, whether it succeeded or
    # failed; None as long as it has not been run.
    completion_time = None
class HTTPResponse(quixote.http_response.HTTPResponse):
    """Quixote HTTP response extended with page assets and deferred jobs.

    On top of the base class it keeps track of the JavaScript files, inline
    JavaScript snippets and CSS files registered while a page is built, and
    of "after jobs": callables queued with add_after_job() and executed by
    process_after_jobs().
    """

    # When true, redirect() rewrites locations pointing at the current server.
    iframe_mode = False
    # The collections below stay None until something is registered; they
    # are created lazily by the matching add_*() method.
    javascript_scripts = None
    javascript_code_parts = None
    css_includes = None
    # List of (AfterJob, callable) tuples.
    after_jobs = None
    # Not read by any method of this class; presumably consumed by the
    # templating code elsewhere -- TODO confirm.
    page_template_key = None

    def __init__(self, charset=None, **kwargs):
        """Initialise the response, defaulting the charset to the site's.

        charset -- forwarded to the base class; when it is empty or None the
                   publisher's site_charset is used instead.
        kwargs -- forwarded untouched to the base class constructor.
        """
        quixote.http_response.HTTPResponse.__init__(self, charset=charset, **kwargs)
        if not charset:
            self.charset = get_publisher().site_charset

    def redirect(self, location, permanent=False):
        """Redirect to location, adjusting the host part in iframe mode.

        Outside iframe mode this is the base class redirect().  In iframe
        mode, a location starting with the current scheme://server prefix has
        that prefix replaced by the one built from get_server(clean=False).
        NOTE(review): the meaning of clean=False is defined by the request
        class, which is not visible from this file.

        Returns whatever the base class redirect() returns.
        """
        if self.iframe_mode:
            req = get_request()
            scheme = req.get_scheme()
            server_url = '%s://%s' % (scheme, req.get_server())
            if location.startswith(server_url):
                iframe_server_url = '%s://%s' % (scheme, req.get_server(clean=False))
                location = iframe_server_url + location[len(server_url):]
        return quixote.http_response.HTTPResponse.redirect(self, location, permanent)

    def add_javascript(self, script_names):
        """Register JavaScript files to be referenced from the page header.

        script_names -- iterable of script names.  'jquery.js' and
                        'jquery-ui.js' are mapped to their xstatic paths.

        A script that is already registered is skipped entirely.  Some
        scripts pull in extra scripts, CSS files or inline code:
        'qommon.map.js', 'jquery-ui.js', 'afterjob.js' and
        'qommon.geolocation.js'.
        """
        if not self.javascript_scripts:
            self.javascript_scripts = []
        mappings = {
            'jquery.js': '../../static/xstatic/jquery.js',
            'jquery-ui.js': '../../static/xstatic/jquery-ui.js',
        }
        for script_name in script_names:
            mapped_script_name = mappings.get(script_name) or script_name
            # self.javascript_scripts is looked up again on every access on
            # purpose: the recursive add_javascript() calls below may replace
            # a still-empty list with a new one.
            if mapped_script_name in self.javascript_scripts:
                continue
            if script_name == 'qommon.map.js':
                # dependencies are registered first so they get loaded first
                self.add_javascript(['jquery.js'])
                self.add_javascript(['../../leaflet/leaflet.js'])
                self.add_css_include('../../leaflet/leaflet.css')
            if script_name == 'jquery-ui.js':
                self.add_css_include(
                    '../../static/xstatic/themes/smoothness/jquery-ui.min.css')
            self.javascript_scripts.append(str(mapped_script_name))
            if script_name == 'afterjob.js':
                self.add_javascript_code(
                    'var QOMMON_ROOT_URL = "%s";\n'
                    % get_publisher().get_application_static_files_root_url())
            if script_name == 'qommon.geolocation.js':
                self.add_javascript(['jquery.js'])
                self.add_javascript_code(
                    'var WCS_ROOT_URL = "%s";\n'
                    % get_publisher().get_frontoffice_url())

    def add_javascript_code(self, code):
        """Register an inline JavaScript snippet, ignoring exact duplicates."""
        if not self.javascript_code_parts:
            self.javascript_code_parts = []
        if code not in self.javascript_code_parts:
            self.javascript_code_parts.append(code)

    def get_javascript_for_header(self):
        """Return the <script> markup for registered scripts and snippets.

        Scripts whose name does not start with '/' are emitted first, with a
        src relative to the qommon static directory.  Names starting with
        '/' follow, with a src relative to the site root URL (the leading
        slash is dropped).  All inline snippets are then emitted in a single
        <script> element.  Returns '' when nothing has been registered.
        """
        parts = []
        if self.javascript_scripts:
            publisher = get_publisher()
            site_root_url = publisher.get_root_url()
            static_root_url = site_root_url + publisher.qommon_static_dir
            tag = '<script type="text/javascript" src="%sjs/%s"></script>'
            relative_tags = []
            absolute_tags = []
            for name in self.javascript_scripts:
                # startswith() rather than name[0] so that an empty name
                # does not raise IndexError.
                if name.startswith('/'):
                    absolute_tags.append(tag % (site_root_url, name[1:]))
                else:
                    relative_tags.append(tag % (static_root_url, str(name)))
            parts.append('\n'.join(relative_tags))
            parts.append('\n')
            parts.append('\n'.join(absolute_tags))
            parts.append('\n')
        if self.javascript_code_parts:
            parts.append('<script type="text/javascript">')
            parts.append('\n'.join(self.javascript_code_parts))
            parts.append('\n</script>\n')
        return ''.join(parts)

    def add_css_include(self, css_include):
        """Register a CSS file name, ignoring exact duplicates."""
        if not self.css_includes:
            self.css_includes = []
        if css_include not in self.css_includes:
            self.css_includes.append(css_include)

    def get_css_includes_for_header(self):
        """Return the <link> markup for registered CSS files.

        Every href is relative to the css/ directory below the qommon static
        directory.  Returns '' when no CSS file has been registered.
        """
        if not self.css_includes:
            return ''
        publisher = get_publisher()
        root_url = publisher.get_root_url() + publisher.qommon_static_dir
        tag = '<link rel="stylesheet" type="text/css" href="%scss/%s" />'
        return '\n'.join([tag % (root_url, x) for x in self.css_includes])

    def add_after_job(self, label, cmd, fire_and_forget=False):
        """Queue cmd for execution by process_after_jobs().

        label -- description recorded on the job.
        cmd -- callable; it will be called as cmd(job=job).
        fire_and_forget -- when true the job gets no id and is never stored;
                           otherwise it is stored right away.

        Returns the AfterJob object tracking the job.
        """
        if not self.after_jobs:
            self.after_jobs = []
        job = AfterJob(id=randbytes(8))
        job.label = label
        job.creation_time = time.time()
        # N_ is not defined in this file; presumably a gettext "mark for
        # translation" helper installed as a builtin -- TODO confirm.
        job.status = N_('registered')
        if fire_and_forget:
            # a job without id is skipped by the store() calls made in
            # process_after_jobs()
            job.id = None
        else:
            job.store()
        self.after_jobs.append((job, cmd))
        return job

    def process_after_jobs(self):
        """Run the queued jobs that have not been completed yet.

        Each job is marked 'running', called as job_function(job=job), then
        marked 'completed' or, if the call raised, 'failed' with the
        traceback saved in job.exception and the error reported to the
        publisher.  completion_time is set in both cases.  Jobs that have an
        id are stored before and after they run.
        """
        if not self.after_jobs:
            return
        for job, job_function in self.after_jobs:
            if job.completion_time:
                # already processed
                continue
            job.status = N_('running')
            if job.id:
                job.store()
            try:
                job_function(job=job)
            except:
                # NOTE(review): the bare except also traps KeyboardInterrupt
                # and SystemExit; it is kept so that a job is always marked
                # as failed, whatever was raised.
                get_publisher().notify_of_exception(sys.exc_info())
                job.exception = traceback.format_exc()
                job.status = N_('failed')
            else:
                job.status = N_('completed')
            job.completion_time = time.time()
            if job.id:
                job.store()