combo/combo/data/models.py

1845 lines
66 KiB
Python

# combo - content management system
# Copyright (C) 2014 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import copy
import datetime
import feedparser
import hashlib
import html
import json
import logging
import os
import re
import requests
import subprocess
from django.apps import apps
from django.conf import settings
from django.contrib import messages
from django.contrib.auth.models import Group
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.fields import GenericRelation
from django.contrib.contenttypes.models import ContentType
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist, ValidationError, PermissionDenied
from django.core import serializers
from django.db import models, transaction
from django.db.models.base import ModelBase
from django.db.models.signals import pre_save, post_save, post_delete
from django.db.models import Max, Q
from django.dispatch import receiver
from django.forms import models as model_forms
from django import forms
from django import template
from django.utils import six, timezone
from django.utils.encoding import python_2_unicode_compatible, force_text, smart_bytes
from django.utils.html import strip_tags
from django.utils.safestring import mark_safe
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.text import slugify
from django.utils.timezone import now
from django.utils.translation import ugettext_lazy as _
from django.forms.widgets import MediaDefiningClass
from django.template import Context, engines, TemplateDoesNotExist, TemplateSyntaxError
from django.test.client import RequestFactory
from .fields import RichTextField, TemplatableURLField
from jsonfield import JSONField
from .library import register_cell_class, get_cell_classes, get_cell_class
from combo import utils
from combo.utils import NothingInCacheException
class PostException(Exception):
pass
def element_is_visible(element, user=None):
if element.public:
if getattr(element, 'restricted_to_unlogged', None) is True:
return (user is None or user.is_anonymous)
return True
if user is None or user.is_anonymous:
return False
if user.is_superuser:
return True
page_groups = element.groups.all()
if not page_groups:
groups_ok = True
else:
groups_ok = len(set(page_groups).intersection(user.groups.all())) > 0
if getattr(element, 'restricted_to_unlogged', None) is True:
return not(groups_ok)
return groups_ok
def django_template_validator(value):
try:
engines['django'].from_string(value)
except TemplateSyntaxError as e:
raise ValidationError(_('syntax error: %s') % e)
class Placeholder(object):
def __init__(self, key, name=None, acquired=False, optional=False,
render=True, cell=None, force_synchronous=False):
self.key = key
self.name = name
self.acquired = acquired
self.optional = optional
self.render = render
self.cell = cell
self.force_synchronous = force_synchronous
def get_name(self):
if self.cell:
return '%s / %s' % (self.cell.get_label(), self.name)
return self.name
class PageManager(models.Manager):
snapshots = False
def __init__(self, *args, **kwargs):
self.snapshots = kwargs.pop('snapshots', False)
super(PageManager, self).__init__(*args, **kwargs)
def get_by_natural_key(self, path):
parts = [x for x in path.strip('/').split('/') if x] or ['index']
return self.get(slug=parts[-1])
def get_queryset(self):
queryset = super(PageManager, self).get_queryset()
if self.snapshots:
return queryset.filter(snapshot__isnull=False)
else:
return queryset.filter(snapshot__isnull=True)
@python_2_unicode_compatible
class Page(models.Model):
objects = PageManager()
snapshots = PageManager(snapshots=True)
title = models.CharField(_('Title'), max_length=150)
slug = models.SlugField(_('Slug'))
sub_slug = models.CharField(_('Sub Slug'), max_length=150, blank=True,
help_text=_('Regular expression to create variadic subpages. '
'Matching named groups are exposed as context variables.'
))
description = models.TextField(_('Description'), blank=True)
template_name = models.CharField(_('Template'), max_length=50)
parent = models.ForeignKey('self', on_delete=models.CASCADE, null=True, blank=True)
order = models.PositiveIntegerField()
exclude_from_navigation = models.BooleanField(_('Exclude from navigation'), default=True)
redirect_url = models.CharField(_('Redirect URL'), max_length=200, blank=True)
public = models.BooleanField(_('Public'), default=True)
groups = models.ManyToManyField(Group, verbose_name=_('Groups'), blank=True)
creation_timestamp = models.DateTimeField(default=now)
last_update_timestamp = models.DateTimeField(auto_now=True)
picture = models.ImageField(_('Picture'), upload_to='page-pictures/', null=True)
# mark temporarily restored snapshots, it is required to save objects
# (pages and cells) for real for viewing past snapshots as many cells are
# asynchronously loaded and must refer to a real Page object.
snapshot = models.ForeignKey('PageSnapshot', on_delete=models.CASCADE, null=True,
related_name='temporary_page')
# keep a cached list of cell types that are used in the pages.
related_cells = JSONField(blank=True)
_level = None
class Meta:
ordering = ['order']
def __str__(self):
return self.title
def natural_key(self):
return (self.get_online_url().strip('/'), )
def picture_extension(self):
if not self.picture:
return None
return os.path.splitext(self.picture.name)[-1]
def save(self, *args, **kwargs):
if not self.id:
self.related_cells = {'cell_types': []}
if not self.order:
max_order = Page.objects.all().aggregate(Max('order')).get('order__max') or 0
self.order = max_order + 1
if not self.slug:
if not Page.objects.exists():
slug = 'index'
else:
base_slug = slugify(self.title)[:40]
slug = base_slug.strip('-')
i = 1
while Page.objects.filter(slug=slug, parent_id=self.parent_id).exists():
i += 1
slug = '%s-%s' % (base_slug, i)
self.slug = slug
if not self.template_name:
self.template_name = settings.COMBO_DEFAULT_PUBLIC_TEMPLATE
return super(Page, self).save(*args, **kwargs)
def get_parents_and_self(self):
pages = [self]
page = self
while page.parent_id:
page = page._parent if hasattr(page, '_parent') else page.parent
pages.append(page)
return list(reversed(pages))
def get_online_url(self):
parts = [x.slug for x in self.get_parents_and_self()]
if parts[0] == 'index':
parts = parts[1:]
if not parts:
return '/'
return '/' + '/'.join(parts) + '/'
def get_page_of_level(self, level):
'''Return page of given level in the page hierarchy.'''
parts = [self]
page = self
while page.parent_id:
page = page._parent if hasattr(page, '_parent') else page.parent
parts.append(page)
parts.reverse()
try:
return parts[level]
except IndexError:
return None
def get_siblings(self):
if hasattr(self, '_parent'):
if self._parent:
return self._parent._children
return Page.objects.filter(parent_id=self.parent_id)
def get_children(self):
if hasattr(self, '_children'):
return self._children
return Page.objects.filter(parent_id=self.id)
def has_children(self):
if hasattr(self, '_children'):
return bool(self._children)
return Page.objects.filter(parent_id=self.id).exists()
def get_descendants_and_me(self):
def get_descendant_pages(page):
descendants = [page]
for item in page.get_children():
descendants.extend(get_descendant_pages(item))
return descendants
return Page.objects.filter(id__in=[x.id for x in get_descendant_pages(self)])
def get_template_display_name(self):
try:
return settings.COMBO_PUBLIC_TEMPLATES[self.template_name]['name']
except KeyError:
return _('Unknown (%s)') % self.template_name
def missing_template(self):
template_name = settings.COMBO_PUBLIC_TEMPLATES.get(self.template_name, {}).get('template')
if not template_name:
return True
try:
template.loader.select_template([template_name])
except TemplateDoesNotExist:
return True
return False
def get_placeholders(self, request, traverse_cells=False, template_name=None):
placeholders = []
page_template = settings.COMBO_PUBLIC_TEMPLATES.get(template_name or self.template_name, {})
if page_template.get('placeholders'):
# manual declaration
for key, options in page_template['placeholders'].items():
placeholders.append(Placeholder(key=key, **options))
return placeholders
template_names = []
if page_template.get('template'):
template_names.append(page_template['template'])
template_names.append('combo/page_template.html')
tmpl = template.loader.select_template(template_names)
request = RequestFactory(SERVER_NAME=request.get_host()).get(self.get_online_url())
request.user = None
context = {
'page': self,
'request': request,
'synchronous': True,
'placeholder_search_mode': True,
'placeholders': placeholders,
'traverse_cells': traverse_cells,
}
tmpl.render(context, request)
return placeholders
def get_next_page(self, user=None, check_visibility=True):
pages = Page.get_as_reordered_flat_hierarchy(Page.objects.all())
this_page = [x for x in pages if x.id == self.id][0]
pages = pages[pages.index(this_page)+1:]
for page in pages:
if not check_visibility or page.is_visible(user):
return page
return None
def get_previous_page(self, user=None, check_visibility=True):
pages = Page.get_as_reordered_flat_hierarchy(Page.objects.all())
pages.reverse()
this_page = [x for x in pages if x.id == self.id][0]
pages = pages[pages.index(this_page)+1:]
for page in pages:
if not check_visibility or page.is_visible(user):
return page
return None
@classmethod
def get_as_reordered_flat_hierarchy(cls, object_list):
reordered = []
parenting = collections.defaultdict(list)
for page in object_list:
parenting[page.parent_id].append(page)
def fill_list(object_sublist, level=0, parent=None):
for page in object_sublist:
parent_id = parent.pk if parent else None
if page.parent_id == parent_id:
page.level = level
reordered.append(page)
if page.id in parenting:
fill_list(object_sublist, level=level+1, parent=page)
fill_list(object_list)
return reordered
@staticmethod
@utils.cache_during_request
def get_with_hierarchy_attributes():
pages = Page.objects.all()
pages_by_id = {}
for page in pages:
pages_by_id[page.id] = page
page._parent = None
page._children = []
for page in pages:
page._parent = pages_by_id[page.parent_id] if page.parent_id else None
if page._parent:
page._parent._children.append(page)
for page in pages:
page._children.sort(key=lambda x: x.order)
return pages_by_id
def visibility(self):
if self.public:
return _('Public')
groups = self.groups.all()
groupnames = ', '.join([x.name for x in groups]) if groups else _('logged users')
return _('Private (%s)') % groupnames
def is_visible(self, user=None):
return element_is_visible(self, user=user)
def get_cells(self):
return CellBase.get_cells(page=self)
def build_cell_cache(self):
cells = CellBase.get_cells(page=self, skip_cell_cache=True)
cell_types = set()
for cell in cells:
cell_types.add(cell.get_cell_type_str())
if cell_types != set(self.related_cells.get('cell_types', [])):
self.related_cells['cell_types'] = list(cell_types)
self.save()
def get_serialized_page(self):
cells = [x for x in self.get_cells() if x.placeholder and not x.placeholder.startswith('_')]
serialized_page = json.loads(serializers.serialize('json', [self],
use_natural_foreign_keys=True, use_natural_primary_keys=True))[0]
del serialized_page['model']
if 'snapshot' in serialized_page:
del serialized_page['snapshot']
if 'related_cells' in serialized_page['fields']:
del serialized_page['fields']['related_cells']
serialized_page['cells'] = json.loads(serializers.serialize('json',
cells, use_natural_foreign_keys=True, use_natural_primary_keys=True))
for index, cell in enumerate(cells):
serialized_page['cells'][index].update(cell.export_subobjects())
serialized_page['fields']['groups'] = [x[0] for x in serialized_page['fields']['groups']]
for cell in serialized_page['cells']:
del cell['pk']
del cell['fields']['page']
cell['fields']['groups'] = [x[0] for x in cell['fields']['groups']]
for key in list(cell['fields'].keys()):
if key.startswith('cached_'):
del cell['fields'][key]
return serialized_page
@classmethod
def load_serialized_page(cls, json_page, snapshot=None, request=None):
json_page['model'] = 'data.page'
json_page['fields']['groups'] = [[x] for x in json_page['fields']['groups'] if isinstance(x, six.string_types)]
page, created = Page.objects.get_or_create(slug=json_page['fields']['slug'], snapshot=snapshot)
json_page['pk'] = page.id
parent_slug = json_page['fields'].get('parent') or []
if parent_slug and not Page.objects.filter(slug=parent_slug[0]).exists():
# parent not found, remove it and exclude page from navigation
json_page['fields'].pop('parent')
json_page['fields']['exclude_from_navigation'] = True
if request:
messages.warning(
request,
_('Unknown parent for page "%s"; parent has been reset and page was excluded from navigation.')
% json_page['fields']['title'])
page = next(serializers.deserialize('json', json.dumps([json_page]), ignorenonexistent=True))
page.object.snapshot = snapshot
page.save()
for cell in json_page.get('cells'):
cell['fields']['groups'] = [[x] for x in cell['fields']['groups'] if isinstance(x, six.string_types)]
if snapshot:
cell['fields']['page'] = page.object.id
else:
cell['fields']['page'] = page.object.natural_key()
# if there were cells, remove them
for cell in CellBase.get_cells(page_id=page.object.id):
cell.delete()
return page.object # get page out of deserialization object
@classmethod
def load_serialized_cells(cls, cells):
# load new cells
for cell_data in cells:
model = apps.get_model(cell_data['model'])
cell_data = model.prepare_serialized_data(cell_data)
cell = list(serializers.deserialize('json', json.dumps([cell_data]), ignorenonexistent=True))[0]
cell.save()
# will populate cached_* attributes
cell.object.save()
cell.object.import_subobjects(cell_data)
@classmethod
def load_serialized_pages(cls, json_site, request=None):
cells = []
for json_page in json_site:
cls.load_serialized_page(json_page, request=request)
cells.extend(json_page.get('cells'))
cls.load_serialized_cells(cells)
@classmethod
def export_all_for_json(cls):
ordered_pages = Page.get_as_reordered_flat_hierarchy(cls.objects.all())
return [x.get_serialized_page() for x in ordered_pages]
def get_redirect_url(self, context=None):
return utils.get_templated_url(self.redirect_url, context=context)
def get_last_update_time(self):
return self.last_update_timestamp
def is_new(self):
return self.creation_timestamp > timezone.now() - datetime.timedelta(days=7)
def duplicate(self, title=None):
# clone current page
new_page = copy.deepcopy(self)
new_page.pk = None
# set title
new_page.title = title or _('Copy of %s') % self.title
# reset slug
new_page.slug = None
# reset snapshot
new_page.snapshot = None
# set order
new_page.order = self.order + 1
# exclude from navigation
new_page.exclude_from_navigation = True
# store new page
new_page.save()
# set groups
new_page.groups.set(self.groups.all())
for cell in self.get_cells():
if cell.placeholder and cell.placeholder.startswith('_'):
continue
cell.duplicate(page_target=new_page)
return new_page
class PageSnapshot(models.Model):
page = models.ForeignKey(Page, on_delete=models.SET_NULL, null=True)
timestamp = models.DateTimeField(auto_now_add=True)
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, null=True)
comment = models.TextField(blank=True, null=True)
serialization = JSONField(blank=True)
class Meta:
ordering = ('-timestamp',)
@classmethod
def take(cls, page, request=None, comment=None, deletion=False):
snapshot = cls(page=page, comment=comment)
if request and not request.user.is_anonymous:
snapshot.user = request.user
if not deletion:
snapshot.serialization = page.get_serialized_page()
else:
snapshot.serialization = {}
snapshot.comment = comment or _('deletion')
snapshot.save()
def get_page(self):
try:
# try reusing existing page
return Page.snapshots.get(snapshot=self)
except Page.DoesNotExist:
page = Page.load_serialized_page(self.serialization, snapshot=self)
page.load_serialized_cells(self.serialization['cells'])
return page
def restore(self):
with transaction.atomic():
page = Page.load_serialized_page(self.serialization)
page.load_serialized_cells(self.serialization['cells'])
return page
class Redirect(models.Model):
old_url = models.CharField(max_length=512)
page = models.ForeignKey(Page, on_delete=models.CASCADE)
creation_timestamp = models.DateTimeField(auto_now_add=True)
class Meta:
ordering = ('creation_timestamp',)
class ValidityInfo(models.Model):
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
object_id = models.PositiveIntegerField()
content_object = GenericForeignKey('content_type', 'object_id')
invalid_reason_code = models.CharField(max_length=100, blank=True, null=True, editable=False)
invalid_since = models.DateTimeField(blank=True, null=True, editable=False)
class Meta:
unique_together = [('content_type', 'object_id')]
@property
def invalid_datetime(self):
if not self.invalid_since:
return
return self.invalid_since + datetime.timedelta(days=2)
class CellMeta(MediaDefiningClass, ModelBase):
pass
@python_2_unicode_compatible
class CellBase(six.with_metaclass(CellMeta, models.Model)):
page = models.ForeignKey(Page, on_delete=models.CASCADE)
placeholder = models.CharField(max_length=20)
order = models.PositiveIntegerField()
slug = models.SlugField(_('Slug'), blank=True)
extra_css_class = models.CharField(_('Extra classes for CSS styling'), max_length=100, blank=True)
public = models.BooleanField(_('Public'), default=True)
# restricted_to_unlogged is actually an invert switch, it is used for mark
# a cell as only visibile to unlogged users but also, when groups are set,
# to mark the cell as visible to all but those groups.
restricted_to_unlogged = models.BooleanField(
_('Restrict to unlogged users'), default=False)
groups = models.ManyToManyField(Group, verbose_name=_('Groups'), blank=True)
last_update_timestamp = models.DateTimeField(auto_now=True)
validity_info = GenericRelation(ValidityInfo)
invalid_reason_codes = {}
default_form_class = None
manager_form_factory_kwargs = {}
manager_form_template = 'combo/cell_form.html'
children_placeholder_prefix = None
visible = True
user_dependant = False
template_name = None
# get_badge(self, context); set to None so cell types can be skipped easily
get_badge = None
# message displayed when the cell is loaded asynchronously
loading_message = _('Loading...')
# modify_global_context(self, context, request=None)
# Apply changes to the template context that must visible to all cells in the page
modify_global_context = None
# if set, automatically refresh cell every n seconds
ajax_refresh = None
class Meta:
abstract = True
def __str__(self):
label = self.get_verbose_name()
additional_label = self.get_additional_label()
if label and additional_label:
return u'%s (%s)' % (label, re.sub(r'\r?\n', u' ', force_text(additional_label)))
else:
return force_text(label)
@classmethod
def get_verbose_name(cls):
return cls._meta.verbose_name
def get_additional_label(self):
return ''
@property
def legacy_class_name(self):
# legacy class name used in some themes
return self.__class__.__name__.lower()
@property
def class_name(self):
# convert CamelCase Python class name into a lower and dashed version
# appropriate for CSS
return re.sub('([A-Z]+)', r'-\1', self.__class__.__name__).lower()[1:]
@property
def css_class_names(self):
return ' '.join([self.class_name, self.legacy_class_name, self.extra_css_class])
@property
def asset_css_classes(self):
from combo.apps.assets.models import Asset
if not hasattr(self, '_asset_keys'):
self._asset_keys = self.get_asset_slot_keys()
if not hasattr(self, '_assets'):
self._assets = {a.key: a for a in Asset.objects.filter(key__in=self._asset_keys.keys())}
if not self._asset_keys or not self._assets:
return ''
# add has-asset-<slug> for each asset found
css = sorted(['has-asset-{}'.format(v) for k, v in self._asset_keys.items() if k in self._assets])
# add has-any-asset if at least one asset defined
if self._assets:
css.append('has-any-asset')
# add has-all-assets if all assets are defined
if self._assets.keys() == self._asset_keys.keys():
css.append('has-all-assets')
return ' '.join(css)
def can_have_assets(self):
return self.get_slug_for_asset() and self.get_asset_slot_templates()
def get_slug_for_asset(self):
return self.slug
def get_label_for_asset(self):
return _(u'%(cell_label)s on page %(page_name)s (%(page_slug)s)') % {
'cell_label': str(self),
'page_name': str(self.page),
'page_slug': self.page.slug,
}
def get_asset_slot_key(self, key):
return 'cell:%s:%s:%s' % (
self.get_cell_type_str(),
key,
self.get_slug_for_asset())
def get_asset_slot_templates(self):
return settings.COMBO_CELL_ASSET_SLOTS.get(self.get_cell_type_str()) or {}
def get_asset_slot_keys(self):
if not self.can_have_assets():
return {}
slot_templates = self.get_asset_slot_templates()
return {self.get_asset_slot_key(key): key for key in slot_templates.keys()}
def get_asset_slots(self):
if not self.can_have_assets():
return {}
slot_templates = self.get_asset_slot_templates()
slots = {}
for slot_template_key, slot_template_data in slot_templates.items():
suffix = ''
if slot_template_data.get('suffix'):
suffix = ' (%s)' % slot_template_data['suffix']
slot_key = self.get_asset_slot_key(slot_template_key)
label = u'%(prefix)s%(label)s%(suffix)s' % {
'prefix': slot_template_data['prefix'],
'label': self.get_label_for_asset(),
'suffix': suffix
}
short_label = u'%(prefix)s%(suffix)s' % {
'prefix': slot_template_data['prefix'],
'suffix': suffix
}
slots[slot_key] = {
'label': label,
'short_label': short_label,
}
slots[slot_key].update(slot_template_data)
return slots
@classmethod
def get_cell_classes(cls, class_filter=lambda x: True):
for klass in get_cell_classes():
if not class_filter(klass):
continue
if not klass.is_enabled():
continue
if klass.visible is False:
continue
yield klass
@classmethod
def get_all_cell_types(cls, *args, **kwargs):
cell_types = []
for klass in cls.get_cell_classes(*args, **kwargs):
cell_types.extend(klass.get_cell_types())
return cell_types
@classmethod
def get_cells(cls, cell_filter=None, skip_cell_cache=False, prefetch_validity_info=False, select_related=None, load_contenttypes=False, **kwargs):
"""Returns the list of cells of various classes matching **kwargs"""
cells = []
pages = []
select_related = select_related or {}
if 'page' in kwargs:
pages = [kwargs['page']]
elif 'page__in' in kwargs:
pages = kwargs['page__in']
else:
# if there are not explicit page, limit to non-snapshot pages
kwargs['page__snapshot__isnull'] = True
cell_classes = get_cell_classes()
if pages and not skip_cell_cache:
# if there's a request for some specific pages, limit cell types
# to those that are actually in use in those pages.
cell_types = set()
for page in pages:
if page.related_cells and 'cell_types' in page.related_cells:
cell_types |= set(page.related_cells['cell_types'])
else:
break
else:
cell_classes = [get_cell_class(x) for x in cell_types]
if load_contenttypes:
# populate ContentType cache
ContentType.objects.get_for_models(*cell_classes)
extra_filter = kwargs.pop('extra_filter', None)
for klass in cell_classes:
if klass is None:
continue
if cell_filter and not cell_filter(klass):
continue
cells_queryset = klass.objects.filter(**kwargs)
if extra_filter:
cells_queryset = cells_queryset.filter(extra_filter)
if select_related:
cells_queryset = cells_queryset.select_related(
*select_related.get('__all__', []),
*select_related.get(klass.get_cell_type_str(), []))
cells.extend(cells_queryset)
if prefetch_validity_info:
validity_info_list = list(ValidityInfo.objects.select_related('content_type'))
for cell in cells:
cell.prefetched_validity_info = [
v for v in validity_info_list
if v.object_id == cell.pk and v.content_type.model_class() == cell.__class__]
cells.sort(key=lambda x: x.order)
return cells
def get_reference(self):
"Returns a string that can serve as a unique reference to a cell"""
return str('%s-%s' % (self.get_cell_type_str(), self.id))
@classmethod
def get_cell(cls, reference, **kwargs):
"""Returns the cell matching reference, and eventual **kwargs"""
content_id, cell_id = reference.split('-')
klass = get_cell_class(content_id)
if klass is None:
raise ObjectDoesNotExist()
return klass.objects.get(id=cell_id, **kwargs)
@classmethod
def get_cell_type_str(cls):
return '%s_%s' % (cls._meta.app_label, cls._meta.model_name)
@classmethod
def get_cell_type_group(cls):
return apps.get_app_config(cls._meta.app_label).verbose_name
@classmethod
def get_cell_types(cls):
return [{
'name': cls.get_verbose_name(),
'cell_type_str': cls.get_cell_type_str(),
'group': cls.get_cell_type_group(),
'variant': 'default',
'order': 0,
}]
@classmethod
def is_enabled(cls):
"""Defines if the cell type is enabled for the given site; this is used
to selectively enable cells from extension modules."""
return True
def set_variant(self, variant):
pass
def get_label(self):
return self.get_verbose_name()
def get_default_form_class(self):
if self.default_form_class:
return self.default_form_class
fields = [x.name for x in self._meta.local_concrete_fields
if x.name not in ('id', 'page', 'placeholder', 'order',
'public', 'groups', 'slug',
'extra_css_class', 'last_update_timestamp',
'restricted_to_unlogged')]
if not fields:
return None
return model_forms.modelform_factory(self.__class__, fields=fields, **self.manager_form_factory_kwargs)
def get_options_form_class(self):
return model_forms.modelform_factory(self.__class__,
fields=['slug', 'extra_css_class'])
def get_extra_manager_context(self):
return {}
def mark_as_invalid(self, reason_code, force=True):
validity_info, created = ValidityInfo.objects.get_or_create(
content_type=ContentType.objects.get_for_model(self),
object_id=self.pk,
defaults={
'invalid_reason_code': reason_code,
'invalid_since': now(),
})
if created:
return
if not force and validity_info.invalid_since is not None:
# don't overwrite invalid reason already set
return
if validity_info.invalid_reason_code == reason_code:
# don't overwrite invalid_since if same reason already set
return
validity_info.invalid_reason_code = reason_code
validity_info.invalid_since = now()
validity_info.save()
def mark_as_valid(self):
validity_info = self.get_validity_info()
if validity_info is None:
return
validity_info.delete()
def get_validity_info(self):
if hasattr(self, 'prefetched_validity_info'):
if not self.prefetched_validity_info:
return
return self.prefetched_validity_info[0]
return self.validity_info.all().first()
def set_validity_from_url(
self, resp,
not_found_code='url_not_found', invalid_code='url_invalid'):
if resp is None:
# can not retrieve data, don't report cell as invalid
self.mark_as_valid()
elif resp.status_code == 404:
self.mark_as_invalid(not_found_code)
elif 400 <= resp.status_code < 500:
# 4xx error, cell is invalid
self.mark_as_invalid(invalid_code)
else:
# 2xx or 3xx: cell is valid
# 5xx error: can not retrieve data, don't report cell as invalid
self.mark_as_valid()
def get_invalid_reason(self):
validity_info = self.get_validity_info()
if validity_info is None:
return
if not validity_info.invalid_since:
return
return self.invalid_reason_codes.get(
validity_info.invalid_reason_code, validity_info.invalid_reason_code)
def is_placeholder_active(self):
if not self.placeholder:
return False
if self.placeholder.startswith('_'):
return True
request = RequestFactory().get('/')
if not hasattr(self.page, '_placeholders'):
self.page._placeholders = self.page.get_placeholders(request, traverse_cells=True)
for placeholder in self.page._placeholders:
if placeholder.key == self.placeholder:
return True
return False
def is_visible(self, user=None, check_validity_info=True):
if check_validity_info:
validity_info = self.get_validity_info()
if validity_info is not None and validity_info.invalid_datetime and validity_info.invalid_datetime <= now():
return False
return element_is_visible(self, user=user)
def is_relevant(self, context):
'''Return whether it's relevant to render this cell in the page
context.'''
return True
def is_user_dependant(self, context=None):
'''Return whether the cell content varies from user to user.'''
return self.user_dependant
def get_concerned_user(self, context):
'''Return user from UserSearch cell, or connected user.'''
return context.get('selected_user') or getattr(context.get('request'), 'user', None)
def get_cell_extra_context(self, context):
return {'cell': self}
def render(self, context):
context.update(self.get_cell_extra_context(context))
template_names = ['combo/' + self._meta.model_name + '.html']
base_template_name = self._meta.model_name + '.html'
if self.template_name:
base_template_name = os.path.basename(self.template_name)
template_names.append(self.template_name)
if self.slug:
template_names.append('combo/cells/%s/%s' % (self.slug, base_template_name))
template_names.reverse()
tmpl = template.loader.select_template(template_names)
return tmpl.render(context, context.get('request'))
def render_for_search(self):
if not self.is_enabled():
return ''
if self.is_user_dependant():
return ''
request = RequestFactory().get(self.page.get_online_url())
request.user = None # compat
context = {
'page': self.page,
'render_skeleton': False,
'request': request,
'site_base': request.build_absolute_uri('/')[:-1],
'synchronous': True,
'user': None, # compat
'absolute_uri': request.build_absolute_uri,
}
if not self.is_relevant(context):
return ''
return html.unescape(strip_tags(self.render(context)))
def get_external_links_data(self):
return []
@classmethod
def prepare_serialized_data(cls, cell_data):
return cell_data
def export_subobjects(self):
return {}
def import_subobjects(self, cell_json):
pass
def duplicate(self, page_target=None, placeholder=None):
# clone current cell
new_cell = copy.deepcopy(self)
new_cell.pk = None
# set page
new_cell.page = page_target or self.page
# set placeholder
new_cell.placeholder = placeholder or new_cell.placeholder
# store new cell
new_cell.save()
# set groups
new_cell.groups.set(self.groups.all())
if hasattr(self, 'duplicate_m2m'):
self.duplicate_m2m(new_cell)
return new_cell
@register_cell_class
class TextCell(CellBase):
text = RichTextField(_('Text'), blank=True, null=True)
template_name = 'combo/text-cell.html'
class Meta:
verbose_name = _('Text')
def is_relevant(self, context):
return bool(self.text)
def get_additional_label(self):
if not self.text:
return None
return utils.ellipsize(self.text)
@classmethod
def get_cell_types(cls):
d = super(TextCell, cls).get_cell_types()
d[0]['order'] = -1
return d
def get_cell_extra_context(self, context):
extra_context = super(TextCell, self).get_cell_extra_context(context)
text = self.text or ''
render_skeleton = context.get('render_skeleton')
def sub_variadic_url(match):
attribute = match.group(1)
url = match.group(2)
try:
url = utils.get_templated_url(url, context=context)
except utils.TemplateError as e:
logger = logging.getLogger(__name__)
logger.warning('error in templated URL (%s): %s' % (url, e))
return '%s="%s"' % (attribute, url)
text = re.sub(r'(href|src)="(.*?)"', sub_variadic_url, text)
if render_skeleton:
request = context.get('request')
def sub_src(match):
url = request.build_absolute_uri(match.group(1))
return 'src="%s"' % url
def sub_href(match):
url = match.group(1)
if url.startswith('#'):
pass
else:
url = request.build_absolute_uri(url)
return 'href="%s"' % url
text = re.sub(r'src="(.*?)"', sub_src, text)
text = re.sub(r'href="(.*?)"', sub_href, text)
extra_context['text'] = mark_safe(text)
return extra_context
@register_cell_class
class FortuneCell(CellBase):
ajax_refresh = 30
class Meta:
verbose_name = _('Fortune')
def render(self, context):
return subprocess.check_output(['fortune'])
@classmethod
def is_enabled(cls):
try:
subprocess.check_output(['fortune'])
except OSError:
return False
return settings.DEBUG
@register_cell_class
class UnlockMarkerCell(CellBase):
# XXX: this is kept to smooth transitions, it should be removed once all
# sites # have been migrated to ParentContentCell
"""Marks an 'acquired' placeholder as unlocked."""
visible = False
class Meta:
verbose_name = _('Unlock Marker')
def render(self, context):
return ''
@register_cell_class
class MenuCell(CellBase):
depth = models.PositiveIntegerField(_('Depth'),
choices=[(i, i) for i in range(1, 3)], default=1, null=False)
initial_level = models.IntegerField(_('Initial Level'),
choices=[(-1, _('Same as page'))] +
[(i, i) for i in range(1, 3)], default=-1, null=False)
root_page = models.ForeignKey(Page, on_delete=models.CASCADE, related_name='root_page',
null=True, blank=True, verbose_name=_('Root Page'))
template_name = 'combo/menu-cell.html'
exclude_from_search = True
class Meta:
verbose_name = _('Menu')
def get_default_form_class(self):
from .forms import MenuCellForm
return MenuCellForm
def get_cell_extra_context(self, context):
from combo.public.menu import render_menu
ctx = super(MenuCell, self).get_cell_extra_context(context)
ctx['menu'] = render_menu(context, level=self.initial_level,
root_page=self.root_page, depth=self.depth,
ignore_visibility=False)
return ctx
@register_cell_class
class LinkCell(CellBase):
title = models.CharField(_('Title'), max_length=150, blank=True)
url = models.CharField(_('URL'), max_length=200, blank=True, validators=[django_template_validator])
link_page = models.ForeignKey(
'data.Page', on_delete=models.CASCADE, related_name='link_cell', blank=True,
null=True, verbose_name=_('Internal link'))
anchor = models.CharField(_('Anchor'), max_length=150, blank=True)
template_name = 'combo/link-cell.html'
add_as_link_label = _('add a link')
add_link_label = _('New link')
edit_link_label = _('Edit link')
add_as_link_code = 'link'
invalid_reason_codes = {
'data_url_not_defined': _('No link set'),
'data_url_not_found': _('URL seems to unexist'),
'data_url_invalid': _('URL seems to be invalid'),
}
class Meta:
verbose_name = _('Link')
def save(self, *args, **kwargs):
if 'update_fields' in kwargs:
# don't check validity
return super(LinkCell, self).save(*args, **kwargs)
result = super(LinkCell, self).save(*args, **kwargs)
# check validity
self.check_validity()
return result
def get_additional_label(self):
title = self.title
if not title and self.link_page:
title = self.link_page.title
if not title:
return None
return utils.ellipsize(title)
def get_slug_for_asset(self):
if self.placeholder and self.placeholder.startswith('_'):
return
if self.link_page:
return self.link_page.slug
return self.slug
def get_label_for_asset(self):
if self.link_page:
return str(self)
return super().get_label_for_asset()
def get_url(self, context=None):
context = context or {}
if self.link_page:
url = self.link_page.get_online_url()
else:
url = utils.get_templated_url(self.url, context=context)
if self.anchor:
url += '#' + self.anchor
return url
def get_cell_extra_context(self, context):
render_skeleton = context.get('render_skeleton')
request = context.get('request')
extra_context = super(LinkCell, self).get_cell_extra_context(context)
if self.link_page:
extra_context['title'] = self.title or self.link_page.title
else:
extra_context['title'] = self.title or self.url
url = self.get_url(context)
if render_skeleton and not urlparse.urlparse(url).netloc:
# create full URL when used in a skeleton
url = request.build_absolute_uri(url)
extra_context['url'] = url
return extra_context
def get_default_form_class(self):
from .forms import LinkCellForm
return LinkCellForm
def render_for_search(self):
return ''
def get_external_links_data(self):
if not self.url:
return []
link_data = self.get_cell_extra_context({})
if link_data.get('title') and link_data.get('url'):
return [link_data]
return []
def check_validity(self):
if self.link_page:
self.mark_as_valid()
return
if not self.url:
if self.anchor:
# link to anchor on same page, ok.
self.mark_as_valid()
return
self.mark_as_invalid('data_url_not_defined')
return
if self.url.count('{{') > 1:
self.mark_as_valid()
return
resp = None
try:
resp = requests.get(self.get_url(), timeout=settings.REQUESTS_TIMEOUT)
resp.raise_for_status()
except (requests.exceptions.RequestException):
pass
self.set_validity_from_url(resp, not_found_code='data_url_not_found', invalid_code='data_url_invalid')
@register_cell_class
class LinkListCell(CellBase):
title = models.CharField(_('Title'), max_length=150, blank=True)
limit = models.PositiveSmallIntegerField(_('Limit'), null=True, blank=True)
template_name = 'combo/link-list-cell.html'
manager_form_template = 'combo/manager/link-list-cell-form.html'
children_placeholder_prefix = '_linkslist:'
exclude_from_search = True
invalid_reason_codes = {
'data_link_invalid': _('Invalid link'),
}
class Meta:
verbose_name = _('List of links')
@property
def link_placeholder(self):
return self.children_placeholder_prefix + str(self.pk)
def get_items(self, prefetch_validity_info=False):
return CellBase.get_cells(
page=self.page,
placeholder=self.link_placeholder,
cell_filter=lambda x: hasattr(x, 'add_as_link_label'),
prefetch_validity_info=prefetch_validity_info)
def get_items_with_prefetch(self):
return self.get_items(prefetch_validity_info=True)
def get_additional_label(self):
title = self.title
if not title:
return None
return utils.ellipsize(title)
def get_cell_extra_context(self, context):
extra_context = super(LinkListCell, self).get_cell_extra_context(context)
links = []
for cell in context.get('page_cells', []):
if not hasattr(cell, 'add_as_link_label'):
continue
if not cell.placeholder == self.link_placeholder:
continue
links.append(cell.get_cell_extra_context(context))
extra_context['links'] = links
extra_context['more_links'] = []
if self.limit:
extra_context['more_links'] = extra_context['links'][self.limit:]
extra_context['links'] = extra_context['links'][:self.limit]
extra_context['title'] = self.title
return extra_context
def get_link_cell_classes(self):
return CellBase.get_cell_classes(lambda x: hasattr(x, 'add_as_link_label'))
def get_default_form_class(self):
from .forms import LinkListCellForm
return LinkListCellForm
def export_subobjects(self):
links = json.loads(
serializers.serialize(
'json',
self.get_items(),
use_natural_foreign_keys=True,
use_natural_primary_keys=True))
for link in links:
del link['pk']
del link['fields']['placeholder']
del link['fields']['page']
return {'links': links}
def import_subobjects(self, cell_json):
for link in cell_json['links']:
link['fields']['placeholder'] = self.link_placeholder
link['fields']['page'] = self.page_id
links = serializers.deserialize('json', json.dumps(cell_json['links']), ignorenonexistent=True)
for link in links:
link.save()
def duplicate_m2m(self, new_cell):
# duplicate also link items
for link in self.get_items():
link.duplicate(page_target=new_cell.page, placeholder=new_cell.link_placeholder)
def check_validity(self):
for link in self.get_items(prefetch_validity_info=True):
validity_info = link.get_validity_info()
if validity_info is not None:
self.mark_as_invalid('data_link_invalid')
return
self.mark_as_valid()
@register_cell_class
class FeedCell(CellBase):
title = models.CharField(_('Title'), max_length=150, blank=True)
url = models.CharField(_('URL'), blank=True, max_length=200)
limit = models.PositiveSmallIntegerField(
_('Maximum number of entries'),
null=True, blank=True)
manager_form_factory_kwargs = {'field_classes': {'url': TemplatableURLField}}
template_name = 'combo/feed-cell.html'
invalid_reason_codes = {
'data_url_not_defined': _('No URL set'),
'data_url_not_found': _('URL seems to unexist'),
'data_url_invalid': _('URL seems to be invalid'),
}
class Meta:
verbose_name = _('RSS/Atom Feed')
def is_visible(self, **kwargs):
return bool(self.url) and super(FeedCell, self).is_visible(**kwargs)
def get_cell_extra_context(self, context):
extra_context = super(FeedCell, self).get_cell_extra_context(context)
if not self.url:
self.mark_as_invalid('data_url_not_defined')
return extra_context
if context.get('placeholder_search_mode'):
# don't call webservices when we're just looking for placeholders
return extra_context
cache_key = hashlib.md5(smart_bytes(self.url)).hexdigest()
feed_content = cache.get(cache_key)
if not feed_content:
feed_response = None
try:
feed_response = requests.get(utils.get_templated_url(self.url))
feed_response.raise_for_status()
except (requests.exceptions.RequestException):
pass
else:
if feed_response.status_code == 200:
feed_content = feed_response.content
cache.set(cache_key, feed_content, 600)
self.set_validity_from_url(feed_response, not_found_code='data_url_not_found', invalid_code='data_url_invalid')
if feed_content:
extra_context['feed'] = feedparser.parse(feed_content)
if self.limit:
extra_context['feed']['entries'] = extra_context['feed']['entries'][:self.limit]
if not self.title:
try:
self.title = extra_context['feed']['feed'].title
except (KeyError, AttributeError):
pass
return extra_context
def render(self, context):
cache_key = hashlib.md5(smart_bytes(self.url)).hexdigest()
feed_content = cache.get(cache_key)
if not context.get('synchronous') and feed_content is None:
raise NothingInCacheException()
return super(FeedCell, self).render(context)
@register_cell_class
class ParentContentCell(CellBase):
class Meta:
verbose_name = _('Same as parent')
def get_parents_cells(self, hierarchy):
try:
pages = [Page.objects.get(slug='index', parent=None)]
except Page.DoesNotExist:
pages = []
pages.extend(hierarchy)
if not pages:
return []
if len(pages) > 1 and pages[0].id == pages[1].id:
# don't duplicate index cells for real children of the index page.
pages = pages[1:]
cells_by_page = {}
for page in pages:
cells_by_page[page.id] = []
# get cells from placeholder + cells in private placeholders that may
# be used by actual cells.
placeholder_filter = Q(placeholder=self.placeholder)
for klass in CellBase.get_cell_classes(lambda x: bool(x.children_placeholder_prefix)):
placeholder_filter |= Q(placeholder__startswith=klass.children_placeholder_prefix)
for cell in CellBase.get_cells(page__in=pages, extra_filter=placeholder_filter):
cells_by_page[cell.page_id].append(cell)
cells = cells_by_page[pages[-1].id]
for page in reversed(pages[:-1]):
for i, cell in enumerate(cells):
if isinstance(cell, ParentContentCell):
cells[i:i+1] = cells_by_page[page.id]
break
else:
# no more ParentContentCell, stop folloing the parent page chain
break
return cells
def render(self, context):
return ''
class JsonCellBase(CellBase):
url = None
cache_duration = 60
template_string = None
varnames = None
force_async = False
log_errors = True
timeout = None
actions = {}
additional_data = None
# [
# {'key': ...,
# 'url': ...,
# 'cache_duration': ... (optional)
# },
# ...
# ]
first_data_key = 'json'
_json_content = None
invalid_reason_codes = {
'data_url_not_found': _('URL seems to unexist'),
'data_url_invalid': _('URL seems to be invalid'),
}
class Meta:
abstract = True
def is_visible(self, **kwargs):
return bool(self.url) and super(JsonCellBase, self).is_visible(**kwargs)
def is_user_dependant(self, context=None):
urls = [self.url] + [x['url'] for x in self.additional_data or []]
for url in urls:
if url and ('user_nameid' in url or 'user_email' in url):
return True
return False
def get_cell_parameters_context(self):
return {}
def get_cell_extra_context(self, context, invalidate_cache=False):
extra_context = super(JsonCellBase, self).get_cell_extra_context(context)
if context.get('placeholder_search_mode'):
# don't call webservices when we're just looking for placeholders
return extra_context
if self.varnames and context.get('request'):
for varname in self.varnames:
if varname in context['request'].GET:
context[varname] = context['request'].GET[varname]
self._json_content = None
extra_context.update(self.get_cell_parameters_context())
context.update(self.get_cell_parameters_context())
context['concerned_user'] = self.get_concerned_user(context)
data_urls = [{'key': self.first_data_key, 'url': self.url, 'cache_duration': self.cache_duration,
'log_errors': self.log_errors, 'timeout': self.timeout}]
data_urls.extend(self.additional_data or [])
for data_url_dict in data_urls:
extra_context[data_url_dict['key']] = None
for data_url_dict in data_urls:
data_key = data_url_dict['key']
log_errors = data_url_dict.get('log_errors', self.log_errors)
try:
url = utils.get_templated_url(data_url_dict['url'], context)
except utils.TemplateError as e:
logger = logging.getLogger(__name__)
logger.warning('error in templated URL (%s): %s', data_url_dict['url'], e)
continue
extra_context[data_key + '_url'] = url
if not url:
continue
try:
json_response = utils.requests.get(
url,
headers={'Accept': 'application/json'},
remote_service='auto',
cache_duration=data_url_dict.get('cache_duration', self.cache_duration),
without_user=True,
raise_if_not_cached=not(context.get('synchronous')),
invalidate_cache=invalidate_cache,
log_errors=log_errors,
timeout=data_url_dict.get('timeout', self.timeout),
django_request=context.get('request'),
)
except requests.RequestException as e:
extra_context[data_key + '_status'] = -1
extra_context[data_key + '_error'] = force_text(e)
extra_context[data_key + '_exception'] = e
logger = logging.getLogger(__name__)
if log_errors:
logger.warning(u'error on request %r: %s', url, force_text(e))
else:
logger.debug(u'error on request %r: %s', url, force_text(e))
continue
extra_context[data_key + '_status'] = json_response.status_code
if json_response.status_code // 100 == 2:
if json_response.status_code != 204: # 204 = No Content
try:
extra_context[data_key] = json_response.json()
except ValueError:
extra_context[data_key + '_error'] = 'invalid_json'
logger = logging.getLogger(__name__)
if log_errors:
logger.error('invalid json content (%s)', url)
else:
logger.debug('invalid json content (%s)', url)
continue
elif json_response.headers.get('content-type') == 'application/json':
try:
extra_context[data_key + '_error'] = json_response.json()
except ValueError:
extra_context[data_key + '_error'] = 'invalid_json'
# update context with data key so it can be used in future
# templated URLs
context[data_key] = extra_context[data_key]
if not self._meta.abstract:
returns = []
for data_url in data_urls:
if data_url['url'].count('{{') > 1:
# ignore returns of url with more than one variable
continue
returns.append(extra_context.get(data_url['key'] + '_status'))
returns = set([s for s in returns if s is not None])
if returns and 200 not in returns: # not a single valid answer
if 404 in returns:
self.mark_as_invalid('data_url_not_found')
elif any([400 <= r < 500 for r in returns]):
# at least 4xx errors, report the cell as invalid
self.mark_as_invalid('data_url_invalid')
else:
# 2xx or 3xx: cell is valid
# 5xx error: can not retrieve data, don't report cell as invalid
self.mark_as_valid()
else:
self.mark_as_valid()
# keep cache of first response as it may be used to find the
# appropriate template.
self._json_content = extra_context[self.first_data_key]
return extra_context
@property
def template_name(self):
json_content = self._json_content
if json_content is None:
return 'combo/json-error-cell.html'
if json_content.get('data'):
if isinstance(json_content['data'], list):
first_element = json_content['data'][0]
if isinstance(first_element, dict):
if 'url' in first_element and 'text' in first_element:
return 'combo/json-list-cell.html'
return 'combo/json-cell.html'
def post(self, request):
if not 'action' in request.POST:
raise PermissionDenied()
action = request.POST['action']
if not action in self.actions:
raise PermissionDenied()
error_message = self.actions[action].get('error-message')
timeout = self.actions[action].get('timeout', self.timeout)
method = self.actions[action].get('method', 'POST')
logger = logging.getLogger(__name__)
content = {}
for key, value in request.POST.items():
if key == 'action':
continue
if key.endswith('[]'): # array
content[key[:-2]] = request.POST.getlist(key)
else:
content[key] = value
context = copy.copy(content)
context.update(self.get_cell_parameters_context())
context['request'] = request
context['synchronous'] = True
try:
url = utils.get_templated_url(self.actions[action]['url'], context)
except utils.TemplateError as e:
logger.warning('error in templated URL (%s): %s', self.actions[action]['url'], e)
raise PostException(error_message)
json_response = utils.requests.request(
method,
url,
headers={'Accept': 'application/json'},
remote_service='auto',
json=content,
without_user=True,
django_request=request,
timeout=timeout,
)
if json_response.status_code // 100 != 2: # 2xx
logger.error('error POSTing data to URL (%s)', url)
raise PostException(error_message)
if self.cache_duration:
self.get_cell_extra_context(context, invalidate_cache=True)
if self.actions[action].get('response', 'cell') == 'raw':
# response: raw in the config will get the response directly sent
# to the client.
return json_response
# default behaviour, the cell will render itself.
return None
def render(self, context):
if self.force_async and not context.get('synchronous'):
raise NothingInCacheException()
if self.template_string:
tmpl = engines['django'].from_string(self.template_string)
context.update(self.get_cell_extra_context(context))
return tmpl.render(context, context.get('request'))
return super(JsonCellBase, self).render(context)
@register_cell_class
class JsonCell(JsonCellBase):
title = models.CharField(_('Title'), max_length=150, blank=True)
url = models.CharField(_('URL'), blank=True, max_length=500)
template_string = models.TextField(_('Display Template'), blank=True, null=True,
validators=[django_template_validator])
cache_duration = models.PositiveIntegerField(
_('Cache duration'), default=60)
force_async = models.BooleanField(_('Force asynchronous mode'),
default=JsonCellBase.force_async)
varnames_str = models.CharField(_('Variable names'), max_length=200, blank=True,
help_text=_('Comma separated list of query-string variables '
'to be copied in template context'))
timeout = models.PositiveIntegerField(_('Request timeout'), default=0,
help_text=_('In seconds. Use 0 for default system timeout'))
manager_form_factory_kwargs = {'field_classes': {'url': TemplatableURLField}}
class Meta:
verbose_name = _('JSON Prototype')
@property
def varnames(self):
return [vn.strip() for vn in self.varnames_str.split(',') if vn.strip()]
class ConfigJsonCellManager(models.Manager):
def get_queryset(self):
queryset = super(ConfigJsonCellManager, self).get_queryset()
return queryset.filter(key__in=settings.JSON_CELL_TYPES.keys())
@register_cell_class
class ConfigJsonCell(JsonCellBase):
objects = ConfigJsonCellManager()
key = models.CharField(max_length=50)
parameters = JSONField(blank=True)
@classmethod
def get_cell_types(cls):
l = []
for key, definition in settings.JSON_CELL_TYPES.items():
l.append({
'name': definition['name'],
'variant': key,
'group': _('Extra'),
'cell_type_str': cls.get_cell_type_str(),
})
l.sort(key=lambda x: x.get('name'))
return l
def get_label(self):
return settings.JSON_CELL_TYPES[self.key]['name']
def set_variant(self, variant):
self.key = variant
@property
def css_class_names(self):
return super(ConfigJsonCell, self).css_class_names + ' ' + self.key
@property
def ajax_refresh(self):
return settings.JSON_CELL_TYPES[self.key].get('auto_refresh', None)
@property
def url(self):
return settings.JSON_CELL_TYPES[self.key]['url']
@property
def cache_duration(self):
return settings.JSON_CELL_TYPES[self.key].get('cache_duration',
JsonCellBase.cache_duration)
@property
def varnames(self):
return settings.JSON_CELL_TYPES[self.key].get('varnames')
@property
def force_async(self):
return settings.JSON_CELL_TYPES[self.key].get('force_async',
JsonCellBase.force_async)
@property
def log_errors(self):
return settings.JSON_CELL_TYPES[self.key].get('log_errors',
JsonCellBase.log_errors)
@property
def timeout(self):
return settings.JSON_CELL_TYPES[self.key].get('timeout',
JsonCellBase.timeout)
@property
def actions(self):
return settings.JSON_CELL_TYPES[self.key].get('actions',
JsonCellBase.actions)
@property
def additional_data(self):
return settings.JSON_CELL_TYPES[self.key].get('additional-data')
@property
def template_name(self):
return 'combo/json/%s.html' % self.key
@property
def loading_message(self):
return settings.JSON_CELL_TYPES[self.key].get('loading-message',
CellBase.loading_message)
def get_default_form_class(self):
formdef = settings.JSON_CELL_TYPES[self.key].get('form')
if not formdef:
return None
from .forms import ConfigJsonForm
# create a subclass of ConfigJsonForm with 'formdef' (the list of
# fields) as an attribute.
config_form_class = type(str('%sConfigClass' % self.key),
(ConfigJsonForm,), {'formdef': formdef})
return config_form_class
def get_cell_parameters_context(self):
context = copy.copy(self.parameters or {})
context['parameters'] = self.parameters
return context
@receiver(pre_save, sender=Page)
def create_redirects(sender, instance, raw, **kwargs):
if raw or not instance.id or instance.snapshot_id:
return
try:
old_page = Page.objects.get(id=instance.id)
except Page.DoesNotExist:
return
if old_page.slug == instance.slug and old_page.parent_id == instance.parent_id:
return
affected_pages = level_pages = [old_page]
while True:
level_pages = Page.objects.filter(parent_id__in=[x.id for x in level_pages]).select_related('parent')
if len(level_pages) == 0:
break
affected_pages.extend(level_pages)
for page in affected_pages:
Redirect(page=page, old_url=page.get_online_url()).save()
@receiver(post_save)
@receiver(post_delete)
def cell_maintain_page_cell_cache(sender, instance=None, **kwargs):
if not issubclass(sender, CellBase):
return
if not instance.page_id:
return
page = instance.page
page.build_cell_cache()