combo/combo/public/views.py

673 lines
25 KiB
Python

# combo - content management system
# Copyright (C) 2014 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import re
from itertools import chain
import django
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import logout as auth_logout
from django.contrib.auth import views as auth_views
from django.contrib.auth.models import User
from django.core import signing
from django.core.exceptions import ObjectDoesNotExist, PermissionDenied
from django.db import transaction
from django.db.models import Q
from django.forms.widgets import Media
from django.http import (
Http404,
HttpResponse,
HttpResponseBadRequest,
HttpResponsePermanentRedirect,
HttpResponseRedirect,
)
from django.shortcuts import render, resolve_url
from django.template import engines
from django.template.loader import TemplateDoesNotExist, get_template
from django.utils import lorem_ipsum, timezone
from django.utils.encoding import force_text
from django.utils.six.moves.urllib import parse as urllib
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import ugettext as _
from django.views.decorators.csrf import csrf_exempt
from django.views.defaults import page_not_found
if 'mellon' in settings.INSTALLED_APPS:
from mellon.utils import get_idps
else:
get_idps = lambda: []
from combo import utils
from combo.apps.assets.models import Asset
from combo.apps.search.models import SearchCell
from combo.data.models import (
CellBase,
Page,
PageSnapshot,
ParentContentCell,
PostException,
Redirect,
TextCell,
extract_context_from_sub_slug,
)
from combo.profile.models import Profile
from combo.profile.utils import get_user_from_name_id
class LoginView(auth_views.LoginView):
def dispatch(self, request, *args, **kwargs):
if any(get_idps()):
if 'next' not in request.GET:
return HttpResponseRedirect(resolve_url('mellon_login'))
try:
quoted_next_url = urllib.quote(request.GET.get('next'))
except KeyError:
return HttpResponseBadRequest('invalid value for "next" parameter')
return HttpResponseRedirect(resolve_url('mellon_login') + '?next=' + quoted_next_url)
return super().dispatch(request, *args, **kwargs)
login = LoginView.as_view()
def logout(request, next_page=None):
if any(get_idps()):
return HttpResponseRedirect(resolve_url('mellon_logout'))
auth_logout(request)
if next_page is not None:
next_page = resolve_url(next_page)
else:
next_page = '/'
return HttpResponseRedirect(next_page)
def modify_global_context(request, ctx):
if 'user_id' in ctx:
try:
ctx['selected_user'] = User.objects.get(id=ctx['user_id'])
except (User.DoesNotExist, ValueError):
pass
if 'name_id' in ctx:
ctx['selected_user'] = get_user_from_name_id(ctx['name_id'])
@csrf_exempt
def ajax_page_cell(request, page_pk, cell_reference):
try:
page = Page.objects.get(id=page_pk)
except Page.DoesNotExist:
# check it's not from a snapshots
try:
page = Page.snapshots.get(id=page_pk)
except Page.DoesNotExist:
raise Http404()
# as it's from a snapshot access is limited to managers
if not (request.user and request.user.is_staff):
raise PermissionDenied()
if not page.is_visible(request.user):
raise PermissionDenied()
try:
cell = CellBase.get_cell(cell_reference, page_id=page_pk)
except ObjectDoesNotExist:
raise Http404()
if not cell.is_visible(user=request.user):
raise PermissionDenied()
exception = None
action_response = None
if request.method == 'POST':
if not hasattr(cell, 'post'):
raise PermissionDenied()
try:
action_response = cell.post(request)
except PostException as e:
exception = e
if not request.is_ajax():
messages.error(
request, force_text(e) if force_text(e) != 'None' else _('Error sending data.')
)
if action_response:
response = HttpResponse(
action_response.content, content_type=action_response.headers.get('Content-Type')
)
if 'Content-Disposition' in action_response.headers:
response['Content-Disposition'] = action_response.headers['Content-Disposition']
return response
if not request.is_ajax():
return HttpResponseRedirect(cell.page.get_online_url())
response = render_cell(request, cell)
if exception:
response['x-error-message'] = force_text(exception)
return response
def render_cell(request, cell):
context = {
'page': cell.page if cell.page_id else None,
'request': request,
'cell': cell,
'synchronous': True,
'site_base': request.build_absolute_uri('/')[:-1],
'absolute_uri': request.build_absolute_uri,
}
if request.GET.get('ctx'):
try:
context.update(signing.loads(request.GET['ctx']))
except signing.BadSignature:
return HttpResponseBadRequest('bad signature')
modify_global_context(request, context)
if cell.page_id:
other_cells = []
for klass in CellBase.get_cell_classes(lambda x: bool(x.modify_global_context)):
other_cells.extend(
klass.objects.filter(page_id=cell.page_id).exclude(
placeholder__in=['_auto_tile', '_dashboard', '_suggested_tile']
)
)
other_cells = [x for x in other_cells if x.is_visible(user=request.user)]
other_cells.sort(key=lambda x: x.order)
for other_cell in other_cells:
if other_cell.get_reference() != cell.get_reference():
other_cell.modify_global_context(context, request)
elif cell.modify_global_context:
# Cell can pass data through its own __dict__
cell.modify_global_context(context, request)
template = engines['django'].from_string('{% render_cell cell %}')
return HttpResponse(template.render(context, request), content_type='text/html')
def extend_with_parent_cells(cells, hierarchy):
if len(hierarchy) == 1 and hierarchy[0].slug == 'index':
# home page cannot contain parent cells
return
seen = {}
for cell in cells[:]:
if not isinstance(cell, ParentContentCell):
continue
idx = cells.index(cell)
parent_cells = cell.get_parents_cells(hierarchy=hierarchy[:-1])
# keep cells that were not already seen and mark cells as seen,
# simultaneously.
parent_cells = [
seen.setdefault((x.__class__, x.id), True) and x
for x in parent_cells
if (x.__class__, x.id) not in seen
]
cells[idx : idx + 1] = parent_cells
def should_check_badges():
check_badges = False
for klass in CellBase.get_cell_classes(lambda x: bool(x.get_badge)):
check_badges = klass.objects.all().exists()
if check_badges:
break
return check_badges
def mark_duplicated_slugs(cells):
# mark duplicated slugs to avoid using them in HTML id attributes.
cell_by_slugs = {}
for cell in cells:
if cell.slug not in cell_by_slugs:
cell_by_slugs[cell.slug] = []
cell_by_slugs[cell.slug].append(cell)
for slug, slug_cells in cell_by_slugs.items():
for cell in slug_cells:
cell.use_slug_as_id = bool(len(slug_cells) == 1)
def skeleton(request):
# Skeleton rendering is used to dynamically produce base templates to use
# in other applications, based on configured combo cells.
#
# It takes a ?source= parameter that should contain the URL we want a
# template for; it will be used to match the corresponding page, and thus
# the corresponding content.
#
# If there's no matching page, the ?source= parameter will be evaluated
# against the known services (settings.KNOWN_SERVICES) and an empty page
# will be created to be used as skeleton.
#
# If there was no matching page and no matching service an error will be
# raised. (403 Access Forbidden)
#
# While placeholders holding cells will get their cells rendered, empty
# placeholders will get themself outputted as template blocks, named
# placeholder-$name, and with a default content of a block named $name.
#
# ex:
# {% block placeholder-content %}
# {% block content %}
# {% endblock %}
# {% endblock %}
if 'source' not in request.GET:
raise PermissionDenied()
source = request.GET['source']
if source == '404':
request.extra_context_data = {
'site_base': request.build_absolute_uri('/')[:-1],
'force_absolute_url': True,
}
response = error404(request, exception=Http404())
response.status_code = 200
return response
parsed_source = urlparse.urlparse(source)
netloc = parsed_source.netloc
if parsed_source.scheme == 'https' and netloc.endswith(':443'):
# somme HTTP client (like Mozilla/1.1 (compatible; MSPIE 2.0; Windows
# CE)) will make request with an explicit :443 port in the Host header;
# it will then be used in constructing page URL and will end up in the
# ?source= parameter of this call. Remove it.
netloc = netloc.replace(':443', '')
selected_page = None
same_domain_pages = []
# look in redirect pages after the best match for the source, in case of
# several exact matches take the latest.
redirect_pages = Page.objects.exclude(redirect_url__isnull=True).exclude(redirect_url='')
for page in redirect_pages:
try:
redirect_url = utils.get_templated_url(page.redirect_url)
except utils.TemplateError:
continue
if not redirect_url:
continue
if source.startswith(redirect_url):
if selected_page is None or len(redirect_url) >= len(selected_page.get_redirect_url()):
selected_page = page
if urlparse.urlparse(redirect_url).netloc == netloc:
same_domain_pages.append(page)
if selected_page is None and same_domain_pages:
# if there was no page found, get a domain match
selected_page = same_domain_pages[0]
if selected_page is None:
# if there's still no page found, look in KNOWN_SERVICES, and
# return an empty page as template
for service_id in settings.KNOWN_SERVICES or {}:
for service_key in settings.KNOWN_SERVICES[service_id]:
service = settings.KNOWN_SERVICES[service_id][service_key]
if urlparse.urlparse(service.get('url')).netloc == netloc:
selected_page = Page()
selected_page.id = '__root'
selected_page.template_name = 'standard'
break
else:
continue
break
else:
raise PermissionDenied()
# add default ParentContentCells to the page
cells = []
for placeholder in selected_page.get_placeholders(request=request):
if placeholder.acquired:
cells.append(ParentContentCell(page=selected_page, placeholder=placeholder.key, order=0))
else:
cells = CellBase.get_cells(page=selected_page)
pages = selected_page.get_parents_and_self()
combo_template = settings.COMBO_PUBLIC_TEMPLATES[selected_page.template_name]
extend_with_parent_cells(cells, hierarchy=pages)
mark_duplicated_slugs(cells)
# mark duplicated slugs to avoid using them in HTML id attributes.
cell_by_slugs = {}
for cell in cells:
if cell.slug not in cell_by_slugs:
cell_by_slugs[cell.slug] = []
cell_by_slugs[cell.slug].append(cell)
for slug, slug_cells in cell_by_slugs.items():
for cell in slug_cells:
cell.use_slug_as_id = bool(len(slug_cells) == 1)
ctx = {
'page': selected_page,
'page_cells': cells,
'pages': pages,
'request': request,
'render_skeleton': True,
'force_absolute_url': True,
'check_badges': should_check_badges(),
'site_base': request.build_absolute_uri('/')[:-1],
}
template_name = combo_template['template']
response = render(request, template_name, ctx)
response['X-Combo-Page-Id'] = str(selected_page.id)
response['X-Combo-Skeleton-Pages'] = json.dumps(
dict([(x.id, utils.get_templated_url(x.redirect_url)) for x in same_domain_pages])
)
return response
class StyleDemoPage(object):
def __init__(self, request):
self.request = request
self.template_name = request.GET.get('template') or 'standard'
def __enter__(self):
self.page = Page(public=False, title=_('Style Demo'))
self.page.template_name = self.template_name
self.page.save()
combo_template = settings.COMBO_PUBLIC_TEMPLATES[self.page.template_name]
for i, placeholder in enumerate(self.page.get_placeholders(request=self.request)):
cell = TextCell(page=self.page, placeholder=placeholder.key, order=0)
if placeholder.key == 'footer':
cell.text = '<p>%s</p>' % lorem_ipsum.sentence()
else:
cell.text = '<h2>%s</h2><p>%s' % (
lorem_ipsum.words(3, common=False), # title
lorem_ipsum.paragraphs(1)[0],
) # 1st paragraph
if i == 0:
cell.text = cell.text.strip('.')
cell.text += ' <a href="#">%s</a>.</p>' % lorem_ipsum.words(3, common=False) # link
cell.text += '<h3>%s</h3>' % lorem_ipsum.words(3, common=False).title() # subtitle
else:
cell.text += '</p>'
cell.text += '\n'.join(['<p>%s</p>' % x for x in lorem_ipsum.paragraphs(2, common=False)])
if i == 1:
cell.extra_css_class = 'foldable'
cell.save()
cell.save()
if i == 0:
# add a template selector in first placeholder
cell = TextCell(page=self.page, placeholder=placeholder.key, order=1)
options_html = []
for template_key, template_dict in settings.COMBO_PUBLIC_TEMPLATES.items():
try:
get_template(template_dict.get('template'))
except TemplateDoesNotExist:
# don't propose templates that do not exist
continue
attr = 'value="%s"' % template_key
if template_key == self.page.template_name:
attr += ' selected="selected"'
options_html.append(u'<option %s>%s</option>' % (attr, template_dict['name']))
cell.text = u'''<form><select name="template">%s</select>
<button>%s</button></form>''' % (
'\n'.join(options_html),
_('Select'),
)
cell.save()
if i == 1:
# add a list of links in second placeholder
cell = TextCell(page=self.page, placeholder=placeholder.key, order=1)
cell.text = '<h2>%s</h2><ul>' % _('Title')
cell.extra_css_class = 'links-list'
for i in range(5):
cell.text += '<li><a href="#">%s</a></li>' % lorem_ipsum.words(3, common=False)
cell.text += '</ul>'
cell.save()
return self.page
def __exit__(self, type, value, traceback):
self.page.delete()
def style(request):
if not settings.DEBUG:
raise Http404()
messages.success(request, _('Success notice'))
messages.info(request, _('Info notice'))
messages.warning(request, _('Warning notice'))
messages.error(request, _('Error notice'))
with transaction.atomic():
with StyleDemoPage(request) as page:
return publish_page(request, page)
def empty_site(request):
return render(request, 'combo/empty_site.html', {})
def page(request):
request.extra_context_data = {'absolute_uri': request.build_absolute_uri()}
url = request.path_info
parts = [x for x in request.path_info.strip('/').split('/') if x]
if len(parts) == 1 and parts[0] == 'index':
return HttpResponsePermanentRedirect('/')
if not parts:
parts = ['index']
if (
parts == ['index']
and settings.COMBO_INITIAL_LOGIN_PAGE_PATH
and (request.user and not request.user.is_anonymous)
):
profile, created = Profile.objects.get_or_create(user=request.user)
if not profile.initial_login_view_timestamp:
# first connection of user, record that and redirect to welcome URL
profile.initial_login_view_timestamp = timezone.now()
profile.save()
return HttpResponseRedirect(settings.COMBO_INITIAL_LOGIN_PAGE_PATH)
if (
parts == ['index']
and settings.COMBO_WELCOME_PAGE_PATH
and (not request.user or request.user.is_anonymous)
):
if not request.session.setdefault('visited', False):
# first visit, the user is not logged in.
request.session['visited'] = True
return HttpResponseRedirect(settings.COMBO_WELCOME_PAGE_PATH)
pages = {}
for page in Page.objects.filter(slug__in=parts):
if not page.slug in pages:
pages[page.slug] = []
pages[page.slug].append(page)
if pages == {} and parts == ['index'] and Page.objects.count() == 0:
return empty_site(request)
i = 0
hierarchy_ids = [None]
while i < len(parts):
slug_pages = pages.get(parts[i])
if slug_pages is None or len(slug_pages) == 0:
page = None
break
elif len(slug_pages) == 1:
page = slug_pages[0]
else:
# multiple pages with same slugs
try:
page = [x for x in slug_pages if x.parent_id == hierarchy_ids[-1]][0]
except IndexError:
page = None
break
if page.parent_id != hierarchy_ids[-1]:
if i == 0:
# root page should be at root but maybe the page is a child of
# /index/, and as /index/ is silent the page would appear
# directly under /; this is not a suggested practice.
if page.parent.slug != 'index' and page.parent.parent_id is not None:
page = None
break
else:
page = None
break
if page.sub_slug:
if parts[i + 1 :] == []:
# a sub slug is expected but was not found; redirect to parent
# page as a mitigation.
return HttpResponseRedirect('..')
extra = extract_context_from_sub_slug(page.sub_slug, parts[i + 1])
if extra is None:
page = None
break
request.extra_context_data.update(extra)
parts = parts[: i + 1] + parts[i + 2 :] # skip variable component
i += 1
hierarchy_ids.append(page.id)
if not url.endswith('/') and settings.APPEND_SLASH:
# this is useful to allow /login, /manage, and other non-page URLs to
# work. re.sub is used to replace repeated slashes by single ones,
# this prevents a double slash at the start to redirect to a
# //whatever service, which would be interpreted as http[s]://whatever/.
return HttpResponsePermanentRedirect(re.sub('/+', '/', url) + '/')
if page is None:
redirect = Redirect.objects.filter(old_url=url).last()
if redirect:
return HttpResponseRedirect(redirect.page.get_online_url())
raise Http404("combo: can't find the requested page, you might need to create it.")
return publish_page(request, page)
def publish_page(request, page, status=200, template_name=None):
pages = page.get_parents_and_self()
if not page.is_visible(request.user):
if not request.user.is_authenticated:
from django.contrib.auth.views import redirect_to_login
return redirect_to_login(request.build_absolute_uri())
raise PermissionDenied()
if page.redirect_url:
context = {'request': request}
context.update(request.extra_context_data or {})
try:
redirect_url = page.get_redirect_url(context=context)
except utils.TemplateError:
raise Http404("combo: can't compute redirect URL (template error).")
if redirect_url:
return HttpResponseRedirect(redirect_url)
cells = CellBase.get_cells(
page=page,
select_related={'data_linkcell': ['link_page']},
prefetch_validity_info=True,
cells_exclude=Q(placeholder__in=['_auto_tile', '_dashboard', '_suggested_tile']),
)
extend_with_parent_cells(cells, hierarchy=pages)
cells = [x for x in cells if x.is_visible(user=request.user)]
mark_duplicated_slugs(cells)
# load assets
for cell in cells:
cell._asset_keys = cell.get_asset_slot_keys()
asset_keys = list(set(chain(*[c._asset_keys.keys() for c in cells])))
assets = list(Asset.objects.filter(key__in=asset_keys))
for cell in cells:
cell._assets = {a.key: a for a in assets if a.key in cell._asset_keys.keys()}
ctx = {
'check_badges': should_check_badges(),
'page': page,
'page_cells': cells,
'pages': pages,
'request': request,
'media': sum((cell.media for cell in cells), Media()),
}
ctx.update(getattr(request, 'extra_context_data', {}))
modify_global_context(request, ctx)
if getattr(settings, 'COMBO_TEST_ALWAYS_RENDER_CELLS_SYNCHRONOUSLY', False):
ctx['synchronous'] = True
for cell in cells:
if cell.modify_global_context:
cell.modify_global_context(ctx, request)
if not template_name:
combo_template = settings.COMBO_PUBLIC_TEMPLATES[page.template_name]
template_name = combo_template['template']
return render(request, template_name, ctx, status=status)
def error404(request, *args, **kwargs):
if not hasattr(request, 'user'):
# this happens when the 404 handler is called early on, for example
# when the given hostname doesn't exist as a tenant
return page_not_found(request, *args, **kwargs)
if Page.objects.exists() and all(
(not x.is_visible(request.user) for x in Page.objects.filter(parent_id__isnull=True))
):
# if none of the first-level pages can be viewed by the user, display
# native django error page.
return page_not_found(request, *args, template_name='combo/native-404.html', **kwargs)
try:
page = Page.objects.get(slug='404')
template_name = None
except Page.DoesNotExist:
page = Page.objects.filter(slug='index', parent=None).first() or Page()
page.redirect_url = None
page.public = True
page.template_name = 'standard'
template_name = 'combo/404.html'
return publish_page(request, page, status=404, template_name=template_name)
def mellon_page_hook(context):
page = Page()
page.title = 'Hello'
page.template_name = 'standard'
context['page'] = page
home = Page.objects.filter(slug='index', parent=None).first()
if home:
context['page_cells'] = CellBase.get_cells(page=home)
def menu_badges(request):
context = {'request': request}
page_ids = [x for x in request.GET.getlist('page[]') if x.isdigit()]
cells = []
for klass in CellBase.get_cell_classes(lambda x: bool(x.get_badge)):
cells.extend(klass.objects.filter(page_id__in=page_ids))
badges = {}
for cell in cells:
if cell.page_id in badges:
continue
badge = cell.get_badge(context)
if badge:
badges[cell.page_id] = badge
return HttpResponse(json.dumps(badges), content_type='application/json')
menu_badges.mellon_no_passive = True
def snapshot(request, *args, **kwargs):
snapshot = PageSnapshot.objects.get(id=kwargs['pk'])
return publish_page(request, snapshot.get_page())