combo/combo/public/views.py

541 lines
20 KiB
Python

# combo - content management system
# Copyright (C) 2014 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import re
import django
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import logout as auth_logout
from django.contrib.auth import views as auth_views
from django.contrib.auth.models import User
from django.core import signing
from django.core.exceptions import ObjectDoesNotExist, PermissionDenied
from django.db import transaction
from django.http import (Http404, HttpResponse, HttpResponseRedirect,
HttpResponsePermanentRedirect)
from django.shortcuts import render, resolve_url
from django.template import engines
from django.template.loader import get_template, TemplateDoesNotExist
from django.utils import lorem_ipsum, timezone
from django.utils.encoding import force_text
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.six.moves.urllib import parse as urllib
from django.views.decorators.csrf import csrf_exempt
from django.utils.translation import ugettext as _
from django.forms.widgets import Media
from haystack.inputs import AutoQuery
from haystack.query import SearchQuerySet, SQ
if 'mellon' in settings.INSTALLED_APPS:
from mellon.utils import get_idps
from mellon.models import UserSAMLIdentifier
else:
get_idps = lambda: []
UserSAMLIdentifier = None
from combo.data.models import (CellBase, PostException, Page, Redirect,
ParentContentCell, TextCell, PageSnapshot)
from combo.profile.models import Profile
from combo.apps.search.models import SearchCell
from combo import utils
def login(request, *args, **kwargs):
if any(get_idps()):
if not 'next' in request.GET:
return HttpResponseRedirect(resolve_url('mellon_login'))
return HttpResponseRedirect(resolve_url('mellon_login') + '?next='
+ urllib.quote(request.GET.get('next')))
return auth_views.login(request, *args, **kwargs)
def logout(request, next_page=None):
if any(get_idps()):
return HttpResponseRedirect(resolve_url('mellon_logout'))
auth_logout(request)
if next_page is not None:
next_page = resolve_url(next_page)
else:
next_page = '/'
return HttpResponseRedirect(next_page)
def modify_global_context(request, ctx):
if 'user_id' in ctx:
try:
ctx['selected_user'] = User.objects.get(id=ctx['user_id'])
except (User.DoesNotExist, ValueError):
pass
if 'name_id' in ctx and UserSAMLIdentifier:
try:
ctx['selected_user'] = UserSAMLIdentifier.objects.get(name_id=ctx['name_id']).user
except UserSAMLIdentifier.DoesNotExist:
pass
@csrf_exempt
def ajax_page_cell(request, page_pk, cell_reference):
try:
page = Page.objects.get(id=page_pk)
except Page.DoesNotExist:
# check it's not from a snapshots
try:
page = Page.snapshots.get(id=page_pk)
except Page.DoesNotExist:
raise Http404()
# as it's from a snapshot access is limited to managers
if not (request.user and request.user.is_staff):
raise PermissionDenied()
if not page.is_visible(request.user):
raise PermissionDenied()
try:
cell = CellBase.get_cell(cell_reference, page_id=page_pk)
except ObjectDoesNotExist:
raise Http404()
if not cell.is_visible(request.user):
raise PermissionDenied()
exception = None
action_response = None
if request.method == 'POST':
if not hasattr(cell, 'post'):
raise PermissionDenied()
try:
action_response = cell.post(request)
except PostException as e:
exception = e
if not request.is_ajax():
messages.error(request, force_text(e) if force_text(e) != 'None' else _('Error sending data.'))
if action_response:
return HttpResponse(
action_response.content,
content_type=action_response.headers['Content-Type'])
if not request.is_ajax():
return HttpResponseRedirect(cell.page.get_online_url())
response = render_cell(request, cell)
if exception:
response['x-error-message'] = force_text(exception)
return response
def render_cell(request, cell):
context = {
'page': cell.page if cell.page_id else None,
'request': request,
'cell': cell,
'synchronous': True,
'site_base': request.build_absolute_uri('/')[:-1],
'absolute_uri': request.build_absolute_uri
}
if request.GET.get('ctx'):
context.update(signing.loads(request.GET['ctx']))
modify_global_context(request, context)
if cell.page_id:
other_cells = []
for klass in CellBase.get_cell_classes(lambda x: bool(x.modify_global_context)):
other_cells.extend(klass.objects.filter(page_id=cell.page_id))
other_cells = [x for x in other_cells if x.is_visible(user=request.user)]
other_cells.sort(key=lambda x: x.order)
for other_cell in other_cells:
if other_cell.get_reference() != cell.get_reference():
other_cell.modify_global_context(context, request)
elif cell.modify_global_context:
# Cell can pass data through its own __dict__
cell.modify_global_context(context, request)
template = engines['django'].from_string('{% load combo %}{% render_cell cell %}')
return HttpResponse(template.render(context, request), content_type='text/html')
def extend_with_parent_cells(cells, hierarchy):
if len(hierarchy) == 1 and hierarchy[0].slug == 'index':
# home page cannot contain parent cells
return
for cell in cells[:]:
if not isinstance(cell, ParentContentCell):
continue
idx = cells.index(cell)
cells[idx:idx+1] = cell.get_parents_cells(hierarchy=hierarchy[:-1])
def should_check_badges():
check_badges = False
for klass in CellBase.get_cell_classes(lambda x: bool(x.get_badge)):
check_badges = klass.objects.all().exists()
if check_badges:
break
return check_badges
def skeleton(request):
# Skeleton rendering is used to dynamically produce base templates to use
# in other applications, based on configured combo cells.
#
# It takes a ?source= parameter that should contain the URL we want a
# template for; it will be used to match the corresponding page, and thus
# the corresponding content.
#
# If there's no matching page, the ?source= parameter will be evaluated
# against the known services (settings.KNOWN_SERVICES) and an empty page
# will be created to be used as skeleton.
#
# If there was no matching page and no matching service an error will be
# raised. (403 Access Forbidden)
#
# While placeholders holding cells will get their cells rendered, empty
# placeholders will get themself outputted as template blocks, named
# placeholder-$name, and with a default content of a block named $name.
#
# ex:
# {% block placeholder-content %}
# {% block content %}
# {% endblock %}
# {% endblock %}
if 'source' not in request.GET:
raise PermissionDenied()
source = request.GET['source']
if source == '404':
request.extra_context_data = {'site_base': request.build_absolute_uri('/')[:-1]}
response = error404(request)
response.status_code = 200
return response
parsed_source = urlparse.urlparse(source)
netloc = parsed_source.netloc
if parsed_source.scheme == 'https' and netloc.endswith(':443'):
# somme HTTP client (like Mozilla/1.1 (compatible; MSPIE 2.0; Windows
# CE)) will make request with an explicit :443 port in the Host header;
# it will then be used in constructing page URL and will end up in the
# ?source= parameter of this call. Remove it.
netloc = netloc.replace(':443', '')
selected_page = None
same_domain_pages = []
# look in redirect pages after the best match for the source
redirect_pages = Page.objects.exclude(redirect_url__isnull=True).exclude(redirect_url='')
for page in redirect_pages:
try:
redirect_url = utils.get_templated_url(page.redirect_url)
except utils.TemplateError:
continue
if source.startswith(redirect_url):
if selected_page is None or len(redirect_url) > len(selected_page.get_redirect_url()):
selected_page = page
if urlparse.urlparse(redirect_url).netloc == netloc:
same_domain_pages.append(page)
if selected_page is None and same_domain_pages:
# if there was no page found, get a domain match
selected_page = same_domain_pages[0]
if selected_page is None:
# if there's still no page found, look in KNOWN_SERVICES, and
# return an empty page as template
for service_id in settings.KNOWN_SERVICES or {}:
for service_key in settings.KNOWN_SERVICES[service_id]:
service = settings.KNOWN_SERVICES[service_id][service_key]
if urlparse.urlparse(service.get('url')).netloc == netloc:
selected_page = Page()
selected_page.id = '__root'
selected_page.template_name = 'standard'
break
else:
continue
break
else:
raise PermissionDenied()
# add default ParentContentCells to the page
cells = []
for placeholder in selected_page.get_placeholders(request=request):
if placeholder.acquired:
cells.append(ParentContentCell(page=selected_page, placeholder=placeholder.key, order=0))
else:
cells = CellBase.get_cells(page=selected_page)
pages = selected_page.get_parents_and_self()
combo_template = settings.COMBO_PUBLIC_TEMPLATES[selected_page.template_name]
extend_with_parent_cells(cells, hierarchy=pages)
ctx = {
'page': selected_page,
'page_cells': cells,
'pages': pages,
'request': request,
'render_skeleton': True,
'check_badges': should_check_badges(),
'site_base': request.build_absolute_uri('/')[:-1],
}
template_name = combo_template['template']
response = render(request, template_name, ctx)
response['X-Combo-Page-Id'] = str(selected_page.id)
response['X-Combo-Skeleton-Pages'] = json.dumps(
dict([(x.id, utils.get_templated_url(x.redirect_url)) for x in same_domain_pages]))
return response
class StyleDemoPage(object):
def __init__(self, request):
self.request = request
self.template_name = request.GET.get('template') or 'standard'
def __enter__(self):
self.page = Page(public=False, title=_('Style Demo'))
self.page.template_name = self.template_name
self.page.save()
combo_template = settings.COMBO_PUBLIC_TEMPLATES[self.page.template_name]
for i, placeholder in enumerate(self.page.get_placeholders(request=self.request)):
cell = TextCell(page=self.page, placeholder=placeholder.key, order=0)
if placeholder.key == 'footer':
cell.text = '<p>%s</p>' % lorem_ipsum.sentence()
else:
cell.text = '<h2>%s</h2>%s' % (
lorem_ipsum.words(3, common=False),
'\n'.join(['<p>%s</p>' % x for x in lorem_ipsum.paragraphs(3)]))
cell.save()
if i == 0:
# add a template selector in first placeholder
cell = TextCell(page=self.page, placeholder=placeholder.key, order=1)
options_html = []
for template_key, template_dict in settings.COMBO_PUBLIC_TEMPLATES.items():
try:
get_template(template_dict.get('template'))
except TemplateDoesNotExist:
# don't propose templates that do not exist
continue
attr = 'value="%s"' % template_key
if template_key == self.page.template_name:
attr += ' selected="selected"'
options_html.append(u'<option %s>%s</option>' % (attr, template_dict['name']))
cell.text = u'''<form><select name="template">%s</select>
<button>%s</button></form>''' % (
'\n'.join(options_html),
_('Select'))
cell.save()
return self.page
def __exit__(self, type, value, traceback):
self.page.delete()
def style(request):
if not settings.DEBUG:
raise Http404()
messages.success(request, _('Success notice'))
messages.info(request, _('Info notice'))
messages.warning(request, _('Warning notice'))
messages.error(request, _('Error notice'))
with transaction.atomic():
with StyleDemoPage(request) as page:
return publish_page(request, page)
def empty_site(request):
return render(request, 'combo/empty_site.html', {})
def page(request):
request.extra_context_data = {
'absolute_uri': request.build_absolute_uri()
}
url = request.path_info
parts = [x for x in request.path_info.strip('/').split('/') if x]
if len(parts) == 1 and parts[0] == 'index':
return HttpResponsePermanentRedirect('/')
if not parts:
parts = ['index']
if parts == ['index'] and settings.COMBO_INITIAL_LOGIN_PAGE_PATH and (
request.user and not request.user.is_anonymous()):
profile, created = Profile.objects.get_or_create(user=request.user)
if not profile.initial_login_view_timestamp:
# first connection of user, record that and redirect to welcome URL
profile.initial_login_view_timestamp = timezone.now()
profile.save()
return HttpResponseRedirect(settings.COMBO_INITIAL_LOGIN_PAGE_PATH)
if parts == ['index'] and settings.COMBO_WELCOME_PAGE_PATH and (
not request.user or request.user.is_anonymous()):
if not request.session.setdefault('visited', False):
# first visit, the user is not logged in.
request.session['visited'] = True
return HttpResponseRedirect(settings.COMBO_WELCOME_PAGE_PATH)
pages = {x.slug: x for x in Page.objects.filter(slug__in=parts)}
if pages == {} and parts == ['index'] and Page.objects.count() == 0:
return empty_site(request)
i = 0
hierarchy_ids = [None]
while i < len(parts):
try:
page = pages[parts[i]]
except KeyError:
page = None
break
if page.parent_id != hierarchy_ids[-1]:
if i == 0:
# root page should be at root but maybe the page is a child of
# /index/, and as /index/ is silent the page would appear
# directly under /; this is not a suggested practice.
if page.parent.slug != 'index' and page.parent.parent_id is not None:
page = None
break
else:
page = None
break
if page.sub_slug:
if parts[i+1:] == []:
# a sub slug is expected but was not found; redirect to parent
# page as a mitigation.
return HttpResponseRedirect('..')
match = re.match('^' + page.sub_slug + '$', parts[i+1])
if match is None:
page = None
break
request.extra_context_data.update(match.groupdict())
parts = parts[:i+1] + parts[i+2:] # skip variable component
i += 1
hierarchy_ids.append(page.id)
if not url.endswith('/') and settings.APPEND_SLASH:
# this is useful to allow /login, /manage, and other non-page
# URLs to work.
return HttpResponsePermanentRedirect(url + '/')
if page is None:
redirect = Redirect.objects.filter(old_url=url).last()
if redirect:
return HttpResponseRedirect(redirect.page.get_online_url())
raise Http404("combo: can't find the requested page, you might need to create it.")
return publish_page(request, page)
def publish_page(request, page, status=200, template_name=None):
pages = page.get_parents_and_self()
if not page.is_visible(request.user):
if not request.user.is_authenticated():
from django.contrib.auth.views import redirect_to_login
return redirect_to_login(request.build_absolute_uri())
raise PermissionDenied()
if page.redirect_url:
try:
redirect_url = page.get_redirect_url(context=request.extra_context_data)
except utils.TemplateError:
raise Http404("combo: can't compute redirect URL (template error).")
if redirect_url:
return HttpResponseRedirect(redirect_url)
cells = CellBase.get_cells(page=page)
extend_with_parent_cells(cells, hierarchy=pages)
cells = [x for x in cells if x.is_visible(user=request.user)]
ctx = {
'check_badges': should_check_badges(),
'page': page,
'page_cells': cells,
'pages': pages,
'request': request,
'media': sum((cell.media for cell in cells), Media())
}
ctx.update(getattr(request, 'extra_context_data', {}))
modify_global_context(request, ctx)
for cell in cells:
if cell.modify_global_context:
cell.modify_global_context(ctx, request)
if not template_name:
combo_template = settings.COMBO_PUBLIC_TEMPLATES[page.template_name]
template_name = combo_template['template']
return render(request, template_name, ctx, status=status)
def error404(request, *args, **kwargs):
if not hasattr(request, 'user'):
# this happens when the 404 handler is called early on, for example
# when the given hostname doesn't exist as a tenant
from django.views.defaults import page_not_found
return page_not_found(request, *args, **kwargs)
try:
page = Page.objects.get(slug='404')
template_name = None
except Page.DoesNotExist:
page = Page()
page.template_name = 'standard'
template_name = 'combo/404.html'
return publish_page(request, page, status=404, template_name=template_name)
def menu_badges(request):
context = {'request': request}
page_ids = request.GET.getlist('page[]')
cells = []
for klass in CellBase.get_cell_classes(lambda x: bool(x.get_badge)):
cells.extend(klass.objects.filter(page_id__in=page_ids))
badges = {}
for cell in cells:
if cell.page_id in badges:
continue
badge = cell.get_badge(context)
if badge:
badges[cell.page_id] = badge
return HttpResponse(json.dumps(badges), content_type='application/json')
def api_search(request):
for cell in SearchCell.get_cells_by_search_service('_text'):
if not cell.is_visible(request.user):
continue
break
else:
raise Http404()
query = request.GET.get('q') or ''
sqs = SearchQuerySet().filter(SQ(content=AutoQuery(query)) | SQ(title=AutoQuery(query)))
sqs = sqs.highlight()
sqs.load_all()
hits = []
for hit in sqs:
description = None
if hit.model_name == 'page' and hit.highlighted['text']:
description = '<p>%s</p>' % hit.highlighted['text'][0]
hits.append({
'text': hit.title,
'url': hit.url,
'description': description,
})
return HttpResponse(json.dumps({'data': hits}), content_type='application/json')
def snapshot(request, *args, **kwargs):
snapshot = PageSnapshot.objects.get(id=kwargs['pk'])
return publish_page(request, snapshot.get_page())