wcs/wcs/compat.py

274 lines
10 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2013 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import os
from threading import Lock
from contextlib import contextmanager
from django.utils import six
from django.utils.deprecation import MiddlewareMixin
from quixote import get_publisher, get_request
from quixote.errors import PublishError
from quixote.http_request import Upload
from django.http import HttpResponse
from django.conf import settings
from django.template import loader, TemplateDoesNotExist
from django.template.response import TemplateResponse
from django.views.generic.base import TemplateView
from .qommon import force_str, template
from .qommon.publisher import get_cfg, set_publisher_class
from .publisher import WcsPublisher
from .qommon.http_request import HTTPRequest
class TemplateWithFallbackView(TemplateView):
    """Django template view that falls back to Quixote processing.

    When ``template_name`` cannot be loaded, the request is handed over to
    :func:`quixote`.  Otherwise the context is computed, the Quixote response
    attached to the request is recorded in ``quixote_response`` and its
    status, reason phrase and headers are mirrored on the Django response.
    """

    # Quixote response for the request being served; set by get().
    quixote_response = None

    def _copy_quixote_headers(self, django_response):
        """Copy the Quixote response headers onto *django_response*.

        ``Connection`` and ``Content-Length`` are skipped.
        """
        for name, value in self.quixote_response.generate_headers():
            if name in ('Connection', 'Content-Length'):
                continue
            django_response[name] = value

    def get(self, request, *args, **kwargs):
        """Handle a GET request and return a Django response.

        The response body comes from ``context['body']`` unless the regular
        template rendering path is taken (HTML content, status 200, no popup
        header and no ``raw`` filter).
        """
        try:
            loader.get_template(self.template_name)
        except TemplateDoesNotExist:
            # no Django template available, let Quixote handle everything.
            return quixote(self.request)

        try:
            context = self.get_context_data(**kwargs)
            self.quixote_response = request.quixote_request.response
        except PublishError as exc:
            context = {'body': get_publisher().finish_interrupted_request(exc)}
            self.quixote_response = get_request().response
        except Exception:
            # boundary handler: any other failure is delegated to the
            # publisher, which provides the body to send back.
            context = {'body': get_publisher().finish_failed_request()}
            self.quixote_response = get_request().response

        quixote_response = self.quixote_response
        if quixote_response.content_type != 'text/html' or quixote_response.status_code != 200:
            response = HttpResponse(context['body'])
            response.status_code = quixote_response.status_code
            response.reason_phrase = quixote_response.reason_phrase
        elif request.META.get('HTTP_X_POPUP') == 'true':
            response = HttpResponse('<div class="popup-content">%s</div>' % context['body'])
        elif 'raw' in (getattr(quixote_response, 'filter', None) or {}):
            # used for theme preview (generated in /backoffice/ but obviously
            # cannot receive the admin template).
            # A missing ``filter`` attribute is treated like an empty one.
            response = HttpResponse(context['body'])
        else:
            response = self.render_to_response(context)

        self._copy_quixote_headers(response)
        response.after_jobs = quixote_response.after_jobs
        return response

    def render_to_response(self, context, **response_kwargs):
        """Render the template, mirroring a non-200 Quixote response.

        When a Quixote response is recorded and its status is not 200, its
        status code, reason phrase and headers are copied to the result.
        """
        django_response = super(TemplateWithFallbackView, self).render_to_response(context, **response_kwargs)
        if self.quixote_response and self.quixote_response.status_code != 200:
            django_response.status_code = self.quixote_response.status_code
            django_response.reason_phrase = self.quixote_response.reason_phrase
            self._copy_quixote_headers(django_response)
        return django_response
class CompatHTTPRequest(HTTPRequest):
    """Quixote request object built from a Django request."""

    def __init__(self, request):
        """Wrap the Django *request*.

        The two objects reference each other: ``self.django_request`` and
        ``request.quixote_request``.
        """
        self.django_request = request
        request.quixote_request = self
        self.response = None
        environ = request.environ
        # normalise the path entries before they are given to HTTPRequest
        environ['SCRIPT_NAME'] = str(environ['SCRIPT_NAME'])
        environ['PATH_INFO'] = force_str(environ['PATH_INFO'])
        self.environ = request.META
        HTTPRequest.__init__(self, None, environ)
        self.scheme = str(request.scheme)

    def _process_urlencoded(self, length, params):
        """Handle urlencoded bodies exactly like multipart ones."""
        return self._process_multipart(length, params)

    def _process_multipart(self, length, params):
        """Fill ``self.form`` from the Django request POST data and files.

        Keys ending with ``[]`` are stored as the list of all submitted
        values; uploaded files are stored as Quixote ``Upload`` objects.
        """
        # Historical note: the form should hold strings in the site charset;
        # some user agents (Nokia 6020 and 6300 phones for example) specify a
        # charset in a mime content part.
        default_charset = get_publisher().site_charset

        # parse multipart data with the charset of the website
        if 'charset' not in params:
            params['charset'] = default_charset

        if not self.form:
            self.form = {}

        post_data = self.django_request.POST
        for key in post_data:
            if key.endswith('[]'):
                self.form[key] = list(post_data.getlist(key))
            else:
                self.form[key] = post_data[key]

        for key, uploaded in self.django_request.FILES.items():
            upload = Upload(uploaded.name, uploaded.content_type, uploaded.charset)
            upload.fp = uploaded.file
            self.form[key] = upload

    def build_absolute_uri(self):
        """Return the absolute URI computed by the Django request."""
        return self.django_request.build_absolute_uri()
class CompatWcsPublisher(WcsPublisher):
    """Publisher that turns Quixote processing results into Django responses."""

    def filter_output(self, request, output):
        """Decorate *output* according to the current response.

        Returns an empty string for a 304, *output* unchanged for non-HTML
        content or when the response has no filter, a popup wrapper when the
        ``HTTP_X_POPUP`` header is ``'true'``, the result of
        ``render_response`` for ``admin_ezt`` responses or themes without a
        ``wcs/base.html`` template, and a ``TemplateResponse`` otherwise.
        """
        response = self.get_request().response
        if response.status_code == 304:
            # clients don't like to receive content with a 304
            return ''
        if response.content_type != 'text/html':
            return output
        if not getattr(response, 'filter', None):
            return output
        if request.META.get('HTTP_X_POPUP') == 'true':
            return '<div class="popup-content">%s</div>' % output
        if response.filter.get('admin_ezt'):
            return self.render_response(output)

        theme_name = get_cfg('branding', {}).get('theme', 'default')
        theme_directory = template.get_theme_directory(theme_name)
        # the Django base template is only used when the theme provides it.
        if (
            not theme_directory
            or not os.path.exists(os.path.join(theme_directory, 'templates'))
            or not os.path.exists(os.path.join(theme_directory, 'templates/wcs/base.html'))
        ):
            return self.render_response(output)

        if not isinstance(output, template.QommonTemplateResponse):
            output = template.QommonTemplateResponse(
                templates=['wcs/base.html'],
                context={'body': output},
            )
        return self.render_template(request, response, output)

    def render_template(self, request, response, template_response):
        """Build a Django ``TemplateResponse`` for *template_response*.

        The context combines the variables from ``get_decorate_vars``, the
        Django request and the template response's own context, the latter
        taking precedence.
        """
        template_response.add_media()
        own_context = template_response.context
        context = template.get_decorate_vars(
            own_context.get('body'),
            response,
            generate_breadcrumb=False,
            template_context=own_context,
        )
        context['request'] = request.django_request
        context.update(template_response.context)
        return TemplateResponse(
            request.django_request,
            template_response.templates,
            context,
            content_type=response.content_type,
            status=response.status_code,
        )

    def set_app_dir(self, request):
        """Set the application directory, then refresh THEME_SKELETON_URL."""
        super(CompatWcsPublisher, self).set_app_dir(request)
        skeleton_url = self.get_site_option('theme_skeleton_url')
        settings.THEME_SKELETON_URL = skeleton_url

    def process_request(self, request):
        """Run *request* through the publisher and return a Django response.

        Publishing errors are converted into output by
        ``finish_interrupted_request`` / ``finish_failed_request``.  The
        publisher's request is cleared just before returning.
        """
        self._set_request(request)
        try:
            self.parse_request(request)
            self.init_publish(request)
            output = self.try_publish(request)
        except PublishError as exc:
            output = self.finish_interrupted_request(exc)
        except Exception:
            output = self.finish_failed_request()

        quixote_response = request.response
        filtered = self.filter_output(request, output)

        if isinstance(filtered, TemplateResponse):
            django_response = filtered
            django_response.render()
        else:
            django_response = HttpResponse(
                filtered,
                content_type=quixote_response.content_type,
                status=quixote_response.status_code,
                reason=quixote_response.reason_phrase,
            )

        if not request.ignore_session:
            # it is necessary to save the session one last time as the actual
            # rendering may have altered it (for example a form would add its
            # token).
            self.session_manager.finish_successful_request()

        skipped_headers = ('Connection', 'Content-Length')
        for name, value in quixote_response.generate_headers():
            if name not in skipped_headers:
                django_response[name] = value

        django_response.after_jobs = quixote_response.after_jobs

        self._clear_request()
        return django_response
# keep a lock during quixote processing as it's not meant to work with threads;
# the publisher instance can't be shared for concurrent requests.
# NOTE(review): nothing in the code shown in this module acquires this lock
# (quixote() calls the publisher without taking it) -- confirm whether callers
# elsewhere are expected to hold it.
quixote_lock = Lock()
def quixote(request):
    """Process the publisher's current request and return the result.

    The *request* argument is not used: the request processed is the one
    returned by the publisher's ``get_request()``.
    """
    publisher = get_publisher()
    current_request = publisher.get_request()
    return publisher.process_request(current_request)
@contextmanager
def request(request):
    """Context manager clearing the publisher's request on normal exit.

    The *request* argument is not used.  The publisher is looked up on entry;
    its ``_clear_request()`` is called only when the ``with`` body completes
    without raising.
    """
    publisher = get_publisher()
    yield
    publisher._clear_request()
class PublishErrorMiddleware(MiddlewareMixin):
    """Middleware converting Quixote ``PublishError`` into Django responses."""

    def process_exception(self, request, exception):
        """Return a response for a ``PublishError``, ``None`` otherwise.

        The body is the rendered exception; content type, status, reason and
        headers are taken from the current Quixote request's response.
        """
        if not isinstance(exception, PublishError):
            return None

        quixote_request = get_request()
        body = exception.render()
        quixote_response = quixote_request.response
        django_response = HttpResponse(
            body,
            content_type=quixote_response.content_type,
            status=quixote_response.status_code,
            reason=quixote_response.reason_phrase,
        )

        skipped_headers = ('Connection', 'Content-Length')
        for name, value in quixote_request.response.generate_headers():
            if name not in skipped_headers:
                django_response[name] = value

        return django_response
# Pass CompatWcsPublisher to set_publisher_class when this module is imported;
# presumably this makes it the publisher class used by the application --
# confirm against qommon.publisher.
set_publisher_class(CompatWcsPublisher)