combo/combo/data/models.py

1394 lines
50 KiB
Python

# combo - content management system
# Copyright (C) 2014 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import copy
import feedparser
import hashlib
import json
import logging
import os
import re
import requests
import subprocess
from django.apps import apps
from django.conf import settings
from django.contrib.auth.models import Group
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist, ValidationError, PermissionDenied
from django.core import serializers
from django.db import models, transaction
from django.db.models.base import ModelBase
from django.db.models.signals import pre_save, post_save, post_delete
from django.db.models import Max
from django.dispatch import receiver
from django.forms import models as model_forms
from django import forms
from django import template
from django.utils import six
from django.utils.encoding import python_2_unicode_compatible, force_text, smart_bytes
from django.utils.html import strip_tags
from django.utils.safestring import mark_safe
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.text import slugify
from django.utils.translation import ugettext_lazy as _
from django.forms.widgets import MediaDefiningClass
from django.template import Context, engines, TemplateDoesNotExist
from django.test.client import RequestFactory
from .fields import RichTextField
from jsonfield import JSONField
from .library import register_cell_class, get_cell_classes, get_cell_class
from combo import utils
from combo.utils import NothingInCacheException
class PostException(Exception):
    """Plain ``Exception`` subclass carrying no extra behaviour.

    NOTE(review): presumably raised when posting data to a remote service
    fails; it is not raised in this part of the file, confirm against callers.
    """
def element_is_visible(element, user=None):
    """Tell whether ``element`` may be displayed to ``user``.

    ``element`` must expose ``public`` and ``groups`` (with an ``all()``
    method) and may expose ``restricted_to_unlogged``; ``user`` is either
    ``None`` or an object with ``is_anonymous()``, ``is_superuser`` and
    ``groups``.

    Rules, in order:

    * public element: visible to everybody, unless ``restricted_to_unlogged``
      is ``True``, in which case only anonymous visitors see it;
    * private element: never visible to anonymous visitors, always visible
      to superusers;
    * otherwise the user must share at least one group with the element (an
      element without groups matches every logged-in user), and
      ``restricted_to_unlogged`` inverts that result.
    """
    def visitor_is_anonymous():
        # kept lazy so that user.is_anonymous() is only called when needed
        return user is None or user.is_anonymous()

    inverted = getattr(element, 'restricted_to_unlogged', None) is True

    if element.public:
        return visitor_is_anonymous() if inverted else True

    if visitor_is_anonymous():
        return False
    if user.is_superuser:
        return True

    element_groups = element.groups.all()
    if element_groups:
        in_groups = bool(set(element_groups) & set(user.groups.all()))
    else:
        in_groups = True
    return (not in_groups) if inverted else in_groups
class Placeholder(object):
    """Description of a placeholder, a named slot of a page template.

    It is a plain value holder: every constructor argument is stored as an
    attribute of the same name.
    """

    def __init__(self, key, name=None, acquired=False, render=True, cell=None,
                 force_synchronous=False):
        self.key, self.name = key, name
        self.acquired, self.render = acquired, render
        self.cell = cell
        self.force_synchronous = force_synchronous

    def get_name(self):
        """Return the placeholder name, prefixed by the label of the cell
        it is attached to when there is one."""
        if not self.cell:
            return self.name
        return '%s / %s' % (self.cell.get_label(), self.name)
class PageManager(models.Manager):
    """Manager returning either regular pages or temporary snapshot pages.

    The ``snapshots`` keyword argument (default ``False``) selects which
    kind of pages the manager gives access to.
    """
    snapshots = False

    def __init__(self, *args, **kwargs):
        # 'snapshots' is ours, it must not reach models.Manager.__init__
        self.snapshots = kwargs.pop('snapshots', False)
        super(PageManager, self).__init__(*args, **kwargs)

    def get_by_natural_key(self, path):
        """Return the page whose slug is the last segment of ``path``; an
        empty path designates the 'index' page."""
        segments = [segment for segment in path.strip('/').split('/') if segment]
        last_slug = segments[-1] if segments else 'index'
        return self.get(slug=last_slug)

    def get_queryset(self):
        """Restrict the default queryset to pages with (snapshots manager)
        or without (regular manager) an associated snapshot."""
        base_queryset = super(PageManager, self).get_queryset()
        return base_queryset.filter(snapshot__isnull=not self.snapshots)
@python_2_unicode_compatible
class Page(models.Model):
    """A page of the site, organised as a tree through ``parent``.

    ``objects`` only returns regular pages while ``snapshots`` only returns
    the temporary pages created to display a PageSnapshot.
    """
    objects = PageManager()
    snapshots = PageManager(snapshots=True)

    title = models.CharField(_('Title'), max_length=150)
    slug = models.SlugField(_('Slug'))
    sub_slug = models.CharField(_('Sub Slug'), max_length=150, blank=True,
            help_text=_('Regular expression to create variadic subpages. '
                        'Matching named groups are exposed as context variables.'
            ))
    description = models.TextField(_('Description'), blank=True)
    template_name = models.CharField(_('Template'), max_length=50)
    parent = models.ForeignKey('self', null=True, blank=True)
    order = models.PositiveIntegerField()
    exclude_from_navigation = models.BooleanField(_('Exclude from navigation'), default=False)
    redirect_url = models.CharField(_('Redirect URL'), max_length=200, blank=True)

    public = models.BooleanField(_('Public'), default=True)
    groups = models.ManyToManyField(Group, verbose_name=_('Groups'), blank=True)
    last_update_timestamp = models.DateTimeField(auto_now=True)

    picture = models.ImageField(_('Picture'), upload_to='page-pictures/', null=True)

    # mark temporarily restored snapshots, it is required to save objects
    # (pages and cells) for real for viewing past snapshots as many cells are
    # asynchronously loaded and must refer to a real Page object.
    snapshot = models.ForeignKey('PageSnapshot', on_delete=models.CASCADE, null=True,
            related_name='temporary_page')

    # keep a cached list of cell types that are used in the pages.
    related_cells = JSONField(blank=True)

    _level = None

    class Meta:
        ordering = ['order']

    def __str__(self):
        return self.title

    def natural_key(self):
        """Return the online path of the page, without surrounding slashes,
        as a 1-tuple."""
        return (self.get_online_url().strip('/'), )

    def save(self, *args, **kwargs):
        """Fill in missing ``related_cells``, ``order``, ``slug`` and
        ``template_name`` values, then save the page.

        The very first page gets the 'index' slug; other pages get a slug
        derived from their title, suffixed with a counter until it is unique
        among the pages sharing the same parent.
        """
        if not self.id:
            self.related_cells = {'cell_types': []}
        if not self.order:
            max_order = Page.objects.all().aggregate(Max('order')).get('order__max') or 0
            self.order = max_order + 1
        if not self.slug:
            if Page.objects.count() == 0:
                slug = 'index'
            else:
                # strip dashes once so suffixed slugs do not end up with a
                # double dash (ex: "foo--2")
                base_slug = slugify(self.title)[:40].strip('-')
                slug = base_slug
                i = 1
                # exists() rather than get(): pre-existing duplicates must
                # not crash with MultipleObjectsReturned
                while Page.objects.filter(slug=slug, parent_id=self.parent_id).exists():
                    i += 1
                    slug = '%s-%s' % (base_slug, i)
            self.slug = slug
        if not self.template_name:
            self.template_name = settings.COMBO_DEFAULT_PUBLIC_TEMPLATE
        return super(Page, self).save(*args, **kwargs)

    def get_parents_and_self(self):
        """Return the list of pages from the root ancestor down to this
        page (included)."""
        pages = [self]
        page = self
        while page.parent_id:
            # _parent is set by get_with_hierarchy_attributes(), it avoids a
            # database query
            page = page._parent if hasattr(page, '_parent') else page.parent
            pages.append(page)
        return list(reversed(pages))

    def get_online_url(self):
        """Return the path of the page, made of the slugs of its ancestors
        and itself; a leading 'index' slug is omitted."""
        parts = [x.slug for x in self.get_parents_and_self()]
        if parts[0] == 'index':
            parts = parts[1:]
        if not parts:
            return '/'
        return '/' + '/'.join(parts) + '/'

    def get_page_of_level(self, level):
        '''Return page of given level in the page hierarchy.'''
        try:
            return self.get_parents_and_self()[level]
        except IndexError:
            return None

    def get_siblings(self):
        """Return the pages sharing the parent of this page (itself
        included)."""
        if hasattr(self, '_parent'):
            if self._parent:
                return self._parent._children
        return Page.objects.filter(parent_id=self.parent_id)

    def get_children(self):
        """Return the direct children of this page."""
        if hasattr(self, '_children'):
            return self._children
        return Page.objects.filter(parent_id=self.id)

    def has_children(self):
        """Tell whether this page has at least one child page."""
        if hasattr(self, '_children'):
            return bool(self._children)
        return Page.objects.filter(parent_id=self.id).exists()

    def get_template_display_name(self):
        """Return the configured display name of the page template, or an
        'Unknown' label when the template is not configured."""
        try:
            return settings.COMBO_PUBLIC_TEMPLATES[self.template_name]['name']
        except KeyError:
            return _('Unknown (%s)') % self.template_name

    def missing_template(self):
        """Tell whether the page template is not configured or cannot be
        loaded."""
        template_name = settings.COMBO_PUBLIC_TEMPLATES.get(self.template_name, {}).get('template')
        if not template_name:
            return True
        try:
            template.loader.select_template([template_name])
        except TemplateDoesNotExist:
            return True
        return False

    def get_placeholders(self, request, traverse_cells=False, template_name=None):
        """Return the list of Placeholder objects of the page template.

        Placeholders are taken from the template settings when they are
        declared there; otherwise the template is rendered in
        'placeholder_search_mode' with the ``placeholders`` list in its
        context, for the template to fill.
        """
        placeholders = []

        page_template = settings.COMBO_PUBLIC_TEMPLATES.get(template_name or self.template_name, {})
        if page_template.get('placeholders'):
            # manual declaration
            for key, options in page_template['placeholders'].items():
                placeholders.append(Placeholder(key=key, **options))
            return placeholders

        template_names = []
        if page_template.get('template'):
            template_names.append(page_template['template'])
        template_names.append('combo/page_template.html')
        tmpl = template.loader.select_template(template_names)
        # render against a blank request for the page URL, without any user
        request = RequestFactory(SERVER_NAME=request.get_host()).get(self.get_online_url())
        request.user = None
        context = {
            'page': self,
            'request': request,
            'synchronous': True,
            'placeholder_search_mode': True,
            'placeholders': placeholders,
            'traverse_cells': traverse_cells,
        }
        tmpl.render(context, request)
        return placeholders

    def _get_adjacent_visible_page(self, user, backwards=False):
        """Return the first page visible to ``user`` located after (or
        before, if ``backwards``) this page in the flattened hierarchy, or
        None if there is none or if this page is not part of it."""
        pages = Page.get_as_reordered_flat_hierarchy(Page.objects.all())
        if backwards:
            pages.reverse()
        seen_self = False
        for page in pages:
            if seen_self:
                if page.is_visible(user):
                    return page
            elif page.id == self.id:
                seen_self = True
        return None

    def get_next_page(self, user=None):
        """Return the next page visible to ``user``, or None."""
        return self._get_adjacent_visible_page(user)

    def get_previous_page(self, user=None):
        """Return the previous page visible to ``user``, or None."""
        return self._get_adjacent_visible_page(user, backwards=True)

    @classmethod
    def get_as_reordered_flat_hierarchy(cls, object_list):
        """Return the pages of ``object_list`` as a flat list in depth-first
        order, each page followed by its descendants.

        The ``level`` attribute of each returned page is set to its depth
        (0 for pages without parent). Pages whose parent is not reachable
        from a parentless page of ``object_list`` are left out.
        """
        reordered = []
        children_by_parent_id = collections.defaultdict(list)
        for page in object_list:
            children_by_parent_id[page.parent_id].append(page)

        def fill_list(parent_id=None, level=0):
            # only walk the children of the given parent, and compare ids
            # rather than page.parent to avoid a query per page
            for page in children_by_parent_id.get(parent_id, ()):
                page.level = level
                reordered.append(page)
                fill_list(parent_id=page.id, level=level + 1)

        fill_list()
        return reordered

    @staticmethod
    @utils.cache_during_request
    def get_with_hierarchy_attributes():
        """Return a dictionary of all pages, by id, with their ``_parent``
        and ``_children`` (sorted by order) attributes set."""
        pages = Page.objects.all()
        pages_by_id = {}
        for page in pages:
            pages_by_id[page.id] = page
            page._parent = None
            page._children = []
        for page in pages:
            page._parent = pages_by_id[page.parent_id] if page.parent_id else None
            if page._parent:
                page._parent._children.append(page)
        for page in pages:
            page._children.sort(key=lambda x: x.order)
        return pages_by_id

    def visibility(self):
        """Return a label describing who can see the page."""
        if self.public:
            return _('Public')
        return _('Private (%s)') % ', '.join([x.name for x in self.groups.all()])

    def is_visible(self, user=None):
        return element_is_visible(self, user=user)

    def get_cells(self):
        return CellBase.get_cells(page=self)

    def build_cell_cache(self):
        """Update (and save) the cached list of cell types used in the page
        if it doesn't match the actual cells."""
        cells = CellBase.get_cells(page=self, skip_cell_cache=True)
        cell_types = set()
        for cell in cells:
            cell_types.add(cell.get_cell_type_str())
        if cell_types != set(self.related_cells.get('cell_types', [])):
            self.related_cells['cell_types'] = list(cell_types)
            self.save()

    def get_serialized_page(self):
        """Return the page and its cells as a JSON-compatible dictionary.

        Cells without placeholder, or whose placeholder starts with an
        underscore, are left out, as well as cached_* cell fields.
        """
        cells = [x for x in self.get_cells() if x.placeholder and not x.placeholder.startswith('_')]
        serialized_page = json.loads(serializers.serialize('json', [self],
            use_natural_foreign_keys=True, use_natural_primary_keys=True))[0]
        del serialized_page['model']
        # NOTE(review): this looks at the top-level dictionary, while the
        # serializer is expected to put model fields under 'fields'; confirm
        # whether 'snapshot' was meant to be removed from there.
        if 'snapshot' in serialized_page:
            del serialized_page['snapshot']
        if 'related_cells' in serialized_page['fields']:
            del serialized_page['fields']['related_cells']
        serialized_page['cells'] = json.loads(serializers.serialize('json',
            cells, use_natural_foreign_keys=True, use_natural_primary_keys=True))
        # groups are serialized as natural keys (lists), only keep the name
        serialized_page['fields']['groups'] = [x[0] for x in serialized_page['fields']['groups']]
        for cell in serialized_page['cells']:
            del cell['pk']
            del cell['fields']['page']
            cell['fields']['groups'] = [x[0] for x in cell['fields']['groups']]
            for key in list(cell['fields'].keys()):
                if key.startswith('cached_'):
                    del cell['fields'][key]
        return serialized_page

    @classmethod
    def load_serialized_page(cls, json_page, snapshot=None):
        """Create or update a page from its serialized form and return it.

        ``json_page`` is modified in place: the page gets its 'model' and
        'pk' keys and the cells (if any) are prepared for
        load_serialized_cells(), which is not called here. Existing cells of
        the page are deleted.
        """
        json_page['model'] = 'data.page'
        json_page['fields']['groups'] = [[x] for x in json_page['fields']['groups'] if isinstance(x, six.string_types)]
        page, created = Page.objects.get_or_create(slug=json_page['fields']['slug'], snapshot=snapshot)
        json_page['pk'] = page.id
        page = [x for x in serializers.deserialize('json', json.dumps([json_page]))][0]
        page.object.snapshot = snapshot
        page.save()
        # a serialized page may come without any 'cells' key
        for cell in json_page.get('cells') or []:
            cell['fields']['groups'] = [[x] for x in cell['fields']['groups'] if isinstance(x, six.string_types)]
            if snapshot:
                cell['fields']['page'] = page.object.id
            else:
                cell['fields']['page'] = page.object.natural_key()

        # if there were cells, remove them
        for cell in CellBase.get_cells(page_id=page.object.id):
            cell.delete()
        return page.object  # get page out of deserialization object

    @classmethod
    def load_serialized_cells(cls, cells):
        """Save cells given in serialized form, as prepared by
        load_serialized_page()."""
        # load new cells
        for cell in serializers.deserialize('json', json.dumps(cells)):
            cell.save()
            # will populate cached_* attributes
            cell.object.save()

    @classmethod
    def load_serialized_pages(cls, json_site):
        """Load a list of serialized pages, with their cells."""
        cells = []
        for json_page in json_site:
            cls.load_serialized_page(json_page)
            cells.extend(json_page.get('cells') or [])
        cls.load_serialized_cells(cells)

        # 2nd pass to set parents
        for json_page in json_site:
            if json_page.get('parent_slug'):
                page = Page.objects.get(slug=json_page['fields']['slug'])
                page.parent = Page.objects.get(slug=json_page.get('parent_slug'))
                page.save()

    @classmethod
    def export_all_for_json(cls):
        """Return the serialized form of all pages, parents first."""
        ordered_pages = Page.get_as_reordered_flat_hierarchy(cls.objects.all())
        return [x.get_serialized_page() for x in ordered_pages]

    def get_redirect_url(self, context=None):
        return utils.get_templated_url(self.redirect_url, context=context)

    def get_last_update_time(self):
        """Return the most recent update timestamp among the page and its
        cells."""
        cells = CellBase.get_cells(page_id=self.id)
        return max([self.last_update_timestamp] + [x.last_update_timestamp for x in cells])
class PageSnapshot(models.Model):
    """Serialized state of a page at a given time, with an optional author
    and comment."""
    page = models.ForeignKey(Page, on_delete=models.SET_NULL, null=True)
    timestamp = models.DateTimeField(auto_now_add=True)
    user = models.ForeignKey(settings.AUTH_USER_MODEL, null=True)
    comment = models.TextField(blank=True, null=True)
    serialization = JSONField(blank=True)

    class Meta:
        ordering = ('-timestamp',)

    @classmethod
    def take(cls, page, request=None, comment=None, deletion=False):
        """Record a snapshot of ``page``.

        The user of ``request`` is recorded when logged in. For a
        ``deletion`` snapshot nothing is serialized and the comment defaults
        to 'deletion'.
        """
        snapshot = cls(page=page, comment=comment)
        if request and not request.user.is_anonymous():
            snapshot.user = request.user
        if not deletion:
            snapshot.serialization = page.get_serialized_page()
        else:
            snapshot.serialization = {}
            snapshot.comment = comment or _('deletion')
        snapshot.save()

    def _get_serialization_copy(self):
        """Return a private copy of the serialization.

        Page.load_serialized_page() alters the dictionary it receives (it
        wraps group names in lists, adds 'model', 'pk' and cell page keys);
        loading from self.serialization directly would corrupt the snapshot,
        a second load would for example lose all groups.
        """
        return copy.deepcopy(self.serialization)

    def get_page(self):
        """Return the temporary page displaying this snapshot, creating it
        if it doesn't exist yet."""
        try:
            # try reusing existing page
            return Page.snapshots.get(snapshot=self)
        except Page.DoesNotExist:
            serialization = self._get_serialization_copy()
            page = Page.load_serialized_page(serialization, snapshot=self)
            page.load_serialized_cells(serialization['cells'])
            return page

    def restore(self):
        """Reset the regular page and its cells to the state recorded in
        this snapshot, atomically, and return the page."""
        serialization = self._get_serialization_copy()
        with transaction.atomic():
            page = Page.load_serialized_page(serialization)
            page.load_serialized_cells(serialization['cells'])
        return page
class Redirect(models.Model):
    """Association of a former URL with a page, ordered by creation time.

    NOTE(review): presumably used to redirect visitors of ``old_url`` to
    ``page``; the code using it is outside this part of the file.
    """
    old_url = models.CharField(max_length=512)
    page = models.ForeignKey(Page)
    creation_timestamp = models.DateTimeField(auto_now_add=True)

    class Meta:
        ordering = ('creation_timestamp',)
class CellMeta(MediaDefiningClass, ModelBase):
    """Metaclass combining the Django forms media machinery
    (MediaDefiningClass) with the regular model metaclass (ModelBase)."""
@python_2_unicode_compatible
class CellBase(six.with_metaclass(CellMeta, models.Model)):
    """Abstract base class of all cells, the content blocks placed in the
    placeholders of a page."""
    page = models.ForeignKey(Page)
    placeholder = models.CharField(max_length=20)
    order = models.PositiveIntegerField()
    slug = models.SlugField(_('Slug'), blank=True)
    extra_css_class = models.CharField(_('Extra classes for CSS styling'), max_length=100, blank=True)

    public = models.BooleanField(_('Public'), default=True)
    # restricted_to_unlogged is actually an invert switch, it is used to mark
    # a cell as only visible to unlogged users but also, when groups are set,
    # to mark the cell as visible to all but those groups.
    restricted_to_unlogged = models.BooleanField(
            _('Restrict to unlogged users'), default=False)
    groups = models.ManyToManyField(Group, verbose_name=_('Groups'), blank=True)
    last_update_timestamp = models.DateTimeField(auto_now=True)

    default_form_class = None
    visible = True
    user_dependant = False
    manager_form_template = 'combo/cell_form.html'
    template_name = None

    # get_badge(self, context); set to None so cell types can be skipped easily
    get_badge = None

    # get_asset_slots(self); set to None so cell types can be skipped easily
    get_asset_slots = None

    # message displayed when the cell is loaded asynchronously
    loading_message = _('Loading...')

    # modify_global_context(self, context, request=None)
    # Apply changes to the template context that must be visible to all cells
    # in the page
    modify_global_context = None

    # if set, automatically refresh cell every n seconds
    ajax_refresh = None

    class Meta:
        abstract = True

    def __str__(self):
        label = self.get_verbose_name()
        additional_label = self.get_additional_label()
        if label and additional_label:
            # additional labels are displayed on a single line
            return u'%s (%s)' % (label, re.sub(r'\r?\n', u' ', force_text(additional_label)))
        else:
            return force_text(label)

    @classmethod
    def get_verbose_name(cls):
        return cls._meta.verbose_name

    def get_additional_label(self):
        """Return a text identifying this particular cell, appended to the
        cell type name by __str__."""
        return ''

    @property
    def class_name(self):
        return self.__class__.__name__.lower()

    @property
    def css_class_names(self):
        return self.class_name + ' ' + self.extra_css_class

    @classmethod
    def get_cell_classes(cls, class_filter=lambda x: True):
        """Iterate over registered cell classes that are enabled, visible
        and accepted by ``class_filter``."""
        for klass in get_cell_classes():
            if not class_filter(klass):
                continue
            if not klass.is_enabled():
                continue
            if klass.visible is False:
                continue
            yield klass

    @classmethod
    def get_all_cell_types(cls, *args, **kwargs):
        """Return the cell types of all classes given by
        get_cell_classes(), which receives the arguments."""
        cell_types = []
        for klass in cls.get_cell_classes(*args, **kwargs):
            cell_types.extend(klass.get_cell_types())
        return cell_types

    @classmethod
    def get_cells(cls, cell_filter=None, skip_cell_cache=False, **kwargs):
        """Returns the list of cells of various classes matching **kwargs"""
        cells = []
        pages = []
        if 'page' in kwargs:
            pages = [kwargs['page']]
        elif 'page__in' in kwargs:
            pages = kwargs['page__in']
        else:
            # if there are not explicit page, limit to non-snapshot pages
            kwargs['page__snapshot__isnull'] = True

        cell_classes = get_cell_classes()
        if pages and not skip_cell_cache:
            # if there's a request for some specific pages, limit cell types
            # to those that are actually in use in those pages.
            cell_types = set()
            for page in pages:
                if page.related_cells and 'cell_types' in page.related_cells:
                    cell_types |= set(page.related_cells['cell_types'])
                else:
                    # a page has no cache, all classes have to be queried
                    break
            else:
                cell_classes = [get_cell_class(x) for x in cell_types]

        for klass in cell_classes:
            if klass is None:
                continue
            if cell_filter and not cell_filter(klass):
                continue
            cells.extend(klass.objects.filter(**kwargs))
        cells.sort(key=lambda x: x.order)
        return cells

    def get_reference(self):
        """Returns a string that can serve as a unique reference to a cell"""
        return str('%s-%s' % (self.get_cell_type_str(), self.id))

    @classmethod
    def get_cell(cls, reference, **kwargs):
        """Returns the cell matching reference, and eventual **kwargs

        Raises ObjectDoesNotExist if the reference is malformed or
        designates an unknown cell type.
        """
        try:
            content_id, cell_id = reference.split('-')
        except ValueError:
            # not of the <cell type>-<id> form, it cannot match any cell
            raise ObjectDoesNotExist()
        klass = get_cell_class(content_id)
        if klass is None:
            raise ObjectDoesNotExist()
        return klass.objects.get(id=cell_id, **kwargs)

    @classmethod
    def get_cell_type_str(cls):
        return '%s_%s' % (cls._meta.app_label, cls._meta.model_name)

    @classmethod
    def get_cell_type_group(cls):
        return apps.get_app_config(cls._meta.app_label).verbose_name

    @classmethod
    def get_cell_types(cls):
        """Return a list of dictionaries describing the cell types provided
        by the class (a single 'default' variant here)."""
        return [{
            'name': cls.get_verbose_name(),
            'cell_type_str': cls.get_cell_type_str(),
            'group': cls.get_cell_type_group(),
            'variant': 'default',
            'order': 0,
        }]

    @classmethod
    def is_enabled(cls):
        """Defines if the cell type is enabled for the given site; this is used
        to selectively enable cells from extension modules."""
        return True

    def set_variant(self, variant):
        pass

    def get_label(self):
        return self.get_verbose_name()

    def get_default_form_class(self):
        """Return the form class used to edit the cell: default_form_class
        if set, else a model form over the fields that are specific to the
        cell class, or None if there is no such field."""
        if self.default_form_class:
            return self.default_form_class

        fields = [x.name for x in self._meta.local_concrete_fields
                  if x.name not in ('id', 'page', 'placeholder', 'order',
                                    'public', 'groups', 'slug',
                                    'extra_css_class', 'last_update_timestamp',
                                    'restricted_to_unlogged')]
        if not fields:
            return None

        return model_forms.modelform_factory(self.__class__, fields=fields)

    def get_options_form_class(self):
        return model_forms.modelform_factory(self.__class__,
                fields=['slug', 'extra_css_class'])

    def get_extra_manager_context(self):
        return {}

    def is_visible(self, user=None):
        return element_is_visible(self, user=user)

    def is_relevant(self, context):
        '''Return whether it's relevant to render this cell in the page
           context.'''
        return True

    def is_user_dependant(self, context):
        '''Return whether the cell content varies from user to user.'''
        return self.user_dependant

    def get_concerned_user(self, context):
        '''Return user from UserSearch cell, or connected user.'''
        return context.get('selected_user') or getattr(context.get('request'), 'user', None)

    def get_cell_extra_context(self, context):
        return {'cell': self}

    def render(self, context):
        """Render the cell, with ``context`` updated with the cell extra
        context.

        Templates are looked up in that order: the template specific to the
        cell slug (if any), the class template_name (if any), and the
        template named after the model.
        """
        context.update(self.get_cell_extra_context(context))
        template_names = ['combo/' + self._meta.model_name + '.html']
        base_template_name = self._meta.model_name + '.html'
        if self.template_name:
            base_template_name = os.path.basename(self.template_name)
            template_names.append(self.template_name)
        if self.slug:
            template_names.append('combo/cells/%s/%s' % (self.slug, base_template_name))
        # most specific template first
        template_names.reverse()
        tmpl = template.loader.select_template(template_names)
        return tmpl.render(context, context.get('request'))

    def render_for_search(self):
        """Return the cell content as plain text, as seen by an anonymous
        visitor, or an empty string if the cell is disabled, user dependent,
        not visible or not relevant."""
        if not self.is_enabled():
            return ''
        if self.user_dependant:
            return ''
        if not self.page.is_visible(user=None):
            return ''
        if not self.is_visible(user=None):
            return ''
        request = RequestFactory().get(self.page.get_online_url())
        request.user = None  # compat
        context = {
            'page': self.page,
            'render_skeleton': False,
            'request': request,
            'site_base': request.build_absolute_uri('/')[:-1],
            'synchronous': True,
            'user': None,  # compat
            'absolute_uri': request.build_absolute_uri,
        }
        if not self.is_relevant(context):
            return ''
        from django.utils.six.moves.html_parser import HTMLParser
        return HTMLParser().unescape(strip_tags(self.render(context)))

    def get_external_links_data(self):
        return []
@register_cell_class
class TextCell(CellBase):
    """Cell displaying a rich text."""
    text = RichTextField(_('Text'), blank=True, null=True)

    template_name = 'combo/text-cell.html'

    class Meta:
        verbose_name = _('Text')

    def is_relevant(self, context):
        """An empty text cell is not worth rendering."""
        return True if self.text else False

    def get_additional_label(self):
        """Return the shortened text, or None when there is no text."""
        return utils.ellipsize(self.text) if self.text else None

    @classmethod
    def get_cell_types(cls):
        """Same as the base class, with 'order' set to -1 on the first
        (and only) cell type."""
        cell_types = super(TextCell, cls).get_cell_types()
        first_type = cell_types[0]
        first_type['order'] = -1
        return cell_types
@register_cell_class
class FortuneCell(CellBase):
    """Cell displaying the output of the ``fortune`` command; it is only
    enabled when settings.DEBUG is set and the command can be run."""
    ajax_refresh = 30

    class Meta:
        verbose_name = _('Fortune')

    def render(self, context):
        """Return the output of ``fortune``, as text (check_output()
        gives bytes on Python 3)."""
        return force_text(subprocess.check_output(['fortune']))

    @classmethod
    def is_enabled(cls):
        if not settings.DEBUG:
            # do not spawn a process for nothing, the cell is debug-only
            return False
        try:
            subprocess.check_output(['fortune'])
        except (OSError, subprocess.CalledProcessError):
            # command missing, or failing
            return False
        return settings.DEBUG
@register_cell_class
class UnlockMarkerCell(CellBase):
    """Marks an 'acquired' placeholder as unlocked."""
    # XXX: this is kept to smooth transitions, it should be removed once all
    # sites have been migrated to ParentContentCell
    visible = False

    class Meta:
        verbose_name = _('Unlock Marker')

    def render(self, context):
        # a marker has no content of its own
        return ''
@register_cell_class
class MenuCell(CellBase):
    """Cell displaying a navigation menu, rendered by
    combo.public.menu.render_menu()."""
    depth = models.PositiveIntegerField(_('Depth'),
            choices=[(i, i) for i in range(1, 3)], default=1, null=False)
    initial_level = models.IntegerField(_('Initial Level'),
            choices=[(-1, _('Same as page'))] +
                    [(i, i) for i in range(1, 3)], default=-1, null=False)
    root_page = models.ForeignKey(Page, related_name='root_page',
            null=True, blank=True, verbose_name=_('Root Page'))

    template_name = 'combo/menu-cell.html'

    class Meta:
        verbose_name = _('Menu')

    def get_default_form_class(self):
        # imported here, not at module level (presumably to avoid a
        # circular import, as .forms is expected to import models)
        from .forms import MenuCellForm as form_class
        return form_class

    def get_cell_extra_context(self, context):
        """Add the rendered menu to the base cell context, as 'menu'."""
        from combo.public.menu import render_menu
        extra_context = super(MenuCell, self).get_cell_extra_context(context)
        menu_options = {
            'level': self.initial_level,
            'root_page': self.root_page,
            'depth': self.depth,
            'ignore_visibility': False,
        }
        extra_context['menu'] = render_menu(context, **menu_options)
        return extra_context

    def render_for_search(self):
        # menus have nothing to contribute to the search index
        return ''
@register_cell_class
class LinkCell(CellBase):
    """Cell displaying a link, to a page of the site (``link_page``) or to
    an arbitrary URL (``url``), with an optional anchor."""
    title = models.CharField(_('Title'), max_length=150, blank=True)
    url = models.CharField(_('URL'), max_length=200, blank=True)
    link_page = models.ForeignKey('data.Page', related_name='link_cell', blank=True,
            null=True, verbose_name=_('Internal link'))
    anchor = models.CharField(_('Anchor'), max_length=150, blank=True)

    template_name = 'combo/link-cell.html'

    class Meta:
        verbose_name = _('Link')

    def get_additional_label(self):
        """Return the shortened title of the link (falling back to the
        title of the linked page), or None if there is none."""
        label = self.title
        if not label and self.link_page:
            label = self.link_page.title
        return utils.ellipsize(label) if label else None

    def get_cell_extra_context(self, context):
        """Add the 'url' and 'title' of the link to the base cell context.

        The URL is made absolute when the cell is rendered as part of a
        skeleton and has no host part.
        """
        extra_context = super(LinkCell, self).get_cell_extra_context(context)
        if self.link_page:
            target_url = self.link_page.get_online_url()
            fallback_title = self.link_page.title
        else:
            target_url = utils.get_templated_url(self.url, context=context)
            fallback_title = self.url
        if self.anchor:
            target_url += '#' + self.anchor
        in_skeleton = context.get('render_skeleton')
        if in_skeleton and not urlparse.urlparse(target_url).netloc:
            # create full URL when used in a skeleton
            target_url = context.get('request').build_absolute_uri(target_url)
        extra_context['url'] = target_url
        extra_context['title'] = self.title or fallback_title
        return extra_context

    def get_default_form_class(self):
        from .forms import LinkCellForm as form_class
        return form_class

    def render_for_search(self):
        # links have nothing to contribute to the search index
        return ''

    def get_external_links_data(self):
        """Return the link context, in a list, for links defined by a URL
        that end up with both a title and a URL; an empty list otherwise."""
        if self.url:
            link_data = self.get_cell_extra_context({})
            if link_data.get('title') and link_data.get('url'):
                return [link_data]
        return []
@register_cell_class
class FeedCell(CellBase):
    """Cell displaying the entries of a RSS/Atom feed; the content of the
    feed is kept in cache for 10 minutes."""
    title = models.CharField(_('Title'), max_length=150, blank=True)
    url = models.URLField(_('URL'), blank=True)
    limit = models.PositiveSmallIntegerField(_('Maximum number of entries'),
            null=True, blank=True)

    template_name = 'combo/feed-cell.html'

    class Meta:
        verbose_name = _('RSS/Atom Feed')

    def is_visible(self, user=None):
        # a cell without URL has nothing to display
        return bool(self.url) and super(FeedCell, self).is_visible(user=user)

    def _get_cache_key(self):
        """Return the key under which the feed content is cached."""
        return hashlib.md5(smart_bytes(self.url)).hexdigest()

    def get_cell_extra_context(self, context):
        """Add the parsed feed to the base cell context, as 'feed'.

        The feed is fetched if it is not in cache; when it cannot be
        obtained (network error, non-200 response) no 'feed' key is set.
        The cell title defaults to the title of the feed.
        """
        extra_context = super(FeedCell, self).get_cell_extra_context(context)

        if context.get('placeholder_search_mode'):
            # don't call webservices when we're just looking for placeholders
            return extra_context

        cache_key = self._get_cache_key()
        feed_content = cache.get(cache_key)
        if not feed_content:
            try:
                feed_response = requests.get(utils.get_templated_url(self.url))
            except requests.RequestException as e:
                # an unreachable feed must not prevent the page from being
                # displayed
                logging.getLogger(__name__).warning(
                        u'error on request %r: %s', self.url, force_text(e))
            else:
                if feed_response.status_code == 200:
                    feed_content = feed_response.content
                    cache.set(cache_key, feed_content, 600)

        if feed_content:
            extra_context['feed'] = feedparser.parse(feed_content)
            if self.limit:
                extra_context['feed']['entries'] = extra_context['feed']['entries'][:self.limit]
            if not self.title:
                try:
                    self.title = extra_context['feed']['feed'].title
                except (KeyError, AttributeError):
                    pass
        return extra_context

    def render(self, context):
        """Render the cell; in asynchronous mode NothingInCacheException is
        raised when the feed is not in cache."""
        feed_content = cache.get(self._get_cache_key())
        if not context.get('synchronous') and feed_content is None:
            raise NothingInCacheException()
        return super(FeedCell, self).render(context)
@register_cell_class
class ParametersCell(CellBase):
    """Cell displaying a form to choose between sets of parameters; the
    values of the selected choice are added to the 'parameters' variable of
    the page context."""
    title = models.CharField(_('Title'), max_length=150, blank=True)
    url = models.URLField(_('URL'), blank=True)
    empty_label = models.CharField(_('Empty label'), max_length=64, default='---')
    parameters = JSONField(_('Parameters'), blank=True,
            help_text=_('Must be a JSON list, containing dictionaries with 3 keys: '
                        'name, value and optionnally roles; name must be a string, '
                        'value must be a dictionary and roles must a list of role '
                        'names. Role names limit the visibility of the choice.'))

    template_name = 'combo/parameters-cell.html'

    class Meta:
        verbose_name = _('Parameters')

    def get_additional_label(self):
        return self.title

    def is_visible(self, user=None):
        # a cell without parameters has nothing to display
        return bool(self.parameters) and super(ParametersCell, self).is_visible(user=user)

    def validate_schema(self, value):
        """Check ``value`` is a valid list of parameters.

        Returns a (valid, message) tuple, the message being empty when the
        value is valid.
        """
        if not isinstance(value, list):
            return False, _('it must be a list')
        if not all(isinstance(x, dict) for x in value):
            return False, _('it must be a list of dictionaries')
        if not all(set(x.keys()) <= set(['roles', 'name', 'value']) for x in value):
            return False, _('permitted keys in the dictionaries are name, roles and value')
        for x in value:
            if 'roles' not in x:
                continue
            if not isinstance(x['roles'], list):
                return False, _('roles must be a list')
            # six.text_type is unicode on Python 2 and str on Python 3
            if not all(isinstance(y, six.text_type) for y in x['roles']):
                return False, _('roles must be a list of strings')
            if len(set(x['roles'])) != len(x['roles']):
                return False, _('role\'s names must be unique in a list of roles')
            existing = Group.objects.filter(name__in=x['roles']).values_list('name', flat=True)
            if len(existing) != len(x['roles']):
                missing_roles = u', '.join(set(x['roles']) - set(existing))
                return False, _('role(s) %s do(es) not exist') % missing_roles
        # use .get(): name and value keys may be missing, this must be
        # reported as a validation error, not as a KeyError
        if not all(isinstance(x.get('name'), six.text_type) for x in value):
            return False, _('name must be a string')
        if not all(isinstance(x.get('value'), dict) for x in value):
            return False, _('value must be a dictionary')
        if not len(set(x['name'] for x in value)) == len(value):
            return False, _('names must be unique')
        return True, ''

    def clean(self):
        """Raise ValidationError if parameters do not match the expected
        schema."""
        validated, msg = self.validate_schema(self.parameters)
        if not validated:
            raise ValidationError(_('Parameters does not validate the expected schema: %s') % msg)

    def get_form(self, request):
        """Return the form to select parameters, bound to the query string
        and limited to the choices the user is allowed to see."""
        from .forms import ParametersForm
        if not request.user.is_anonymous():
            groups = set(request.user.groups.values_list('name', flat=True))
        else:
            groups = set()
        # choices without roles are visible to everybody
        parameters = [param for param in self.parameters
                      if not param.get('roles') or set(param['roles']) & groups]
        return ParametersForm(request.GET, parameters=parameters,
                              empty_label=self.empty_label,
                              prefix='parameters-cells-' + str(self.pk))

    def modify_global_context(self, context, request):
        """Add the values of the selected choice to the 'parameters'
        dictionary of ``context``."""
        if not bool(self.parameters):
            return
        # Store form for later use by get_cell_extra_context
        self._form = self.get_form(request)
        if self._form.is_valid():
            parameters = context['parameters'] if 'parameters' in context else {}
            context['parameters'] = parameters
            parameters.update(self._form.cleaned_data['choice'])

    def get_cell_extra_context(self, context):
        ctx = super(ParametersCell, self).get_cell_extra_context(context)
        if hasattr(self, '_form'):
            ctx['form'] = self._form
        ctx['title'] = self.title
        ctx['url'] = utils.get_templated_url(self.url)
        return ctx

    def get_default_form_class(self):
        from .forms import ParametersCellForm
        return ParametersCellForm
@register_cell_class
class ParentContentCell(CellBase):
    """Cell standing for the cells the parent page has in the same
    placeholder; it renders nothing by itself."""

    class Meta:
        verbose_name = _('Same as parent')

    def get_parents_cells(self, hierarchy):
        """Return the cells of the placeholder for the last page of
        ``hierarchy``, where ParentContentCell cells are recursively
        replaced by the cells of the previous page in the chain.

        The chain of pages is ``hierarchy`` preceded by the root 'index'
        page, if there is one.
        """
        try:
            chain = [Page.objects.get(slug='index', parent=None)]
        except Page.DoesNotExist:
            chain = []
        chain.extend(hierarchy)
        if not chain:
            return []
        if len(chain) > 1 and chain[0].id == chain[1].id:
            # don't duplicate index cells for real children of the index page.
            chain = chain[1:]

        cells_by_page = {page.id: [] for page in chain}
        placeholder_cells = CellBase.get_cells(placeholder=self.placeholder, page__in=chain)
        for cell in list(placeholder_cells):
            cells_by_page[cell.page_id].append(cell)

        cells = cells_by_page[chain[-1].id]
        for ancestor in reversed(chain[:-1]):
            marker_index = next(
                    (index for index, cell in enumerate(cells)
                     if isinstance(cell, ParentContentCell)),
                    None)
            if marker_index is None:
                # no more ParentContentCell, stop following the parent page chain
                break
            # substitute the marker with the cells of the ancestor page
            cells[marker_index:marker_index + 1] = cells_by_page[ancestor.id]
        return cells

    def render(self, context):
        return ''
class JsonCellBase(CellBase):
    # Abstract base for cells displaying data fetched from JSON webservices.
    #
    # Subclasses provide the settings below, as class attributes, model
    # fields or properties.

    # (templated) URL of the main webservice
    url = None
    # cache duration handed over to utils.requests.get
    cache_duration = 60
    # when set, source of the Django template used by render()
    template_string = None
    # names of query-string variables copied into the template context
    varnames = None
    # when true, render() refuses to work outside of a synchronous context
    force_async = False
    # log request/decoding errors at warning/error level (debug otherwise)
    log_errors = True
    # timeout handed over to the requests wrapper
    timeout = None
    # actions accepted by post(): {name: {'url': ..., 'method': ...,
    # 'timeout': ..., 'error-message': ..., 'response': ...}}
    actions = {}
    additional_data = None
    # [
    #  {'key': ...,
    #   'url': ...,
    #   'cache_duration': ... (optional)
    #  },
    #  ...
    # ]
    # ('log_errors' and 'timeout' are also read from those entries)

    # context key receiving the data of the main URL
    first_data_key = 'json'

    # decoded content of the main response, set by get_cell_extra_context
    _json_content = None

    class Meta:
        abstract = True

    def is_visible(self, user=None):
        """Tell if the cell is visible: it needs an URL on top of the
        visibility rules of the parent class."""
        return bool(self.url) and super(JsonCellBase, self).is_visible(user=user)

    def get_cell_parameters_context(self):
        """Return extra variables to merge in the context (none by default)."""
        return {}

    def get_cell_extra_context(self, context, invalidate_cache=False):
        """Fetch the JSON data of the cell and return the extra context.

        For every data URL (the main one then ``additional_data``) the
        returned dictionary gets ``<key>`` (decoded data or None),
        ``<key>_url``, ``<key>_status`` and, on failure, ``<key>_error``
        (and ``<key>_exception`` for request errors).  ``context`` is also
        updated along the way so later templated URLs can use earlier
        results.

        ``invalidate_cache`` is passed as is to utils.requests.get.
        """
        extra_context = super(JsonCellBase, self).get_cell_extra_context(context)
        if context.get('placeholder_search_mode'):
            # don't call webservices when we're just looking for placeholders
            return extra_context

        logger = logging.getLogger(__name__)

        request = context.get('request')
        if self.varnames and request:
            for varname in self.varnames:
                if varname in request.GET:
                    context[varname] = request.GET[varname]

        self._json_content = None

        extra_context.update(self.get_cell_parameters_context())
        context.update(self.get_cell_parameters_context())
        context['concerned_user'] = self.get_concerned_user(context)

        data_urls = [{
            'key': self.first_data_key,
            'url': self.url,
            'cache_duration': self.cache_duration,
            'log_errors': self.log_errors,
            'timeout': self.timeout,
        }]
        data_urls.extend(self.additional_data or [])

        # make sure every key exists, even if its request is skipped or fails
        for data_url_dict in data_urls:
            extra_context[data_url_dict['key']] = None

        for data_url_dict in data_urls:
            data_key = data_url_dict['key']
            log_errors = data_url_dict.get('log_errors', self.log_errors)
            try:
                url = utils.get_templated_url(data_url_dict['url'], context)
            except utils.TemplateError as e:
                logger.warning('error in templated URL (%s): %s', data_url_dict['url'], e)
                continue
            extra_context[data_key + '_url'] = url
            if not url:
                continue
            try:
                json_response = utils.requests.get(
                    url,
                    headers={'Accept': 'application/json'},
                    remote_service='auto',
                    cache_duration=data_url_dict.get('cache_duration', self.cache_duration),
                    without_user=True,
                    raise_if_not_cached=not(context.get('synchronous')),
                    invalidate_cache=invalidate_cache,
                    log_errors=log_errors,
                    timeout=data_url_dict.get('timeout', self.timeout),
                )
            except requests.RequestException as e:
                extra_context[data_key + '_status'] = -1
                extra_context[data_key + '_error'] = force_text(e)
                extra_context[data_key + '_exception'] = e
                if log_errors:
                    logger.warning(u'error on request %r: %s', url, force_text(e))
                else:
                    logger.debug(u'error on request %r: %s', url, force_text(e))
                continue
            extra_context[data_key + '_status'] = json_response.status_code
            if json_response.status_code // 100 == 2:
                if json_response.status_code != 204:  # 204 = No Content
                    try:
                        extra_context[data_key] = json.loads(json_response.content)
                    except ValueError:
                        extra_context[data_key + '_error'] = 'invalid_json'
                        if log_errors:
                            logger.error('invalid json content (%s)', url)
                        else:
                            logger.debug('invalid json content (%s)', url)
                        continue
            elif json_response.headers.get('content-type') == 'application/json':
                # error response with a JSON body, expose it as the error
                try:
                    extra_context[data_key + '_error'] = json.loads(json_response.content)
                except ValueError:
                    extra_context[data_key + '_error'] = 'invalid_json'

            # update context with data key so it can be used in future
            # templated URLs
            context[data_key] = extra_context[data_key]

        # keep cache of first response as it may be used to find the
        # appropriate template.
        self._json_content = extra_context[self.first_data_key]

        return extra_context

    @property
    def template_name(self):
        """Name of the template matching the shape of the main response.

        The error template is used when there is no content; the list
        template when ``data`` is a list whose first element is a dict with
        both ``url`` and ``text`` keys; the generic template otherwise.
        """
        json_content = self._json_content
        if json_content is None:
            return 'combo/json-error-cell.html'
        # a valid JSON document may also be a list or a scalar, only a
        # dictionary can carry a 'data' entry.
        if isinstance(json_content, dict) and json_content.get('data'):
            if isinstance(json_content['data'], list):
                first_element = json_content['data'][0]
                if isinstance(first_element, dict):
                    if 'url' in first_element and 'text' in first_element:
                        return 'combo/json-list-cell.html'
        return 'combo/json-cell.html'

    def post(self, request):
        """Run the action named in the POST data against its webservice.

        All other POST fields are sent as the JSON payload (fields whose
        name ends with ``[]`` are sent as lists) and are also available to
        template the action URL.

        Returns the webservice response when the action is configured with
        ``response: raw``, None otherwise (the cell will render itself).

        Raises PermissionDenied if the action is missing or unknown, and
        PostException if the URL cannot be templated or the webservice
        doesn't answer with a 2xx status.
        """
        if 'action' not in request.POST:
            raise PermissionDenied()
        action = request.POST['action']
        if action not in self.actions:
            raise PermissionDenied()

        action_definition = self.actions[action]
        error_message = action_definition.get('error-message')
        timeout = action_definition.get('timeout', self.timeout)
        method = action_definition.get('method', 'POST')
        logger = logging.getLogger(__name__)

        content = {}
        for key, value in request.POST.items():
            if key == 'action':
                continue
            if key.endswith('[]'):  # array
                content[key[:-2]] = request.POST.getlist(key)
            else:
                content[key] = value

        context = copy.copy(content)
        context.update(self.get_cell_parameters_context())
        context['request'] = request
        context['synchronous'] = True

        try:
            url = utils.get_templated_url(action_definition['url'], context)
        except utils.TemplateError as e:
            logger.warning('error in templated URL (%s): %s', action_definition['url'], e)
            raise PostException(error_message)

        json_response = utils.requests.request(
            method,
            url,
            headers={'Accept': 'application/json'},
            remote_service='auto',
            json=content,
            without_user=True,
            # honour the timeout configured for the action (or the cell)
            timeout=timeout,
        )

        if json_response.status_code // 100 != 2:  # 2xx
            logger.error('error POSTing data to URL (%s)', url)
            raise PostException(error_message)

        if self.cache_duration:
            # the action may have changed the data, refresh cached responses
            self.get_cell_extra_context(context, invalidate_cache=True)

        if action_definition.get('response', 'cell') == 'raw':
            # response: raw in the config will get the response directly sent
            # to the client.
            return json_response

        # default behaviour, the cell will render itself.
        return None

    def render(self, context):
        """Render the cell, with ``template_string`` when there is one.

        Raises NothingInCacheException when the cell is forced to
        asynchronous mode and the context is not marked as synchronous.
        """
        if self.force_async and not context.get('synchronous'):
            raise NothingInCacheException()
        if self.template_string:
            tmpl = engines['django'].from_string(self.template_string)
            context.update(self.get_cell_extra_context(context))
            return tmpl.render(context, context.get('request'))
        return super(JsonCellBase, self).render(context)
@register_cell_class
class JsonCell(JsonCellBase):
    # JSON cell whose settings are stored in the fields of the cell itself.
    title = models.CharField(_('Title'), max_length=150, blank=True)
    url = models.URLField(_('URL'), blank=True)
    template_string = models.TextField(_('Display Template'), blank=True, null=True)
    cache_duration = models.PositiveIntegerField(_('Cache duration'), default=60)
    force_async = models.BooleanField(
        _('Force asynchronous mode'),
        default=JsonCellBase.force_async)
    varnames_str = models.CharField(
        _('Variable names'),
        max_length=200,
        blank=True,
        help_text=_('Comma separated list of query-string variables '
                    'to be copied in template context'))
    timeout = models.PositiveIntegerField(
        _('Request timeout'),
        default=0,
        help_text=_('In seconds. Use 0 for default system timeout'))

    class Meta:
        verbose_name = _('JSON Feed')

    @property
    def varnames(self):
        """List of variable names parsed from the comma separated
        ``varnames_str`` field, stripped, empty items ignored."""
        stripped_names = (name.strip() for name in self.varnames_str.split(','))
        return [name for name in stripped_names if name]
class ConfigJsonCellManager(models.Manager):
    # Manager limited to the cells whose key is declared in
    # settings.JSON_CELL_TYPES.
    def get_queryset(self):
        """Return the base queryset filtered on the configured cell types."""
        all_cells = super(ConfigJsonCellManager, self).get_queryset()
        configured_keys = settings.JSON_CELL_TYPES.keys()
        return all_cells.filter(key__in=configured_keys)
@register_cell_class
class ConfigJsonCell(JsonCellBase):
    # JSON cell whose settings come from the settings.JSON_CELL_TYPES entry
    # selected by `key`; per-cell values are kept in `parameters`.
    objects = ConfigJsonCellManager()

    key = models.CharField(max_length=50)
    parameters = JSONField(blank=True)

    @property
    def _definition(self):
        # settings entry describing this type of cell
        return settings.JSON_CELL_TYPES[self.key]

    @classmethod
    def get_cell_types(cls):
        """Return one description per configured cell type, sorted by name."""
        cell_types = [
            {
                'name': definition['name'],
                'variant': key,
                'group': _('Extra'),
                'cell_type_str': cls.get_cell_type_str(),
            }
            for key, definition in settings.JSON_CELL_TYPES.items()
        ]
        cell_types.sort(key=lambda cell_type: cell_type.get('name'))
        return cell_types

    def get_label(self):
        """Return the configured name of the cell type."""
        return self._definition['name']

    def set_variant(self, variant):
        """Use ``variant`` as the key of the cell type."""
        self.key = variant

    @property
    def css_class_names(self):
        # parent class names, plus the key of the cell type
        return super(ConfigJsonCell, self).css_class_names + ' ' + self.key

    @property
    def ajax_refresh(self):
        return self._definition.get('auto_refresh', None)

    @property
    def url(self):
        return self._definition['url']

    @property
    def cache_duration(self):
        return self._definition.get('cache_duration', JsonCellBase.cache_duration)

    @property
    def varnames(self):
        return self._definition.get('varnames')

    @property
    def force_async(self):
        return self._definition.get('force_async', JsonCellBase.force_async)

    @property
    def log_errors(self):
        return self._definition.get('log_errors', JsonCellBase.log_errors)

    @property
    def timeout(self):
        return self._definition.get('timeout', JsonCellBase.timeout)

    @property
    def actions(self):
        return self._definition.get('actions', JsonCellBase.actions)

    @property
    def additional_data(self):
        return self._definition.get('additional-data')

    @property
    def template_name(self):
        # one template per cell type
        return 'combo/json/%s.html' % self.key

    @property
    def loading_message(self):
        return self._definition.get('loading-message', CellBase.loading_message)

    def get_default_form_class(self):
        """Return a form class built from the ``form`` entry of the cell
        type, or None when the type has no such entry."""
        formdef = self._definition.get('form')
        if not formdef:
            return None

        from .forms import ConfigJsonForm
        # create a subclass of ConfigJsonForm with 'formdef' (the list of
        # fields) as an attribute.
        class_name = str('%sConfigClass' % self.key)
        return type(class_name, (ConfigJsonForm,), {'formdef': formdef})

    def get_cell_parameters_context(self):
        """Return a shallow copy of the cell parameters, with the parameters
        themselves also available under the ``parameters`` key."""
        parameters_context = copy.copy(self.parameters or {})
        parameters_context['parameters'] = self.parameters
        return parameters_context
class ExternalLinkSearchItem(models.Model):
    # Link to an external site.
    #
    # Those are automatically collected by the "update_index" command, which
    # calls get_external_links_data on all available cells; they are used by
    # the general search engine.
    title = models.CharField(_('Title'), max_length=150)
    text = models.TextField(blank=True)
    url = models.CharField(_('URL'), max_length=200, blank=True)
    # auto_now: set to the current time whenever the item is saved
    last_update_timestamp = models.DateTimeField(auto_now=True)
@receiver(pre_save, sender=Page)
def create_redirects(sender, instance, raw, **kwargs):
    """Record redirects before a page gets a new slug or a new parent.

    A Redirect holding the online URL computed from the stored state is
    saved for the page and for every page below it.  Nothing is done for
    raw saves, pages without id, snapshot pages, or when neither the slug
    nor the parent changed.
    """
    if raw or not instance.id or instance.snapshot_id:
        return
    try:
        stored_page = Page.objects.get(id=instance.id)
    except Page.DoesNotExist:
        return

    slug_changed = stored_page.slug != instance.slug
    parent_changed = stored_page.parent_id != instance.parent_id
    if not (slug_changed or parent_changed):
        return

    # walk down the page tree, one level of children per query
    affected_pages = [stored_page]
    parent_ids = [stored_page.id]
    while parent_ids:
        children = Page.objects.filter(parent_id__in=parent_ids).select_related('parent')
        affected_pages.extend(children)
        parent_ids = [child.id for child in children]

    for page in affected_pages:
        redirect = Redirect(page=page, old_url=page.get_online_url())
        redirect.save()
@receiver(post_save)
@receiver(post_delete)
def cell_maintain_page_cell_cache(sender, instance=None, **kwargs):
    """Rebuild the cell cache of a page when one of its cells is saved or
    deleted.

    The handler receives the signals of all models; only CellBase
    subclasses attached to a page are considered.
    """
    if issubclass(sender, CellBase) and instance.page_id:
        instance.page.build_cell_cache()