1077 lines
38 KiB
Python
1077 lines
38 KiB
Python
# combo - content management system
|
|
# Copyright (C) 2014 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import copy
|
|
import feedparser
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import requests
|
|
import subprocess
|
|
import urlparse
|
|
|
|
from django.apps import apps
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import Group
|
|
from django.core.cache import cache
|
|
from django.core.exceptions import ObjectDoesNotExist, ValidationError, PermissionDenied
|
|
from django.core import serializers
|
|
from django.db import models
|
|
from django.db.models.base import ModelBase
|
|
from django.db.models import Max
|
|
from django.forms import models as model_forms
|
|
from django import forms
|
|
from django import template
|
|
from django.utils.html import strip_tags
|
|
from django.utils.safestring import mark_safe
|
|
from django.utils.text import slugify
|
|
from django.utils.translation import ugettext_lazy as _
|
|
from django.forms.widgets import MediaDefiningClass
|
|
from django.template import Context, RequestContext, Template
|
|
from django.test.client import RequestFactory
|
|
|
|
from .fields import RichTextField
|
|
import cmsplugin_blurp.utils
|
|
|
|
from jsonfield import JSONField
|
|
|
|
from .library import register_cell_class, get_cell_classes, get_cell_class
|
|
|
|
from combo import utils
|
|
from combo.utils import NothingInCacheException
|
|
|
|
|
|
class PostException(Exception):
    """Error raised by ``JsonCellBase.post`` when an action cannot be sent.

    It is raised with the action's configured error message when the target
    URL cannot be built or when the remote service does not answer with a
    2xx status.
    """
|
|
|
|
|
|
def element_is_visible(element, user=None):
    """Tell whether ``element`` may be displayed to ``user``.

    ``element`` is any object exposing ``public`` and ``groups`` (and
    optionally ``restricted_to_unlogged``); ``user`` may be ``None``, which
    is handled like an anonymous user.

    Rules, in order:
      * elements restricted to unlogged users are only shown to anonymous
        visitors;
      * public elements are shown to everybody;
      * private elements are hidden from anonymous visitors, always shown to
        superusers, and shown to other users when the element has no group
        restriction or shares at least one group with the user.
    """
    if getattr(element, 'restricted_to_unlogged', False):
        return bool(user is None or user.is_anonymous())

    if element.public:
        return True

    if user is None or user.is_anonymous():
        return False

    if user.is_superuser:
        return True

    required_groups = element.groups.all()
    if not required_groups:
        # user is logged in, no group restriction in place
        return True

    return bool(set(required_groups).intersection(user.groups.all()))
|
|
|
|
|
|
class PageManager(models.Manager):
    """Manager resolving pages from the natural key built by ``Page.natural_key``."""

    def get_by_natural_key(self, path):
        """Return the page whose slug is the last non-empty segment of ``path``.

        A path without any segment (empty, or only slashes) maps to the
        ``index`` slug.
        """
        segments = [segment for segment in path.strip('/').split('/') if segment]
        slug = segments[-1] if segments else 'index'
        return self.get(slug=slug)
|
|
|
|
|
|
class Page(models.Model):
    """A page of the site; pages form a tree through the ``parent`` link."""

    objects = PageManager()

    title = models.CharField(_('Title'), max_length=150)
    slug = models.SlugField(_('Slug'))
    template_name = models.CharField(_('Template'), max_length=50)
    parent = models.ForeignKey('self', null=True, blank=True)
    order = models.PositiveIntegerField()
    exclude_from_navigation = models.BooleanField(_('Exclude from navigation'), default=False)
    redirect_url = models.CharField(_('Redirect URL'), max_length=200, blank=True)

    public = models.BooleanField(_('Public'), default=True)
    groups = models.ManyToManyField(Group, verbose_name=_('Groups'), blank=True)
    last_update_timestamp = models.DateTimeField(auto_now=True)

    picture = models.ImageField(_('Picture'), upload_to='page-pictures/', null=True)

    _level = None
    _children = None

    class Meta:
        ordering = ['order']

    def __unicode__(self):
        return self.title

    def natural_key(self):
        """Return a one-tuple holding the online URL without surrounding slashes."""
        return (self.get_online_url().strip('/'), )

    def save(self, *args, **kwargs):
        """Save the page, filling ``order``, ``slug`` and ``template_name`` when unset.

        * ``order`` defaults to one more than the current maximum;
        * ``slug`` is ``index`` for the very first page, otherwise derived
          from the title and suffixed with ``-2``, ``-3``... until unused;
        * ``template_name`` defaults to ``settings.COMBO_DEFAULT_PUBLIC_TEMPLATE``.
        """
        if not self.order:
            max_order = Page.objects.all().aggregate(Max('order')).get('order__max') or 0
            self.order = max_order + 1
        if not self.slug:
            if Page.objects.count() == 0:
                slug = 'index'
            else:
                # strip dashes once so that suffixed slugs never contain '--'
                base_slug = slugify(self.title)[:40].strip('-')
                slug = base_slug
                i = 1
                # exists() (rather than get()) copes with slugs that are
                # already shared by several pages
                while Page.objects.filter(slug=slug).exists():
                    i += 1
                    slug = '%s-%s' % (base_slug, i)
            self.slug = slug
        if not self.template_name:
            self.template_name = settings.COMBO_DEFAULT_PUBLIC_TEMPLATE
        return super(Page, self).save(*args, **kwargs)

    def get_parents_and_self(self):
        """Return the list of pages from the topmost ancestor down to this page."""
        pages = [self]
        page = self
        while page.parent_id:
            page = page.parent
            pages.append(page)
        pages.reverse()
        return pages

    def get_online_url(self):
        """Return the URL path of the page, built from the slugs of its ancestry.

        A leading ``index`` slug is dropped, so the index page itself is ``/``.
        """
        parts = [x.slug for x in self.get_parents_and_self()]
        if parts[0] == 'index':
            parts = parts[1:]
        if not parts:
            return '/'
        return '/' + '/'.join(parts) + '/'

    def get_page_of_level(self, level):
        '''Return page of given level in the page hierarchy.

        Level 0 is the topmost ancestor; None is returned when the hierarchy
        is not deep enough.
        '''
        try:
            return self.get_parents_and_self()[level]
        except IndexError:
            return None

    def get_siblings(self):
        """Return the queryset of pages sharing this page's parent (self included)."""
        return Page.objects.filter(parent=self.parent)

    def get_children(self):
        """Return the queryset of direct children of this page."""
        return Page.objects.filter(parent_id=self.id)

    def has_children(self):
        """Tell whether this page has at least one direct child."""
        return Page.objects.filter(parent_id=self.id).exists()

    def get_template_display_name(self):
        """Return the ``name`` configured for this page's template in settings."""
        return settings.COMBO_PUBLIC_TEMPLATES[self.template_name]['name']

    def _get_adjacent_visible_page(self, user, backwards):
        """Return the nearest page visible to ``user`` after (or, when
        ``backwards`` is true, before) this one in the flattened hierarchy."""
        pages = Page.get_as_reordered_flat_hierarchy(Page.objects.all())
        if backwards:
            pages.reverse()
        this_page = [x for x in pages if x.id == self.id][0]
        for page in pages[pages.index(this_page) + 1:]:
            if page.is_visible(user):
                return page
        return None

    def get_next_page(self, user=None):
        """Return the next page visible to ``user``, or None."""
        return self._get_adjacent_visible_page(user, backwards=False)

    def get_previous_page(self, user=None):
        """Return the previous page visible to ``user``, or None."""
        return self._get_adjacent_visible_page(user, backwards=True)

    @classmethod
    def get_as_reordered_flat_hierarchy(cls, object_list):
        """Return the pages of ``object_list`` as a depth-first flat list.

        Each returned page gets a ``level`` attribute (0 for top-level pages)
        and every page gets ``_children``, the list of its direct children
        found in ``object_list``.
        """
        reordered = []

        def fill_list(object_sublist, level=0, parent=None):
            # compare on ids: it avoids loading page.parent from the database
            # for every page just to test identity
            parent_id = parent.id if parent is not None else None
            for page in object_sublist:
                page._children = [x for x in object_list if x.parent_id == page.id]
                if page.parent_id == parent_id:
                    page.level = level
                    reordered.append(page)
                    fill_list(object_sublist, level=level + 1, parent=page)

        fill_list(object_list)
        return reordered

    def visibility(self):
        """Return a translated label describing who can see the page."""
        if self.public:
            return _('Public')
        return _('Private (%s)') % ', '.join([x.name for x in self.groups.all()])

    def is_visible(self, user=None):
        """Tell whether the page may be displayed to ``user``."""
        return element_is_visible(self, user=user)

    def get_cells(self):
        """Return the cells of all types attached to this page."""
        return CellBase.get_cells(page_id=self.id)

    def get_serialized_page(self):
        """Return the page and its cells as a JSON-compatible dictionary.

        The page's ``model`` entry is removed and, for each cell, the ``pk``,
        the ``page`` field and every ``cached_*`` field are removed.
        """
        cells = self.get_cells()
        serialized_page = json.loads(serializers.serialize('json', [self],
            use_natural_foreign_keys=True, use_natural_primary_keys=True))[0]
        del serialized_page['model']
        serialized_page['cells'] = json.loads(serializers.serialize('json',
            cells, use_natural_foreign_keys=True, use_natural_primary_keys=True))
        for cell in serialized_page['cells']:
            del cell['pk']
            del cell['fields']['page']
            # iterate over a copy of the keys as entries get deleted
            for key in list(cell['fields'].keys()):
                if key.startswith('cached_'):
                    del cell['fields'][key]
        return serialized_page

    @classmethod
    def load_serialized_page(cls, json_page):
        """Create or update a page from its serialized form.

        ``json_page`` is modified in place: its ``model`` and ``pk`` are set
        and each of its cells gets a ``page`` field pointing to the page, so
        they can be loaded afterwards with ``load_serialized_cells``.  Cells
        currently attached to the page are deleted.
        """
        json_page['model'] = 'data.page'
        page, created = Page.objects.get_or_create(slug=json_page['fields']['slug'])
        json_page['pk'] = page.id
        page = [x for x in serializers.deserialize('json', json.dumps([json_page]))][0]
        page.save()
        # a page serialized without cells is handled like one with no cell
        for cell in json_page.get('cells') or []:
            cell['fields']['page'] = page.object.natural_key()

        # if there were cells, remove them
        for cell in CellBase.get_cells(page_id=page.object.id):
            cell.delete()

    @classmethod
    def load_serialized_cells(cls, cells):
        """Deserialize and save the given list of serialized cells."""
        # load new cells
        for cell in serializers.deserialize('json', json.dumps(cells)):
            cell.save()

    @classmethod
    def load_serialized_pages(cls, json_site):
        """Load a list of serialized pages, then their cells, then parent links."""
        cells = []
        for json_page in json_site:
            cls.load_serialized_page(json_page)
            cells.extend(json_page.get('cells') or [])
        cls.load_serialized_cells(cells)

        # 2nd pass to set parents
        for json_page in json_site:
            if json_page.get('parent_slug'):
                page = Page.objects.get(slug=json_page['fields']['slug'])
                page.parent = Page.objects.get(slug=json_page.get('parent_slug'))
                page.save()

    @classmethod
    def export_all_for_json(cls):
        """Return the serialized form of all pages, in hierarchy order."""
        ordered_pages = Page.get_as_reordered_flat_hierarchy(cls.objects.all())
        return [x.get_serialized_page() for x in ordered_pages]

    def get_redirect_url(self):
        """Return ``redirect_url`` after processing by ``utils.get_templated_url``."""
        return utils.get_templated_url(self.redirect_url)

    def get_last_update_time(self):
        """Return the most recent update timestamp among the page and its cells."""
        cells = CellBase.get_cells(page_id=self.id)
        return max([self.last_update_timestamp] + [x.last_update_timestamp for x in cells])
|
|
|
|
|
|
class CellMeta(MediaDefiningClass, ModelBase):
    """Metaclass combining ``MediaDefiningClass`` and ``ModelBase``; it is
    used by ``CellBase`` and adds no behaviour of its own."""
|
|
|
|
|
|
class CellBase(models.Model):
    """Abstract base model shared by all cell types.

    A cell belongs to a page, lives in one of its placeholders and carries
    the same visibility attributes (``public``, ``groups``) as pages, plus
    ``restricted_to_unlogged``.
    """
    __metaclass__ = CellMeta

    page = models.ForeignKey(Page)
    placeholder = models.CharField(max_length=20)
    order = models.PositiveIntegerField()
    slug = models.SlugField(_('Slug'), blank=True)
    extra_css_class = models.CharField(_('Extra classes for CSS styling'), max_length=100, blank=True)

    public = models.BooleanField(_('Public'), default=True)
    restricted_to_unlogged = models.BooleanField(
            _('Restrict to unlogged users'), default=False)
    groups = models.ManyToManyField(Group, verbose_name=_('Groups'), blank=True)
    last_update_timestamp = models.DateTimeField(auto_now=True)

    default_form_class = None
    visible = True
    user_dependant = False
    manager_form_template = 'combo/cell_form.html'
    template_name = None

    # get_badge(self, context); set to None so cell types can be skipped easily
    get_badge = None

    class Meta:
        abstract = True

    def __unicode__(self):
        label = unicode(self.get_verbose_name())
        additional_label = self.get_additional_label()
        if label and additional_label:
            return '%s (%s)' % (label, additional_label)
        else:
            return label

    @classmethod
    def get_verbose_name(cls):
        """Return the verbose name declared in the model's Meta."""
        return cls._meta.verbose_name

    def get_additional_label(self):
        """Return extra text appended to the verbose name in ``__unicode__``."""
        return ''

    @property
    def css_class_names(self):
        """Lowercased class name followed by a space and ``extra_css_class``."""
        return self.__class__.__name__.lower() + ' ' + self.extra_css_class

    @classmethod
    def get_all_cell_types(cls):
        """Return the cell type descriptions of all enabled and visible classes."""
        cell_types = []
        for klass in get_cell_classes():
            if not klass.is_enabled():
                continue
            if klass.visible is False:
                continue
            cell_types.extend(klass.get_cell_types())
        return cell_types

    @classmethod
    def get_cells(cls, cell_filter=None, **kwargs):
        """Returns the list of cells of various classes matching **kwargs

        When given, ``cell_filter`` is called with each cell class and classes
        for which it returns a false value are skipped.  The result is sorted
        on the ``order`` attribute.
        """
        cells = []
        for klass in get_cell_classes():
            if cell_filter and not cell_filter(klass):
                continue
            cells.extend(klass.objects.filter(**kwargs))
        # key-based sort: same (stable) ordering as the former cmp function
        cells.sort(key=lambda x: x.order)
        return cells

    def get_reference(self):
        """Returns a string that can serve as a unique reference to a cell"""
        return str('%s-%s' % (self.get_cell_type_str(), self.id))

    @classmethod
    def get_cell(cls, reference, **kwargs):
        """Returns the cell matching reference, and eventual **kwargs

        ``reference`` is expected in the form produced by ``get_reference``;
        ObjectDoesNotExist is raised when it is malformed or names an
        unknown cell type.
        """
        try:
            content_id, cell_id = reference.split('-')
        except ValueError:
            # not exactly one dash: this cannot be a cell reference
            raise ObjectDoesNotExist()
        try:
            klass = get_cell_class(content_id)
        except KeyError:
            raise ObjectDoesNotExist()
        return klass.objects.get(id=cell_id, **kwargs)

    @classmethod
    def get_cell_type_str(cls):
        """Return ``<app_label>_<model_name>``, the identifier of the cell type."""
        return '%s_%s' % (cls._meta.app_label, cls._meta.model_name)

    @classmethod
    def get_cell_type_group(cls):
        """Return the verbose name of the application defining the cell."""
        return apps.get_app_config(cls._meta.app_label).verbose_name

    @classmethod
    def get_cell_types(cls):
        """Return the list of type descriptions (dictionaries) for this class."""
        return [{
            'name': cls.get_verbose_name(),
            'cell_type_str': cls.get_cell_type_str(),
            'group': cls.get_cell_type_group(),
            'variant': 'default',
            'order': 0,
        }]

    @classmethod
    def is_enabled(cls):
        """Defines if the cell type is enabled for the given site; this is used
        to selectively enable cells from extension modules."""
        return True

    def set_variant(self, variant):
        """Hook to configure the cell from a variant; does nothing by default."""
        pass

    def get_label(self):
        """Return the label of the cell, its verbose name by default."""
        return self.get_verbose_name()

    def get_default_form_class(self):
        """Return the form class used to edit the cell's own fields.

        It is ``default_form_class`` when set, otherwise a model form built
        from the concrete fields that are not common cell attributes, or
        None when there is no such field.
        """
        if self.default_form_class:
            return self.default_form_class

        fields = [x.name for x in self._meta.local_concrete_fields
                  if x.name not in ('id', 'page', 'placeholder', 'order',
                                    'public', 'groups', 'slug',
                                    'extra_css_class', 'last_update_timestamp',
                                    'restricted_to_unlogged')]
        if not fields:
            return None

        return model_forms.modelform_factory(self.__class__, fields=fields)

    def get_options_form_class(self):
        """Return a model form class for the ``slug`` and ``extra_css_class`` fields."""
        return model_forms.modelform_factory(self.__class__,
                fields=['slug', 'extra_css_class'])

    def get_visibility_form_class(self):
        """Return a model form class for the visibility related fields."""
        # generate a form with the 'public' model attribute inverted to create
        # a 'Private' checkbox.
        class OppositeBooleanField(forms.BooleanField):
            def prepare_value(self, value):
                return not value  # toggle the value when loaded from the model

            def to_python(self, value):
                value = super(OppositeBooleanField, self).to_python(value)
                return not value

        def formfield_callback(field):
            if field.name == 'public':
                return OppositeBooleanField(
                        label=_('Restrict to logged-in users'),
                        required=False)
            return field.formfield()

        return model_forms.modelform_factory(self.__class__,
                fields=['restricted_to_unlogged', 'public', 'groups'],
                formfield_callback=formfield_callback)

    def get_extra_manager_context(self):
        """Return extra context as a dictionary, empty by default."""
        return {}

    def is_visible(self, user=None):
        """Tell whether the cell may be displayed to ``user``."""
        return element_is_visible(self, user=user)

    def is_relevant(self, context):
        '''Return whether it's relevant to render this cell in the page
           context.'''
        return True

    def is_user_dependant(self, context):
        '''Return whether the cell content varies from user to user.'''
        return self.user_dependant

    def get_concerned_user(self, context):
        '''Return user from UserSearch cell, or connected user.'''
        return context.get('selected_user') or getattr(context.get('request'), 'user', None)

    def get_cell_extra_context(self, context):
        """Return the dictionary of variables added to the context when rendering."""
        return {'cell': self}

    def render(self, context):
        """Render the cell with the most specific template available.

        Candidates are collected from the least to the most specific (model
        name based template, ``template_name``, then a template located in a
        directory named after the cell slug) and the list is reversed so the
        most specific one is tried first.
        """
        context.update(self.get_cell_extra_context(context))
        template_names = ['combo/' + self._meta.model_name + '.html']
        base_template_name = self._meta.model_name + '.html'
        if self.template_name:
            base_template_name = os.path.basename(self.template_name)
            template_names.append(self.template_name)
        if self.slug:
            template_names.append('combo/cells/%s/%s' % (self.slug, base_template_name))
        template_names.reverse()
        tmpl = template.loader.select_template(template_names)
        return tmpl.render(context)

    def modify_global_context(self, context, request=None):
        '''Apply changes to the template context that must visible to all cells in the page'''
        return context

    def render_for_search(self):
        """Return the cell content as plain text, or '' when it should be skipped.

        The cell is skipped when its type is disabled, when it is user
        dependant, when the cell or its page is not visible without a user,
        or when it is not relevant in the rendering context.
        """
        if not self.is_enabled():
            return ''
        if self.user_dependant:
            return ''
        if not self.page.is_visible(user=None):
            return ''
        if not self.is_visible(user=None):
            return ''
        context = RequestContext(RequestFactory().get(self.page.get_online_url()), {'synchronous': True})
        context['request'] = context.request  # for compatibility
        context['user'] = None  # compat
        if not self.is_relevant(context):
            return ''
        from HTMLParser import HTMLParser
        return HTMLParser().unescape(strip_tags(self.render(context)))
|
|
|
|
|
|
@register_cell_class
class TextCell(CellBase):
    """Cell holding a rich text."""

    text = RichTextField(_('Text'), blank=True, null=True)

    template_name = 'combo/text-cell.html'

    class Meta:
        verbose_name = _('Text')

    def is_relevant(self, context):
        """The cell is only relevant when it has some text."""
        return True if self.text else False

    def get_additional_label(self):
        """Return the text processed by ``utils.ellipsize``, or None if empty."""
        return utils.ellipsize(self.text) if self.text else None

    @classmethod
    def get_cell_types(cls):
        """Return the default type description with its ``order`` set to -1."""
        cell_types = super(TextCell, cls).get_cell_types()
        cell_types[0]['order'] = -1
        return cell_types
|
|
|
|
|
|
@register_cell_class
class FortuneCell(CellBase):
    """Cell displaying the output of the ``fortune`` command."""

    ajax_refresh = 30

    class Meta:
        verbose_name = _('Fortune')

    def render(self, context):
        """Return the output of a new run of ``fortune``."""
        return subprocess.check_output(['fortune'])

    @classmethod
    def is_enabled(cls):
        """The cell is enabled in DEBUG mode, when ``fortune`` can be run."""
        if not settings.DEBUG:
            # do not spawn a process when the answer is known to be negative
            return False
        try:
            subprocess.check_output(['fortune'])
        except (OSError, subprocess.CalledProcessError):
            # command missing, or present but failing
            return False
        return settings.DEBUG
|
|
|
|
|
|
@register_cell_class
class UnlockMarkerCell(CellBase):
    """Marks an 'acquired' placeholder as unlocked."""
    # XXX: this is kept to smooth transitions, it should be removed once all
    # sites have been migrated to ParentContentCell

    visible = False

    class Meta:
        verbose_name = _('Unlock Marker')

    def render(self, context):
        """The marker has no output of its own."""
        return ''
|
|
|
|
@register_cell_class
class BlurpCell(CellBase):
    """Cell rendered by a cmsplugin_blurp renderer, selected by ``blurp_key``."""

    blurp_key = models.CharField(max_length=50)

    @classmethod
    def get_cell_types(cls):
        """Return one type per non-private renderer declared in
        ``settings.CMS_PLUGIN_BLURP_RENDERERS``, sorted by name; the list is
        empty when the setting does not exist."""
        try:
            blurp_renderers = settings.CMS_PLUGIN_BLURP_RENDERERS
        except AttributeError:
            return []
        cell_types = []
        for blurp_key, blurp_value in blurp_renderers.items():
            if blurp_value.get('private'):
                continue
            cell_types.append({
                'name': blurp_value.get('name'),
                'cell_type_str': cls.get_cell_type_str(),
                'group': _('Extra'),
                'variant': blurp_key,
            })
        # key-based sort: same ordering as the former cmp function
        cell_types.sort(key=lambda x: x.get('name'))
        return cell_types

    def get_label(self):
        """Return the name configured for the renderer."""
        return settings.CMS_PLUGIN_BLURP_RENDERERS[self.blurp_key]['name']

    def set_variant(self, variant):
        """Use the variant as renderer key."""
        self.blurp_key = variant

    def render(self, context):
        """Render the cell using the configured renderer.

        NothingInCacheException is raised for renderers flagged ``ajax``
        unless the context asks for a synchronous rendering.
        """
        if settings.CMS_PLUGIN_BLURP_RENDERERS[self.blurp_key].get('ajax') and not context.get('synchronous'):
            raise NothingInCacheException()
        renderer = cmsplugin_blurp.utils.resolve_renderer(self.blurp_key)
        # named so as not to shadow the module-level "template" import
        blurp_template = renderer.render_template()
        context = renderer.render(context)
        return blurp_template.render(context)

    def get_default_form_class(self):
        """There is nothing to edit in this cell."""
        return None
|
|
|
|
|
|
@register_cell_class
class MenuCell(CellBase):
    """Cell displaying a navigation menu."""

    depth = models.PositiveIntegerField(
            _('Depth'),
            choices=[(i, i) for i in range(1, 3)],
            default=1,
            null=False)
    initial_level = models.IntegerField(
            _('Initial Level'),
            choices=[(-1, _('Same as page'))] + [(i, i) for i in range(1, 3)],
            default=-1,
            null=False)
    root_page = models.ForeignKey(
            Page,
            related_name='root_page',
            null=True,
            blank=True,
            verbose_name=_('Root Page'))

    template_name = 'combo/menu-cell.html'

    class Meta:
        verbose_name = _('Menu')

    def get_default_form_class(self):
        """Return the dedicated form class, imported lazily."""
        from .forms import MenuCellForm
        return MenuCellForm

    def get_cell_extra_context(self, context):
        """Add the result of ``render_menu`` to the cell context, as ``menu``."""
        from combo.public.menu import render_menu
        extra_context = super(MenuCell, self).get_cell_extra_context(context)
        extra_context['menu'] = render_menu(
                context,
                level=self.initial_level,
                root_page=self.root_page,
                depth=self.depth)
        return extra_context

    def render_for_search(self):
        """Menus contribute no text to search."""
        return ''
|
|
|
|
|
|
@register_cell_class
class LinkCell(CellBase):
    """Cell displaying a link, to an internal page or to an arbitrary URL."""

    title = models.CharField(_('Title'), max_length=150, blank=True)
    url = models.URLField(_('URL'), blank=True)
    link_page = models.ForeignKey('data.Page', related_name='link_cell', blank=True,
            null=True, verbose_name=_('Internal link'))
    anchor = models.CharField(_('Anchor'), max_length=150, blank=True)

    template_name = 'combo/link-cell.html'

    class Meta:
        verbose_name = _('Link')

    def get_additional_label(self):
        """Return the link title (or the linked page title) processed by
        ``utils.ellipsize``, or None when there is no title."""
        title = self.title
        if not title and self.link_page:
            title = self.link_page.title
        if not title:
            return None
        return utils.ellipsize(title)

    def get_cell_extra_context(self, context):
        """Add the ``url`` and ``title`` of the link to the cell context.

        The internal page takes precedence over the URL field, the anchor is
        appended when set, and URLs without a network location are made
        absolute when the context has ``render_skeleton`` set.
        """
        render_skeleton = context.get('render_skeleton')
        request = context.get('request')
        context = super(LinkCell, self).get_cell_extra_context(context)
        if self.link_page:
            context['url'] = self.link_page.get_online_url()
            context['title'] = self.title or self.link_page.title
        else:
            context['url'] = utils.get_templated_url(self.url)
            context['title'] = self.title or self.url
        if self.anchor:
            context['url'] += '#' + self.anchor
        if render_skeleton and not urlparse.urlparse(context['url']).netloc:
            # create full URL when used in a skeleton
            context['url'] = request.build_absolute_uri(context['url'])
        return context

    def get_default_form_class(self):
        """Return the dedicated form class, imported lazily."""
        # explicit relative import, like the other cells of this module
        from .forms import LinkCellForm
        return LinkCellForm
|
|
|
|
|
|
@register_cell_class
class FeedCell(CellBase):
    """Cell displaying the entries of a remote RSS/Atom feed."""

    title = models.CharField(_('Title'), max_length=150, blank=True)
    url = models.URLField(_('URL'), blank=True)
    limit = models.PositiveSmallIntegerField(_('Maximum number of entries'),
            null=True, blank=True)

    template_name = 'combo/feed-cell.html'

    class Meta:
        verbose_name = _('RSS/Atom Feed')

    def is_visible(self, user=None):
        """A feed cell without URL is never visible."""
        return bool(self.url) and super(FeedCell, self).is_visible(user=user)

    def _get_cache_key(self):
        """Return the cache key of the feed content (MD5 hex digest of the URL)."""
        return hashlib.md5(self.url).hexdigest()

    def get_cell_extra_context(self, context):
        """Add the parsed feed to the cell context, as ``feed``.

        The feed content is read from the cache or, failing that, downloaded
        and cached for 600 seconds when the answer has a 200 status.  Entries
        are truncated to ``limit`` when it is set, and an empty cell title is
        replaced (on the instance, not in the database) by the feed title.
        """
        context = super(FeedCell, self).get_cell_extra_context(context)
        cache_key = self._get_cache_key()
        feed_content = cache.get(cache_key)
        if not feed_content:
            feed_response = requests.get(utils.get_templated_url(self.url))
            if feed_response.status_code == 200:
                feed_content = feed_response.content
                cache.set(cache_key, feed_content, 600)
        if feed_content:
            context['feed'] = feedparser.parse(feed_content)
            if self.limit:
                context['feed']['entries'] = context['feed']['entries'][:self.limit]
            if not self.title:
                try:
                    self.title = context['feed']['feed'].title
                except (KeyError, AttributeError):
                    # feedparser raises AttributeError, not KeyError, when
                    # the feed has no title
                    pass
        return context

    def render(self, context):
        """Render the cell; NothingInCacheException is raised when the feed is
        not cached yet and the context does not ask for synchronous rendering."""
        feed_content = cache.get(self._get_cache_key())
        if not context.get('synchronous') and feed_content is None:
            raise NothingInCacheException()
        return super(FeedCell, self).render(context)
|
|
|
|
|
|
@register_cell_class
class ParametersCell(CellBase):
    """Cell offering a choice between sets of parameters; the chosen set is
    pushed into the ``parameters`` variable of the page context."""

    title = models.CharField(_('Title'), max_length=150, blank=True)
    url = models.URLField(_('URL'), blank=True)
    empty_label = models.CharField(_('Empty label'), max_length=64, default='---')
    parameters = JSONField(_('Parameters'), blank=True,
            help_text=_('Must be a JSON list, containing dictionaries with 3 keys: '
                        'name, value and optionnally roles; name must be a string, '
                        'value must be a dictionary and roles must a list of role '
                        'names. Role names limit the visibility of the choice.'))

    template_name = 'combo/parameters-cell.html'

    class Meta:
        verbose_name = _('Parameters')

    def get_additional_label(self):
        return self.title

    def is_visible(self, user=None):
        """A cell without parameters is never visible."""
        return bool(self.parameters) and super(ParametersCell, self).is_visible(user=user)

    def validate_schema(self, value):
        """Check ``value`` against the expected structure of ``parameters``.

        Returns a ``(valid, message)`` tuple; ``message`` is an empty string
        when ``valid`` is True, and describes the first problem found
        otherwise.
        """
        if not isinstance(value, list):
            return False, _('it must be a list')
        if not all(isinstance(x, dict) for x in value):
            return False, _('it must be a list of dictionaries')
        if not all(set(x.keys()) <= set(['roles', 'name', 'value']) for x in value):
            return False, _('permitted keys in the dictionaries are name, roles and value')
        for x in value:
            if 'roles' not in x:
                continue
            if not isinstance(x['roles'], list):
                return False, _('roles must be a list')
            if not all(isinstance(y, unicode) for y in x['roles']):
                return False, _('roles must be a list of strings')
            if len(set(x['roles'])) != len(x['roles']):
                return False, _('role\'s names must be unique in a list of roles')
            existing = Group.objects.filter(name__in=x['roles']).values_list('name', flat=True)
            if len(existing) != len(x['roles']):
                missing = u', '.join(set(x['roles']) - set(existing))
                return False, _('role(s) %s do(es) not exist') % missing
        # .get(): a missing key must be reported as a validation error, not
        # crash with a KeyError
        if not all(isinstance(x.get('name'), unicode) for x in value):
            return False, _('name must be a string')
        if not all(isinstance(x.get('value'), dict) for x in value):
            return False, _('value must be a dictionary')
        if not len(set(x['name'] for x in value)) == len(value):
            return False, _('names must be unique')
        return True, ''

    def clean(self):
        """Raise ValidationError when ``parameters`` does not match the schema."""
        validated, msg = self.validate_schema(self.parameters)
        if not validated:
            raise ValidationError(_('Parameters does not validate the expected schema: %s') % msg)

    def get_form(self, request):
        """Return the choice form bound to the query string of ``request``.

        Choices restricted to roles are only offered to users belonging to
        at least one group named after these roles.
        """
        from .forms import ParametersForm
        if not request.user.is_anonymous():
            groups = set(request.user.groups.values_list('name', flat=True))
        else:
            groups = set()
        parameters = [param for param in self.parameters
                      if not param.get('roles') or set(param['roles']) & groups]
        return ParametersForm(request.GET, parameters=parameters,
                              empty_label=self.empty_label,
                              prefix='parameters-cells-' + str(self.pk))

    def modify_global_context(self, context, request):
        """Merge the selected choice into the ``parameters`` context variable."""
        if not bool(self.parameters):
            return
        # Store form for later use by get_cell_extra_context
        self._form = self.get_form(request)
        if self._form.is_valid():
            parameters = context['parameters'] if 'parameters' in context else {}
            context['parameters'] = parameters
            parameters.update(self._form.cleaned_data['choice'])

    def get_cell_extra_context(self, context):
        """Add ``title``, ``url`` and, when it has been built, ``form`` to the
        cell context."""
        ctx = super(ParametersCell, self).get_cell_extra_context(context)
        if hasattr(self, '_form'):
            ctx['form'] = self._form
        ctx['title'] = self.title
        ctx['url'] = utils.get_templated_url(self.url)
        return ctx

    def get_default_form_class(self):
        """Return the dedicated form class, imported lazily."""
        from .forms import ParametersCellForm
        return ParametersCellForm
|
|
|
|
|
|
@register_cell_class
class ParentContentCell(CellBase):
    """Placeholder cell standing for the cells of the parent page."""

    class Meta:
        verbose_name = _('Same as parent')

    def get_cells(self):
        """Return the cells of the same placeholder in the parent page.

        Pages without parent use the top-level ``index`` page as parent (the
        index page itself has none, hence no cell).  The first cell of the
        result that is itself a ParentContentCell is replaced by the cells it
        stands for.
        """
        parent_page = self.page.parent
        if not parent_page:
            if self.page.slug == 'index':
                return []
            try:
                parent_page = Page.objects.get(slug='index', parent=None)
            except Page.DoesNotExist:
                return []

        cells = CellBase.get_cells(placeholder=self.placeholder, page=parent_page)
        for position, cell in enumerate(cells):
            if isinstance(cell, ParentContentCell):
                cells[position:position + 1] = cell.get_cells()
                break
        return cells

    def render(self, context):
        """The cell has no output of its own."""
        return ''
|
|
|
|
|
|
class JsonCellBase(CellBase):
    """Abstract base for cells displaying data obtained from JSON web services.

    Subclasses provide the attributes below, as class attributes, model
    fields or properties.
    """
    url = None
    cache_duration = 60
    template_string = None
    varnames = None
    force_async = False
    log_errors = True
    actions = {}
    additional_data = None
        # [
        #  {'key': ...,
        #   'url': ...,
        #   'cache_duration': ... (optional)
        #  },
        #  ...
        # ]

    _json_content = None

    class Meta:
        abstract = True

    def is_visible(self, user=None):
        """A cell without URL is never visible."""
        return bool(self.url) and super(JsonCellBase, self).is_visible(user=user)

    def get_cell_extra_context(self, context, invalidate_cache=False):
        """Fetch the JSON data and return it as extra context.

        The main URL is stored under the ``json`` key and each entry of
        ``additional_data`` under its own key.  For every key ``<key>_url``
        and ``<key>_status`` are set once the request has been made, and
        ``<key>_error`` is set when the content is not valid JSON or when an
        error response has a JSON body.  Data is also pushed into ``context``
        so it can be used in the URLs that follow.
        """
        logger = logging.getLogger(__name__)
        extra_context = super(JsonCellBase, self).get_cell_extra_context(context)
        if self.varnames and context.get('request'):
            # copy selected query string variables, without overriding
            # variables already in the context
            for varname in self.varnames:
                if varname in context['request'].GET and varname not in context:
                    context[varname] = context['request'].GET[varname]
        self._json_content = None

        data_urls = [{'key': 'json', 'url': self.url, 'cache_duration': self.cache_duration,
                      'log_errors': self.log_errors}]
        data_urls.extend(self.additional_data or [])

        # make sure all keys exist, whatever happens afterwards
        for data_url_dict in data_urls:
            extra_context[data_url_dict['key']] = None

        for data_url_dict in data_urls:
            data_key = data_url_dict['key']
            try:
                url = utils.get_templated_url(data_url_dict['url'], context)
            except utils.UnknownTemplateVariableError:
                # log the URL being processed, it may be an additional one
                logger.warning('unknown variable in template URL (%s)', data_url_dict['url'])
                continue
            json_response = utils.requests.get(url,
                    headers={'Accept': 'application/json'},
                    remote_service='auto',
                    cache_duration=data_url_dict.get('cache_duration', self.cache_duration),
                    without_user=True,
                    raise_if_not_cached=not(context.get('synchronous')),
                    invalidate_cache=invalidate_cache,
                    log_errors=data_url_dict.get('log_errors', self.log_errors),
                    )
            extra_context[data_key + '_url'] = url
            extra_context[data_key + '_status'] = json_response.status_code
            if json_response.status_code // 100 == 2:
                if json_response.status_code != 204:  # 204 = No Content
                    try:
                        extra_context[data_key] = json.loads(json_response.content)
                    except ValueError:
                        extra_context[data_key + '_error'] = 'invalid_json'
                        logger.error('invalid json content (%s)', url)
                        continue
            elif json_response.headers.get('content-type') == 'application/json':
                try:
                    extra_context[data_key + '_error'] = json.loads(json_response.content)
                except ValueError:
                    extra_context[data_key + '_error'] = 'invalid_json'

            # update context with data key so it can be used in future
            # templated URLs
            context[data_key] = extra_context[data_key]

        # keep cache of first response as it may be used to find the
        # appropriate template.
        self._json_content = extra_context['json']

        return extra_context

    @property
    def template_name(self):
        """Name of the template matching the shape of the fetched content.

        It is the error template when there is no content, the list template
        when ``data`` is a list starting with a dictionary that has ``url``
        and ``text`` keys, and the generic template otherwise.
        """
        json_content = self._json_content
        if json_content is None:
            return 'combo/json-error-cell.html'
        # the payload may be any JSON value, only dictionaries have "data"
        if isinstance(json_content, dict) and json_content.get('data'):
            data = json_content['data']
            if isinstance(data, list):
                first_element = data[0]
                if isinstance(first_element, dict):
                    if 'url' in first_element and 'text' in first_element:
                        return 'combo/json-list-cell.html'
        return 'combo/json-cell.html'

    def post(self, request):
        """Send the posted data to the URL of the requested action.

        PermissionDenied is raised when the action is missing or unknown,
        PostException when the action URL cannot be built or when the remote
        service does not answer with a 2xx status.  On success the cell data
        is fetched again with cache invalidation, when caching is in use.
        """
        if 'action' not in request.POST:
            raise PermissionDenied()
        action = request.POST['action']
        if action not in self.actions:
            raise PermissionDenied()

        error_message = self.actions[action].get('error-message')
        logger = logging.getLogger(__name__)

        # all posted values but the action name are sent to the service
        content = {}
        for key, value in request.POST.items():
            if key == 'action':
                continue
            content[key] = value

        context_dict = copy.copy(content)
        context_dict['request'] = request
        context_dict['synchronous'] = True
        context = RequestContext(request, context_dict)

        try:
            url = utils.get_templated_url(self.actions[action]['url'], context)
        except utils.UnknownTemplateVariableError:
            logger.warning('unknown variable in URL (%s)', self.actions[action]['url'])
            raise PostException(error_message)

        json_response = utils.requests.post(url,
                headers={'Accept': 'application/json'},
                remote_service='auto',
                json=content,
                without_user=True)

        if json_response.status_code // 100 != 2:  # 2xx
            logger.error('error POSTing data to URL (%s)', url)
            raise PostException(error_message)

        if self.cache_duration:
            self.get_cell_extra_context(context, invalidate_cache=True)

    def render(self, context):
        """Render the cell, with ``template_string`` when it is set.

        NothingInCacheException is raised when ``force_async`` is set and the
        context does not ask for a synchronous rendering.
        """
        if self.force_async and not context.get('synchronous'):
            raise NothingInCacheException()
        if self.template_string:
            tmpl = Template(self.template_string)
            context.update(self.get_cell_extra_context(context))
            return tmpl.render(context)
        return super(JsonCellBase, self).render(context)
|
|
|
|
|
|
@register_cell_class
class JsonCell(JsonCellBase):
    """JSON cell whose settings are stored in model fields."""

    title = models.CharField(_('Title'), max_length=150, blank=True)
    url = models.URLField(_('URL'), blank=True)
    template_string = models.TextField(_('Template'), blank=True, null=True)
    cache_duration = models.PositiveIntegerField(
            _('Cache duration'), default=60)
    force_async = models.BooleanField(
            _('Force asynchronous mode'),
            default=JsonCellBase.force_async)
    varnames_str = models.CharField(
            _('Variable names'), max_length=200, blank=True,
            help_text=_('Comma separated list of query-string variables '
                        'to be copied in template context'))

    class Meta:
        verbose_name = _('JSON Feed')

    @property
    def varnames(self):
        """List of the non-empty, stripped names found in ``varnames_str``."""
        stripped_names = (name.strip() for name in self.varnames_str.split(','))
        return [name for name in stripped_names if name]
|
|
|
|
|
|
@register_cell_class
class ConfigJsonCell(JsonCellBase):
    """JSON cell whose settings come from the ``settings.JSON_CELL_TYPES``
    entry designated by ``key``."""

    key = models.CharField(max_length=50)
    parameters = JSONField(blank=True)

    @classmethod
    def get_cell_types(cls):
        """Return one type per entry of ``settings.JSON_CELL_TYPES``, sorted by name."""
        cell_types = []
        for key, definition in settings.JSON_CELL_TYPES.items():
            cell_types.append({
                'name': definition['name'],
                'variant': key,
                'group': _('Extra'),
                'cell_type_str': cls.get_cell_type_str(),
            })
        # key-based sort: same ordering as the former cmp function
        cell_types.sort(key=lambda x: x.get('name'))
        return cell_types

    def get_label(self):
        """Return the name configured for the cell type."""
        return settings.JSON_CELL_TYPES[self.key]['name']

    def set_variant(self, variant):
        """Use the variant as configuration key."""
        self.key = variant

    @property
    def ajax_refresh(self):
        return settings.JSON_CELL_TYPES[self.key].get('auto_refresh', None)

    @property
    def url(self):
        return settings.JSON_CELL_TYPES[self.key]['url']

    @property
    def cache_duration(self):
        return settings.JSON_CELL_TYPES[self.key].get('cache_duration',
                JsonCellBase.cache_duration)

    @property
    def varnames(self):
        return settings.JSON_CELL_TYPES[self.key].get('varnames')

    @property
    def force_async(self):
        return settings.JSON_CELL_TYPES[self.key].get('force_async',
                JsonCellBase.force_async)

    @property
    def log_errors(self):
        return settings.JSON_CELL_TYPES[self.key].get('log_errors',
                JsonCellBase.log_errors)

    @property
    def actions(self):
        return settings.JSON_CELL_TYPES[self.key].get('actions',
                JsonCellBase.actions)

    @property
    def additional_data(self):
        return settings.JSON_CELL_TYPES[self.key].get('additional-data')

    @property
    def template_name(self):
        return 'combo/json/%s.html' % self.key

    def get_default_form_class(self):
        """Return a form class built from the ``form`` entry of the
        configuration, or None when there is no such entry."""
        formdef = settings.JSON_CELL_TYPES[self.key].get('form')
        if not formdef:
            return None
        from .forms import ConfigJsonForm
        # create a subclass of ConfigJsonForm with 'formdef' (the list of
        # fields) as an attribute.
        config_form_class = type(str('%sConfigClass' % self.key),
                (ConfigJsonForm,), {'formdef': formdef})
        return config_form_class

    def get_cell_extra_context(self, context, **kwargs):
        """Return the extra context, with the cell parameters added both as
        individual variables and as ``parameters``."""
        # a cell saved without parameters must not break the rendering
        parameters = self.parameters or {}
        context.update(copy.copy(parameters))  # early push for templated URLs
        ctx = super(ConfigJsonCell, self).get_cell_extra_context(context, **kwargs)
        ctx['parameters'] = self.parameters
        ctx.update(parameters)
        return ctx
|