2220 lines
78 KiB
Python
2220 lines
78 KiB
Python
# combo - content management system
|
|
# Copyright (C) 2014 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import copy
|
|
import datetime
|
|
import hashlib
|
|
import html
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import urllib.parse
|
|
|
|
import feedparser
|
|
import requests
|
|
from django import forms, template
|
|
from django.apps import apps
|
|
from django.conf import settings
|
|
from django.contrib import messages
|
|
from django.contrib.auth.models import Group
|
|
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.contrib.postgres.fields import JSONField
|
|
from django.core import serializers
|
|
from django.core.cache import cache
|
|
from django.core.exceptions import ObjectDoesNotExist, PermissionDenied, ValidationError
|
|
from django.db import models, transaction
|
|
from django.db.models import Max, Q
|
|
from django.db.models.base import ModelBase
|
|
from django.db.models.signals import post_delete, post_save, pre_save
|
|
from django.dispatch import receiver
|
|
from django.forms import models as model_forms
|
|
from django.forms.widgets import MediaDefiningClass
|
|
from django.template import TemplateDoesNotExist, TemplateSyntaxError, engines
|
|
from django.test.client import RequestFactory
|
|
from django.utils import timezone
|
|
from django.utils.encoding import force_text, python_2_unicode_compatible, smart_bytes
|
|
from django.utils.html import strip_tags
|
|
from django.utils.safestring import mark_safe
|
|
from django.utils.text import slugify
|
|
from django.utils.timezone import now
|
|
from django.utils.translation import ugettext_lazy as _
|
|
|
|
from combo import utils
|
|
from combo.utils import NothingInCacheException
|
|
|
|
from .fields import RichTextField, TemplatableURLField
|
|
from .library import get_cell_class, get_cell_classes, register_cell_class
|
|
|
|
|
|
class PostException(Exception):
    """Exception type defined for POST-related failures.

    It carries no extra state; where it is raised is outside this excerpt.
    """
|
|
|
|
|
|
def element_is_visible(element, user=None, ignore_superuser=False):
    """Tell whether `element` (a page or a cell) can be seen by `user`.

    `element` must expose `public` and `groups`; an optional
    `restricted_to_unlogged` attribute set to True inverts the outcome:
    public elements are then only shown to anonymous visitors and
    group-restricted elements are shown to everybody but those groups.
    Superusers see every non-public element unless `ignore_superuser` is set.
    """
    inverted = getattr(element, 'restricted_to_unlogged', None) is True

    if element.public:
        if not inverted:
            return True
        return user is None or user.is_anonymous

    # from here on the element is private: anonymous visitors never see it
    if user is None or user.is_anonymous:
        return False
    if user.is_superuser and not ignore_superuser:
        return True

    required_groups = element.groups.all()
    if required_groups:
        groups_ok = bool(set(required_groups) & set(user.groups.all()))
    else:
        # no group set, any logged-in user qualifies
        groups_ok = True

    return (not groups_ok) if inverted else groups_ok
|
|
|
|
|
|
def django_template_validator(value):
    """Validate that `value` compiles as a Django template.

    Raises ValidationError carrying the syntax error message otherwise.
    """
    django_engine = engines['django']
    try:
        django_engine.from_string(value)
    except TemplateSyntaxError as exc:
        raise ValidationError(_('syntax error: %s') % exc)
|
|
|
|
|
|
def format_sub_slug(sub_slug):
    """Normalise a page sub slug into a regular expression.

    A plain sub slug (no named group) is turned into a single named group
    matching `[a-z0-9]+`. Every named group is then renamed to `g0`, `g1`...
    so that names holding characters Python rejects (such as `-`) can be used.

    Returns a `(pattern, mapping)` tuple where `mapping` maps the new group
    names to the original ones.
    """
    mapping = {}

    if 'P<' not in sub_slug:
        # simple sub_slug without regex
        sub_slug = '(?P<%s>[a-z0-9]+)' % sub_slug

    renamed = {}  # original group name -> new group name

    def _rename(match):
        original_group = match.group(1)
        if original_group not in renamed:
            new_group = 'g%i' % len(renamed)
            renamed[original_group] = new_group
            mapping[new_group] = original_group
        return 'P<%s>' % renamed[original_group]

    # rename all groups in a single pass, so that an original name looking
    # like "g1" cannot be mistaken for an already renamed group.
    sub_slug = re.sub(r'P<([\w-]+)>', _rename, sub_slug)

    return sub_slug, mapping
|
|
|
|
|
|
def compile_sub_slug(sub_slug):
    """Check that `sub_slug` gives a valid regular expression.

    Nothing is returned; re.error is raised when the expression is wrong.
    """
    pattern, _mapping = format_sub_slug(sub_slug)
    re.compile(pattern)
|
|
|
|
|
|
def extract_context_from_sub_slug(sub_slug, sub_url):
    """Match `sub_url` against `sub_slug` and return the captured variables.

    Returns None when the url does not match. Otherwise returns a dict keyed
    by the original group names; names holding a `-` are also made available
    with `_` instead.
    """
    pattern, mapping = format_sub_slug(sub_slug)

    # the whole url must match; fullmatch is used rather than wrapping the
    # pattern in ^...$ as anchors would only bind to the outer branches of a
    # pattern with a top-level alternation.
    match = re.fullmatch(pattern, sub_url)
    if match is None:
        return None

    context = {}
    for group, value in match.groupdict().items():
        original_name = mapping[group]
        context[original_name] = value
        # format also key to replace - by _
        context[original_name.replace('-', '_')] = value
    return context
|
|
|
|
|
|
class Placeholder:
    """Description of a placeholder, an area of a page template holding cells."""

    def __init__(
        self,
        key,
        name=None,
        acquired=False,
        optional=False,
        render=True,
        cell=None,
        force_synchronous=False,
    ):
        self.key = key
        self.name = name
        self.cell = cell
        # flags are stored as given, they are not interpreted here
        self.acquired = acquired
        self.optional = optional
        self.render = render
        self.force_synchronous = force_synchronous

    def get_name(self):
        """Return the placeholder name, prefixed by the cell label if there is a cell."""
        if not self.cell:
            return self.name
        return '%s / %s' % (self.cell.get_label(), self.name)
|
|
|
|
|
|
class PageManager(models.Manager):
    """Manager returning either regular pages or snapshot pages.

    Pass `snapshots=True` to get the pages attached to a snapshot; by
    default those are excluded.
    """

    snapshots = False

    def __init__(self, *args, **kwargs):
        self.snapshots = kwargs.pop('snapshots', False)
        super().__init__(*args, **kwargs)

    def get_by_natural_key(self, path):
        """Get a page from its path; only the last component is looked up, as a slug."""
        slugs = [part for part in path.strip('/').split('/') if part]
        # an empty path designates the index page
        return self.get(slug=slugs[-1] if slugs else 'index')

    def get_queryset(self):
        return super().get_queryset().filter(snapshot__isnull=not self.snapshots)
|
|
|
|
|
|
@python_2_unicode_compatible
class Page(models.Model):
    """A page of the site; pages are organised as a tree using `parent`."""

    # manager limited to regular pages (no snapshot attached)
    objects = PageManager()
    # manager limited to pages attached to a snapshot
    snapshots = PageManager(snapshots=True)

    title = models.CharField(_('Title'), max_length=150)
    # when left empty the slug is computed in save()
    slug = models.SlugField(_('Slug'))
    sub_slug = models.CharField(
        _('Sub Slug'),
        max_length=150,
        blank=True,
        help_text=_(
            'Context variable assigned to the path component, for example parking_id. '
            'It is also possible to define this using a regular expression, '
            'in that case variables will be provided according to named groups, '
            'for example (?P<year>[0-9]{4}).'
        ),
    )
    description = models.TextField(_('Description'), blank=True)
    # key of settings.COMBO_PUBLIC_TEMPLATES
    template_name = models.CharField(_('Page Template'), max_length=50)
    parent = models.ForeignKey('self', on_delete=models.CASCADE, null=True, blank=True)
    # position of the page (see Meta.ordering); computed in save() if None
    order = models.PositiveIntegerField()
    exclude_from_navigation = models.BooleanField(_('Exclude from navigation'), default=True)
    # can be a templated URL or a relative URL (starting with a dot)
    redirect_url = models.CharField(_('Redirect URL'), max_length=200, blank=True)

    # visibility settings, see element_is_visible()
    public = models.BooleanField(_('Public'), default=True)
    groups = models.ManyToManyField(Group, verbose_name=_('Groups'), blank=True)
    creation_timestamp = models.DateTimeField(default=now)
    last_update_timestamp = models.DateTimeField(auto_now=True)

    picture = models.ImageField(_('Picture'), upload_to='page-pictures/', null=True)

    # group whose members can edit this page, see is_editable()
    edit_role = models.ForeignKey(
        Group,
        blank=True,
        null=True,
        default=None,
        related_name='+',
        verbose_name=_('Edit Role'),
        on_delete=models.SET_NULL,
    )
    # group whose members can edit this page and the pages below it,
    # see is_editable()
    subpages_edit_role = models.ForeignKey(
        Group,
        blank=True,
        null=True,
        default=None,
        related_name='+',
        verbose_name=_('Subpages Edit Role'),
        on_delete=models.SET_NULL,
    )

    # mark temporarily restored snapshots, it is required to save objects
    # (pages and cells) for real for viewing past snapshots as many cells are
    # asynchronously loaded and must refer to a real Page object.
    snapshot = models.ForeignKey(
        'PageSnapshot', on_delete=models.CASCADE, null=True, related_name='temporary_page'
    )

    # keep a cached list of cell types that are used in the page.
    related_cells = JSONField(blank=True, default=dict)

    # NOTE(review): class-level default, not referenced in the visible part
    # of this file.
    _level = None

    class Meta:
        ordering = ['order']
|
|
|
|
def __str__(self):
    """Use the page title as string representation."""
    return self.title
|
|
|
|
def natural_key(self):
    """Return the natural key of the page: its online URL without surrounding slashes."""
    path = self.get_online_url().strip('/')
    return (path,)
|
|
|
|
def picture_extension(self):
    """Return the extension (with its leading dot) of the picture file, None if there is no picture."""
    if self.picture:
        _root, extension = os.path.splitext(self.picture.name)
        return extension
    return None
|
|
|
|
def save(self, *args, **kwargs):
    """Save the page, computing defaults for missing attributes.

    When `update_fields` is given the page is saved as is. Otherwise:

    * `related_cells` is reset for pages that have no id yet,
    * `order` defaults to the highest existing order plus one,
    * `slug` defaults to "index" for the very first page, or to a slug
      computed from the title (limited to 40 characters and made unique
      among the pages sharing the same parent with a numeric suffix),
    * `template_name` defaults to settings.COMBO_DEFAULT_PUBLIC_TEMPLATE.
    """
    if 'update_fields' in kwargs:
        return super().save(*args, **kwargs)

    if not self.id:
        self.related_cells = {'cell_types': []}
    if self.order is None:
        max_order = Page.objects.all().aggregate(Max('order')).get('order__max') or 0
        self.order = max_order + 1
    if not self.slug:
        if not Page.objects.exists():
            slug = 'index'
        else:
            # truncation may leave a dangling dash; strip it once so it is
            # not kept in numbered variants (would give "foo--2").
            base_slug = slugify(self.title)[:40].strip('-')
            slug = base_slug
            i = 1
            while Page.objects.filter(slug=slug, parent_id=self.parent_id).exists():
                i += 1
                slug = '%s-%s' % (base_slug, i)
        self.slug = slug
    if not self.template_name:
        self.template_name = settings.COMBO_DEFAULT_PUBLIC_TEMPLATE
    return super().save(*args, **kwargs)
|
|
|
|
def get_parents_and_self(self):
    """Return the list of pages from the topmost ancestor down to this page.

    The `_parent` attribute is used when it has been set on the page (see
    get_with_hierarchy_attributes), the `parent` relation otherwise.
    """
    lineage = [self]
    current = self
    while current.parent_id:
        current = current._parent if hasattr(current, '_parent') else current.parent
        lineage.insert(0, current)
    return lineage
|
|
|
|
def get_online_url(self):
    """Return the URL of the page.

    This is the redirect URL when there is one, unless it is templated or
    relative (starts with a dot); otherwise the path is built from the slugs
    of the page and its ancestors, a leading "index" slug being left out.
    """
    redirect = self.redirect_url
    if redirect and not utils.is_templated_url(redirect) and not redirect.startswith('.'):
        return redirect

    slugs = [page.slug for page in self.get_parents_and_self()]
    if slugs[0] == 'index':
        del slugs[0]
    if slugs:
        return '/' + '/'.join(slugs) + '/'
    return '/'
|
|
|
|
def get_page_of_level(self, level):
    '''Return page of given level in the page hierarchy.

    Level 0 is the topmost ancestor; None is returned when there is no page
    at that level.
    '''
    branch = [self]
    current = self
    while current.parent_id:
        current = current._parent if hasattr(current, '_parent') else current.parent
        branch.insert(0, current)
    try:
        return branch[level]
    except IndexError:
        return None
|
|
|
|
def get_siblings(self):
    """Return the pages sharing the parent of this page (this page included).

    The children list set on `_parent` is used when available, a queryset
    is returned otherwise.
    """
    known_parent = getattr(self, '_parent', None)
    if known_parent:
        return known_parent._children
    return Page.objects.filter(parent_id=self.parent_id)
|
|
|
|
def get_children(self):
    """Return the direct children: the `_children` list if it was set, a queryset otherwise."""
    try:
        return self._children
    except AttributeError:
        return Page.objects.filter(parent_id=self.id)
|
|
|
|
def has_children(self):
    """Tell whether the page has at least one direct child."""
    if not hasattr(self, '_children'):
        return Page.objects.filter(parent_id=self.id).exists()
    return bool(self._children)
|
|
|
|
def get_descendants(self, include_myself=False):
    """Return a queryset of all pages located below this page.

    The page itself is part of the result when `include_myself` is set.
    """
    collected = [self] if include_myself else []
    # walk the tree without recursion, get_children() is called once per page
    pending = list(self.get_children())
    while pending:
        page = pending.pop()
        collected.append(page)
        pending.extend(page.get_children())

    return Page.objects.filter(id__in=[page.id for page in collected])
|
|
|
|
def get_descendants_and_me(self):
    """Shortcut for get_descendants() with the page itself included."""
    return self.get_descendants(include_myself=True)
|
|
|
|
def get_template_display_name(self):
    """Return the name configured for the page template, or an "Unknown" label."""
    known_templates = settings.COMBO_PUBLIC_TEMPLATES
    try:
        return known_templates[self.template_name]['name']
    except KeyError:
        # template not configured, or configured without a name
        return _('Unknown (%s)') % self.template_name
|
|
|
|
def missing_template(self):
    """Tell whether the page template is missing.

    It is missing when it is not configured in settings.COMBO_PUBLIC_TEMPLATES
    (or has no "template" entry there), or when the template file cannot be
    found by the template loader.
    """
    page_template = settings.COMBO_PUBLIC_TEMPLATES.get(self.template_name, {})
    template_path = page_template.get('template')
    if not template_path:
        return True
    try:
        template.loader.select_template([template_path])
    except TemplateDoesNotExist:
        return True
    else:
        return False
|
|
|
|
def is_editable(self, user):
    """Tell whether `user` is allowed to edit the page.

    Staff users can edit all pages; other users must be in the edit role of
    the page, or in the subpages edit role of the page or one of its
    ancestors.
    """
    if user.is_staff:
        return True
    user_group_ids = {group.id for group in user.groups.all()}
    if self.edit_role_id in user_group_ids:
        return True
    return any(page.subpages_edit_role_id in user_group_ids for page in self.get_parents_and_self())
|
|
|
|
def get_placeholders(self, request, traverse_cells=False, template_name=None):
    """Return the list of Placeholder objects of the page template.

    Placeholders declared in settings.COMBO_PUBLIC_TEMPLATES are used when
    there are some. Otherwise the template is rendered with
    `placeholder_search_mode` set, against a fake request, and the
    `placeholders` list given in the context is returned (it is presumably
    filled by template tags during the rendering, they are not visible here).
    """
    page_template = settings.COMBO_PUBLIC_TEMPLATES.get(template_name or self.template_name, {})

    declared = page_template.get('placeholders')
    if declared:
        # manual declaration
        return [Placeholder(key=key, **options) for key, options in declared.items()]

    placeholders = []
    candidates = [page_template['template']] if page_template.get('template') else []
    candidates.append('combo/page_template.html')
    tmpl = template.loader.select_template(candidates)

    # render with a blank request on the page URL, without any user
    fake_request = RequestFactory(SERVER_NAME=request.get_host()).get(self.get_online_url())
    fake_request.user = None
    tmpl.render(
        {
            'page': self,
            'request': fake_request,
            'synchronous': True,
            'placeholder_search_mode': True,
            'placeholders': placeholders,
            'traverse_cells': traverse_cells,
        },
        fake_request,
    )
    return placeholders
|
|
|
|
def get_next_page(self, user=None, check_visibility=True, **kwargs):
    """Return the page following this one in the flattened page hierarchy.

    Pages that are not visible to `user` are skipped unless
    `check_visibility` is False; extra keyword arguments are given to
    get_as_reordered_flat_hierarchy().

    Returns None when there is no such page, or when this page is not part
    of the flattened hierarchy (a snapshot page for example).
    """
    pages = Page.get_as_reordered_flat_hierarchy(Page.objects.all(), **kwargs)
    position = next((i for i, page in enumerate(pages) if page.id == self.id), None)
    if position is None:
        return None
    for page in pages[position + 1 :]:
        if not check_visibility or page.is_visible(user):
            return page
    return None
|
|
|
|
def get_previous_page(self, user=None, check_visibility=True, **kwargs):
    """Return the page preceding this one in the flattened page hierarchy.

    Pages that are not visible to `user` are skipped unless
    `check_visibility` is False; extra keyword arguments are given to
    get_as_reordered_flat_hierarchy().

    Returns None when there is no such page, or when this page is not part
    of the flattened hierarchy (a snapshot page for example).
    """
    pages = Page.get_as_reordered_flat_hierarchy(Page.objects.all(), **kwargs)
    position = next((i for i, page in enumerate(pages) if page.id == self.id), None)
    if position is None:
        return None
    # go backward, starting with the closest page
    for page in reversed(pages[:position]):
        if not check_visibility or page.is_visible(user):
            return page
    return None
|
|
|
|
@classmethod
def get_as_reordered_flat_hierarchy(cls, object_list, root_page=None, follow_user_perms=None):
    """Return the pages of `object_list` as a flat list following the hierarchy.

    For each page a branch is built as a list of
    [(page.order, page.id, page), (subpage.order, subpage.id, subpage),
    (subsubpage.order, subsubpage.id, subsubpage)]; sorting the branches
    gives the page hierarchy as a flat list. A `level` attribute is set on
    every returned page.

    follow_user_perms can be None or a User object, in that case only pages
    that are editable by user will be returned (all of them for staff users).
    `root_page` is not used.
    """
    pages_by_id = {page.id: page for page in object_list}

    branches = []
    for page in object_list:
        branch = [(page.order, page.id, page)]
        ancestor_id = page.parent_id
        # go up, stopping at the first ancestor missing from object_list
        while ancestor_id and ancestor_id in pages_by_id:
            ancestor = pages_by_id[ancestor_id]
            branch.insert(0, (ancestor.order, ancestor.id, ancestor))
            ancestor_id = ancestor.parent_id
        page.level = len(branch) - 1
        branches.append(branch)

    branches.sort()

    if follow_user_perms and not follow_user_perms.is_staff:
        # remove pages the user cannot see/edit
        group_ids = [group.id for group in follow_user_perms.groups.all()]
        branches = [
            branch
            for branch in branches
            if branch[-1][-1].edit_role_id in group_ids
            or any(step[-1].subpages_edit_role_id in group_ids for step in branch[:-1])
        ]

    # adjust levels to have shallowest level at 0: a page is one level below
    # its parent if the parent is displayed, and at root level otherwise.
    displayed_levels = {}  # page_id -> page_level
    for branch in branches:
        _order, page_id, page = branch[-1]
        page.level = displayed_levels[page.parent_id] + 1 if page.parent_id in displayed_levels else 0
        displayed_levels[page_id] = page.level

    return [branch[-1][-1] for branch in branches]
|
|
|
|
@staticmethod
@utils.cache_during_request
def get_with_hierarchy_attributes():
    """Return a dict (page id -> page) of all pages, with hierarchy attributes.

    Every page gets a `_parent` attribute (the parent page or None) and a
    `_children` attribute (list of children sorted by order), both used by
    other methods to avoid database queries.
    """
    pages = Page.objects.all()
    pages_by_id = {}
    for page in pages:
        page._parent = None
        page._children = []
        pages_by_id[page.id] = page
    for page in pages:
        parent = pages_by_id[page.parent_id] if page.parent_id else None
        page._parent = parent
        if parent:
            parent._children.append(page)
    for page in pages:
        page._children.sort(key=lambda child: child.order)
    return pages_by_id
|
|
|
|
def visibility(self):
    """Return a label describing who can see the page."""
    if self.public:
        return _('Public')
    groups = self.groups.all()
    if groups:
        groupnames = ', '.join([group.name for group in groups])
    else:
        groupnames = _('logged users')
    return _('Private (%s)') % groupnames
|
|
|
|
def is_visible(self, user=None, ignore_superuser=False):
    """Tell whether the page is visible to `user`, see element_is_visible()."""
    return element_is_visible(self, user=user, ignore_superuser=ignore_superuser)
|
|
|
|
def extra_labels(self):
    """Return the list of labels qualifying the page (navigation, redirection)."""
    candidates = (
        (not self.exclude_from_navigation, _('navigation')),
        (self.redirect_url, _('redirection')),
    )
    return [label for applies, label in candidates if applies]
|
|
|
|
def get_cells(self):
    """Return the cells of the page, of all cell types."""
    return CellBase.get_cells(page=self)
|
|
|
|
def build_cell_cache(self):
    """Refresh the cached list of cell types used in the page.

    The page is only saved (related_cells and last_update_timestamp) when
    the set of cell types changed.
    """
    used_types = {
        klass.get_cell_type_str()
        for klass in get_cell_classes()
        if klass is not None and klass.objects.filter(page=self).exists()
    }
    if used_types == set(self.related_cells.get('cell_types', [])):
        return
    self.related_cells['cell_types'] = list(used_types)
    self.save(update_fields=['related_cells', 'last_update_timestamp'])
|
|
|
|
def get_serialized_page(self):
    """Return the page and its cells as a JSON-compatible dict.

    Cells without placeholder, or in a placeholder whose name starts with
    an underscore, are left out. Groups are exported as a flat list of the
    first item of their natural keys, and `cached_*` cell fields are dropped.
    """
    serialize_options = {'use_natural_foreign_keys': True, 'use_natural_primary_keys': True}
    cells = [cell for cell in self.get_cells() if cell.placeholder and not cell.placeholder.startswith('_')]

    serialized_page = json.loads(serializers.serialize('json', [self], **serialize_options))[0]
    del serialized_page['model']
    # NOTE(review): the snapshot field is presumably serialized under
    # 'fields', so this top-level removal likely never triggers; confirm
    # before changing as it would alter the export format.
    serialized_page.pop('snapshot', None)
    serialized_page['fields'].pop('related_cells', None)
    serialized_page['fields']['groups'] = [x[0] for x in serialized_page['fields']['groups']]

    serialized_cells = json.loads(serializers.serialize('json', cells, **serialize_options))
    for cell, serialized_cell in zip(cells, serialized_cells):
        serialized_cell.update(cell.export_subobjects())
        # pk and page are dropped, they are set again on import
        del serialized_cell['pk']
        del serialized_cell['fields']['page']
        serialized_cell['fields']['groups'] = [x[0] for x in serialized_cell['fields']['groups']]
        for key in [key for key in serialized_cell['fields'] if key.startswith('cached_')]:
            del serialized_cell['fields'][key]
    serialized_page['cells'] = serialized_cells
    return serialized_page
|
|
|
|
@classmethod
def load_serialized_page(cls, json_page, page=None, snapshot=None, request=None):
    """Create or update a page from its serialization and return it.

    `json_page` has the format produced by get_serialized_page(); it is
    modified in place (model, pk and groups are set, and the page id is
    stored in the serialized cells, which are not loaded here, see
    load_serialized_cells()).

    When `page` is None the page is looked up (or created) from its slug and
    `snapshot`; if it already existed its cells are removed. When the parent
    page is unknown, the parent is reset and the page excluded from
    navigation; a warning message is added if `request` is given.
    """
    json_page['model'] = 'data.page'
    json_page['fields']['groups'] = [[x] for x in json_page['fields']['groups'] if isinstance(x, str)]
    created = None
    if page is None:
        page, created = Page.objects.get_or_create(slug=json_page['fields']['slug'], snapshot=snapshot)
    json_page['pk'] = page.id
    parent_slug = json_page['fields'].get('parent') or []
    if parent_slug and not Page.objects.filter(slug=parent_slug[0].split('/')[-1]).exists():
        # parent not found, remove it and exclude page from navigation
        json_page['fields'].pop('parent')
        json_page['fields']['exclude_from_navigation'] = True
        if request:
            messages.warning(
                request,
                _(
                    'Unknown parent for page "%s"; parent has been reset and page was excluded from navigation.'
                )
                % json_page['fields']['title'],
            )
    deserialized_page = next(serializers.deserialize('json', json.dumps([json_page]), ignorenonexistent=True))
    deserialized_page.object.snapshot = snapshot
    deserialized_page.save()
    # a serialization may have no "cells" entry, treat it as an empty list
    for cell in json_page.get('cells') or []:
        cell['fields']['groups'] = [[x] for x in cell['fields']['groups'] if isinstance(x, str)]
        cell['fields']['page'] = deserialized_page.object.id

    # if there were cells, remove them
    # if page was created, do nothing
    if created is False:
        for klass in get_cell_classes():
            if klass is None:
                continue
            klass.objects.filter(page=deserialized_page.object).delete()
    return deserialized_page.object  # get page out of deserialization object
|
|
|
|
@classmethod
def load_serialized_cells(cls, cells):
    """Create cells (and their subobjects) from a list of serialized cells."""
    for serialized_cell in cells:
        cell_model = apps.get_model(serialized_cell['model'])
        cell_data = cell_model.prepare_serialized_data(serialized_cell)
        payload = json.dumps([cell_data])
        deserialized_cell = list(serializers.deserialize('json', payload, ignorenonexistent=True))[0]
        deserialized_cell.save()
        # will populate cached_* attributes
        deserialized_cell.object.save()
        deserialized_cell.object.import_subobjects(cell_data)
|
|
|
|
@classmethod
def load_serialized_pages(cls, json_site, request=None):
    """Import a list of serialized pages and return the imported pages.

    Pages are matched on their slug and created when missing; cells of
    already existing pages are replaced. The signal receiver maintaining
    the cell cache is disconnected during the import, the cache being
    built once at the end.
    """
    imported_pages = []
    try:
        post_save.disconnect(cell_maintain_page_cell_cache)
        post_delete.disconnect(cell_maintain_page_cell_cache)

        # pre-create pages
        to_load = []
        for json_page in json_site:
            page, created = Page.objects.get_or_create(slug=json_page['fields']['slug'])
            to_load.append((page, created, json_page))

        # delete cells of already existing pages
        existing_pages = [page for page, created, json_page in to_load if not created]
        for klass in get_cell_classes():
            if klass is not None:
                klass.objects.filter(page__in=existing_pages).delete()

        # now load pages
        cells_to_load = []
        for page, created, json_page in to_load:
            imported_pages.append(cls.load_serialized_page(json_page, page=page, request=request))
            cells_to_load.extend(json_page.get('cells'))

        # and cells
        cls.load_serialized_cells(cells_to_load)
    finally:
        post_save.connect(cell_maintain_page_cell_cache)
        post_delete.connect(cell_maintain_page_cell_cache)

    # build cache
    for page in imported_pages:
        page.build_cell_cache()

    return imported_pages
|
|
|
|
@classmethod
def export_all_for_json(cls):
    """Return the serialization of all pages, in hierarchy order."""
    return [
        page.get_serialized_page() for page in Page.get_as_reordered_flat_hierarchy(cls.objects.all())
    ]
|
|
|
|
def get_redirect_url(self, context=None):
    """Return the redirect URL processed by utils.get_templated_url() with `context`."""
    return utils.get_templated_url(self.redirect_url, context=context)
|
|
|
|
def get_last_update_time(self):
    """Return the timestamp of the last update of the page."""
    return self.last_update_timestamp
|
|
|
|
def is_new(self):
    """Tell whether the page was created less than seven days ago."""
    one_week_ago = timezone.now() - datetime.timedelta(days=7)
    return self.creation_timestamp > one_week_ago
|
|
|
|
def duplicate(self, title=None, parent=False):
    """Create a copy of the page (with its groups and cells) and return it.

    The copy gets `title` (or "Copy of <title>"), a new slug computed on
    save, is placed right after the page and is excluded from navigation.
    The parent is kept unless `parent` is given; None is a valid value for
    it, this is why False is used as default.
    """
    # clone current page
    new_page = copy.deepcopy(self)
    overrides = (
        ('pk', None),
        ('title', title or _('Copy of %s') % self.title),
        ('slug', None),  # will be computed on save
        ('snapshot', None),
        ('order', self.order + 1),
        ('exclude_from_navigation', True),
    )
    for attribute, value in overrides:
        setattr(new_page, attribute, value)
    if parent is not False:
        new_page.parent = parent
    # store new page
    new_page.save()

    # set groups
    new_page.groups.set(self.groups.all())

    # copy cells, but those in placeholders whose name starts with an underscore
    for cell in self.get_cells():
        if not (cell.placeholder and cell.placeholder.startswith('_')):
            cell.duplicate(page_target=new_page)

    return new_page
|
|
|
|
|
|
class PageSnapshot(models.Model):
    """Serialized state of a page (and its cells) at a given time."""

    # the page is kept as NULL when the page is deleted
    page = models.ForeignKey(Page, on_delete=models.SET_NULL, null=True)
    timestamp = models.DateTimeField(auto_now_add=True)
    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, null=True)
    comment = models.TextField(blank=True, null=True)
    # result of Page.get_serialized_page(), empty for deletion snapshots
    serialization = JSONField(blank=True, default=dict)

    class Meta:
        ordering = ('-timestamp',)

    @classmethod
    def take(cls, page, request=None, comment=None, deletion=False):
        """Record a snapshot of `page`.

        The user of `request` is recorded if it is not anonymous. For a
        deletion nothing is serialized and the comment defaults to
        "deletion".
        """
        snapshot = cls(page=page, comment=comment)
        if request and not request.user.is_anonymous:
            snapshot.user = request.user
        if not deletion:
            snapshot.serialization = page.get_serialized_page()
        else:
            snapshot.serialization = {}
            snapshot.comment = comment or _('deletion')
        snapshot.save()

    def get_page(self):
        """Return the temporary page showing this snapshot, creating it if necessary."""
        try:
            # try reusing existing page
            return Page.snapshots.get(snapshot=self)
        except Page.DoesNotExist:
            return self.load_page(self.serialization, snapshot=self)

    def restore(self):
        """Restore the page as it was in this snapshot and return it.

        Current order, parent and exclude_from_navigation values of the page
        are kept.
        """
        # work on a copy, the stored serialization must not be altered
        json_page = copy.deepcopy(self.serialization)
        # keep current page order
        json_page['fields']['order'] = self.page.order
        # and current parent
        json_page['fields']['parent'] = [self.page.parent.slug] if self.page.parent else None
        # and current exclude_from_navigation value
        json_page['fields']['exclude_from_navigation'] = self.page.exclude_from_navigation
        # restore snapshot
        with transaction.atomic():
            return self.load_page(json_page)

    def load_page(self, json_page, snapshot=None):
        """Load a page and its cells from `json_page` and return the page.

        The signal receiver maintaining the cell cache is disconnected
        during the load, the cache being built once at the end.
        """
        # Page.load_serialized_page() modifies the data it is given (groups
        # are converted, ids are set); load a copy so the same serialization
        # can be loaded again, a second load of modified data would lose
        # groups.
        json_page = copy.deepcopy(json_page)
        try:
            post_save.disconnect(cell_maintain_page_cell_cache)
            post_delete.disconnect(cell_maintain_page_cell_cache)

            page = Page.load_serialized_page(json_page, snapshot=snapshot)
            page.load_serialized_cells(json_page['cells'])
        finally:
            post_save.connect(cell_maintain_page_cell_cache)
            post_delete.connect(cell_maintain_page_cell_cache)

        page.build_cell_cache()
        return page
|
|
|
|
|
|
class Redirect(models.Model):
    # association of a former URL to a page; redirects are removed with
    # their page (cascade) and listed oldest first.
    old_url = models.CharField(max_length=512)
    page = models.ForeignKey(Page, on_delete=models.CASCADE)
    creation_timestamp = models.DateTimeField(auto_now_add=True)

    class Meta:
        ordering = ('creation_timestamp',)
|
|
|
|
|
|
class ValidityInfo(models.Model):
    """Invalidity details attached to any object, using a generic relation.

    There is at most one ValidityInfo per object.
    """

    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
    object_id = models.PositiveIntegerField()
    content_object = GenericForeignKey('content_type', 'object_id')

    invalid_reason_code = models.CharField(max_length=100, blank=True, null=True, editable=False)
    invalid_since = models.DateTimeField(blank=True, null=True, editable=False)

    class Meta:
        unique_together = [('content_type', 'object_id')]

    @property
    def invalid_datetime(self):
        """Date and time two days after `invalid_since`, None if it is not set."""
        if self.invalid_since:
            return self.invalid_since + datetime.timedelta(days=2)
        return None
|
|
|
|
|
|
class CellMeta(MediaDefiningClass, ModelBase):
    """Metaclass combining the form media and the model metaclasses."""
|
|
|
|
|
|
@python_2_unicode_compatible
class CellBase(models.Model, metaclass=CellMeta):
    # abstract base model of cells, the blocks of content placed in the
    # placeholders of a page.

    page = models.ForeignKey(Page, on_delete=models.CASCADE)
    # key of the placeholder holding the cell
    placeholder = models.CharField(max_length=20)
    order = models.PositiveIntegerField()
    slug = models.SlugField(_('Slug'), blank=True)
    extra_css_class = models.CharField(_('Extra classes for CSS styling'), max_length=100, blank=True)
    template_name = models.CharField(_('Cell Template'), max_length=50, blank=True, null=True)

    # visibility settings, see element_is_visible()
    public = models.BooleanField(_('Public'), default=True)
    # restricted_to_unlogged is actually an invert switch, it is used to mark
    # a cell as only visible to unlogged users but also, when groups are set,
    # to mark the cell as visible to all but those groups.
    restricted_to_unlogged = models.BooleanField(_('Restrict to unlogged users'), default=False)
    groups = models.ManyToManyField(Group, verbose_name=_('Groups'), blank=True)
    last_update_timestamp = models.DateTimeField(auto_now=True)

    validity_info = GenericRelation(ValidityInfo)
    invalid_reason_codes = {}

    default_form_class = None
    manager_form_factory_kwargs = {}
    manager_form_template = 'combo/cell_form.html'
    children_placeholder_prefix = None

    # cell types with visible set to False are left out by get_cell_classes()
    visible = True
    user_dependant = False
    default_template_name = None

    # get_badge(self, context); set to None so cell types can be skipped easily
    get_badge = None

    # message displayed when the cell is loaded asynchronously
    loading_message = _('Loading...')

    # modify_global_context(self, context, request=None)
    # Apply changes to the template context that must be visible to all cells in the page
    modify_global_context = None

    # if set, automatically refresh cell every n seconds
    ajax_refresh = None

    class Meta:
        abstract = True
|
|
|
|
def __str__(self):
    """Return the cell type name, followed by the additional label (on a single line) if any."""
    label = self.get_verbose_name()
    additional_label = self.get_additional_label()
    if not (label and additional_label):
        return force_text(label)
    single_line = re.sub(r'\r?\n', ' ', force_text(additional_label))
    return '%s (%s)' % (label, single_line)
|
|
|
|
@classmethod
def get_verbose_name(cls):
    """Return the verbose name of the cell model."""
    return cls._meta.verbose_name
|
|
|
|
def get_additional_label(self):
    """Return extra text identifying the cell, used by __str__(); empty here."""
    return ''
|
|
|
|
@property
def legacy_class_name(self):
    """Lowercased Python class name, the legacy class name used in some themes."""
    return type(self).__name__.lower()
|
|
|
|
@property
def class_name(self):
    """CamelCase Python class name converted to a lower and dashed version, appropriate for CSS."""
    # a dash is added before every run of capital letters, the one added at
    # the beginning being removed afterwards.
    dashed = re.sub('([A-Z]+)', r'-\1', self.__class__.__name__)
    return dashed.lower()[1:]
|
|
|
|
@property
def css_class_names(self):
    """Space-separated CSS classes of the cell: class names, template classes and extra classes."""
    names = (
        self.class_name,
        self.legacy_class_name,
        self.get_template_extra_css_classes(),
        self.extra_css_class,
    )
    return ' '.join(names)
|
|
|
|
@property
def asset_css_classes(self):
    """Space-separated CSS classes describing the assets defined for the cell.

    Asset slot keys and assets are looked up once and kept on the instance
    (`_asset_keys` and `_assets`). The result is empty when the cell has no
    asset slot or no asset.
    """
    from combo.apps.assets.models import Asset

    if not hasattr(self, '_asset_keys'):
        self._asset_keys = self.get_asset_slot_keys()
    if not hasattr(self, '_assets'):
        self._assets = {a.key: a for a in Asset.objects.filter(key__in=self._asset_keys.keys())}

    asset_keys, assets = self._asset_keys, self._assets
    if not asset_keys or not assets:
        return ''

    # add has-asset-<slug> for each asset found
    classes = sorted(f'has-asset-{v}' for k, v in asset_keys.items() if k in assets)
    # add has-any-asset, there is at least one asset defined at this point
    classes.append('has-any-asset')
    # add has-all-assets if all assets are defined
    if assets.keys() == asset_keys.keys():
        classes.append('has-all-assets')

    return ' '.join(classes)
|
|
|
|
def can_have_assets(self):
    """Tell whether the cell has both a slug for assets and asset slots.

    The value is to be used in a boolean context, it is not always a bool.
    """
    return self.get_slug_for_asset() and self.get_asset_slot_templates()
|
|
|
|
def get_slug_for_asset(self):
    """Return the slug used in asset keys: the cell slug."""
    return self.slug
|
|
|
|
def get_label_for_asset(self):
    """Return the label used for the assets of the cell, it mentions the cell and its page."""
    page = self.page
    details = {
        'cell_label': str(self),
        'page_name': str(page),
        'page_slug': page.slug,
    }
    return _('%(cell_label)s on page %(page_name)s (%(page_slug)s)') % details
|
|
|
|
def get_asset_slot_key(self, key):
    """Return the asset key for slot `key`: cell:<cell type>:<key>:<slug>."""
    cell_type = self.get_cell_type_str()
    asset_slug = self.get_slug_for_asset()
    return 'cell:%s:%s:%s' % (cell_type, key, asset_slug)
|
|
|
|
def get_asset_slot_templates(self):
    """Return the asset slots configured for the cell type in settings.COMBO_CELL_ASSET_SLOTS, or an empty dict."""
    return settings.COMBO_CELL_ASSET_SLOTS.get(self.get_cell_type_str()) or {}
|
|
|
|
def get_asset_slot_keys(self):
    """Return a dict mapping asset keys to slot keys, empty if the cell cannot have assets."""
    if self.can_have_assets():
        return {self.get_asset_slot_key(key): key for key in self.get_asset_slot_templates().keys()}
    return {}
|
|
|
|
def get_asset_slots(self):
    """Return the asset slots of the cell, as a dict keyed by asset key.

    Each slot has a label and a short label, along with the settings of
    the slot; those settings take precedence over the computed labels.
    """
    if not self.can_have_assets():
        return {}

    slots = {}
    for template_key, template_data in self.get_asset_slot_templates().items():
        suffix = ' (%s)' % template_data['suffix'] if template_data.get('suffix') else ''
        slot_key = self.get_asset_slot_key(template_key)
        slot = {
            'label': '%(prefix)s — %(label)s%(suffix)s'
            % {
                'prefix': template_data['prefix'],
                'label': self.get_label_for_asset(),
                'suffix': suffix,
            },
            'short_label': '%(prefix)s%(suffix)s' % {'prefix': template_data['prefix'], 'suffix': suffix},
        }
        slot.update(template_data)
        slots[slot_key] = slot
    return slots
|
|
|
|
@classmethod
def get_cell_classes(cls, class_filter=lambda x: True):
    """Iterate over the cell classes that can be proposed.

    Classes rejected by `class_filter`, not enabled, or with `visible` set
    to False are skipped.
    """
    for klass in get_cell_classes():
        if klass is None:
            # skipped like in the other loops on cell classes of this module
            continue
        if not class_filter(klass):
            continue
        if not klass.is_enabled():
            continue
        if klass.visible is False:
            continue
        yield klass
|
|
|
|
@classmethod
def get_all_cell_types(cls, *args, **kwargs):
    """Return the list of cell types of all classes given by get_cell_classes(), called with the same arguments."""
    return [
        cell_type
        for klass in cls.get_cell_classes(*args, **kwargs)
        for cell_type in klass.get_cell_types()
    ]
|
|
|
|
@classmethod
def get_cells(
    cls,
    cell_filter=None,
    skip_cell_cache=False,
    prefetch_validity_info=False,
    select_related=None,
    load_contenttypes=False,
    cells_exclude=None,
    **kwargs,
):
    """Return the list of cells of various classes matching **kwargs, sorted by ``order``.

    Parameters:
      cell_filter: optional callable receiving a cell class; classes for
        which it returns a false value are not queried.
      skip_cell_cache: when false and pages are given (``page`` or
        ``page__in`` in kwargs), only the cell types listed in the
        ``related_cells['cell_types']`` of those pages are queried, provided
        every page has that information.
      prefetch_validity_info: when true, set ``prefetched_validity_info`` on
        every returned cell (list of its ValidityInfo objects).
      select_related: optional dict of relation names to select, keyed by
        cell type string, or by ``'__all__'`` for all cell classes.
      load_contenttypes: when true, populate the ContentType cache for the
        cell classes about to be queried.
      cells_exclude: optional condition passed to the queryset ``exclude()``.
      **kwargs: queryset filters; a ``extra_filter`` entry is removed from
        them and applied with an additional ``filter()`` call.  Without
        explicit page, cells are limited to non-snapshot pages.
    """
    cells = []
    pages = []
    select_related = select_related or {}
    if 'page' in kwargs:
        pages = [kwargs['page']]
    elif 'page__in' in kwargs:
        pages = kwargs['page__in']
    else:
        # if there are not explicit page, limit to non-snapshot pages
        kwargs['page__snapshot__isnull'] = True
    cell_classes = get_cell_classes()
    if pages and not skip_cell_cache:
        # if there's a request for some specific pages, limit cell types
        # to those that are actually in use in those pages.
        cell_types = set()
        for page in pages:
            if page.related_cells and 'cell_types' in page.related_cells:
                cell_types |= set(page.related_cells['cell_types'])
            else:
                # at least one page has no cached information, keep all classes
                break
        else:
            cell_classes = [get_cell_class(x) for x in cell_types]
    if load_contenttypes:
        # populate ContentType cache; cell types that can not be resolved
        # to a class anymore (None) are skipped, like in the loop below.
        ContentType.objects.get_for_models(*[klass for klass in cell_classes if klass is not None])
    extra_filter = kwargs.pop('extra_filter', None)
    for klass in cell_classes:
        if klass is None:
            continue
        if cell_filter and not cell_filter(klass):
            continue
        cells_queryset = klass.objects.filter(**kwargs)
        if cells_exclude:
            cells_queryset = cells_queryset.exclude(cells_exclude)
        if extra_filter:
            cells_queryset = cells_queryset.filter(extra_filter)
        if select_related:
            cells_queryset = cells_queryset.select_related(
                *select_related.get('__all__', []), *select_related.get(klass.get_cell_type_str(), [])
            )
        cells.extend(cells_queryset)
    if prefetch_validity_info:
        # group validity info by (cell class, cell pk) in a single pass, so
        # the lookup for each cell does not scan the whole list again.
        validity_info_by_cell = {}
        for validity_info in ValidityInfo.objects.select_related('content_type'):
            cell_key = (validity_info.content_type.model_class(), validity_info.object_id)
            validity_info_by_cell.setdefault(cell_key, []).append(validity_info)
        for cell in cells:
            cell.prefetched_validity_info = validity_info_by_cell.get((cell.__class__, cell.pk), [])
    cells.sort(key=lambda x: x.order)
    return cells
|
|
|
|
def get_reference(self):
    """Return a string that can serve as a unique reference to this cell."""
    return '%s-%s' % (self.get_cell_type_str(), self.id)
|
|
|
|
@classmethod
def get_cell(cls, reference, **kwargs):
    """Return the cell matching *reference* and the optional **kwargs filters.

    *reference* has the form produced by get_reference(); ObjectDoesNotExist
    is raised when its cell type does not match a known cell class.
    """
    cell_type_str, cell_pk = reference.split('-')
    cell_class = get_cell_class(cell_type_str)
    if cell_class is not None:
        return cell_class.objects.get(id=cell_pk, **kwargs)
    raise ObjectDoesNotExist()
|
|
|
|
@classmethod
def get_cell_type_str(cls):
    """Return the cell type identifier, made of the app label and model name."""
    meta = cls._meta
    return '{}_{}'.format(meta.app_label, meta.model_name)
|
|
|
|
@classmethod
def get_cell_type_group(cls):
    """Return the verbose name of the application declaring this cell class."""
    app_config = apps.get_app_config(cls._meta.app_label)
    return app_config.verbose_name
|
|
|
|
@classmethod
def get_cell_types(cls):
    """Return the list of cell types of this class: a single 'default' variant."""
    default_type = {
        'name': cls.get_verbose_name(),
        'cell_type_str': cls.get_cell_type_str(),
        'group': cls.get_cell_type_group(),
        'variant': 'default',
        'order': 0,
    }
    return [default_type]
|
|
|
|
@classmethod
def is_enabled(cls):
    """Tell whether the cell type is enabled for the given site.

    This is used to selectively enable cells from extension modules; this
    implementation always enables the cell type.
    """
    return True
|
|
|
|
def set_variant(self, variant):
    """Accept a variant name; this implementation does nothing with it."""
    return None
|
|
|
|
def get_label(self):
    """Return the label of the cell, which is its verbose name here."""
    label = self.get_verbose_name()
    return label
|
|
|
|
def get_default_form_fields(self):
    """Return the names of the local concrete fields to put in the default form.

    The fields listed in ``excluded_names`` are left out.
    """
    excluded_names = (
        'id',
        'page',
        'placeholder',
        'order',
        'public',
        'groups',
        'slug',
        'extra_css_class',
        'last_update_timestamp',
        'restricted_to_unlogged',
        'template_name',
    )
    return [field.name for field in self._meta.local_concrete_fields if field.name not in excluded_names]
|
|
|
|
def get_default_form_class(self):
    """Return the form class used to edit the cell.

    This is ``default_form_class`` when set, otherwise a model form built on
    the default form fields, or None when there is no such field.
    """
    if self.default_form_class:
        return self.default_form_class

    field_names = self.get_default_form_fields()
    if field_names:
        return model_forms.modelform_factory(
            self.__class__, fields=field_names, **self.manager_form_factory_kwargs
        )
    return None
|
|
|
|
def get_options_form_class(self):
    """Return the model form class used to edit the generic options of the cell.

    The form has the slug and extra CSS class fields, preceded by a template
    selection field when extra templates are configured for this cell type.
    """
    field_names = ['slug', 'extra_css_class']
    widgets = None
    extra_templates = settings.COMBO_CELL_TEMPLATES.get(self.get_cell_type_str())
    if extra_templates:
        field_names = ['template_name'] + field_names
        # first choice is the empty value, for the default template
        choices = [('', _('Default Value'))]
        choices += [(key, infos['label']) for key, infos in extra_templates.items()]
        widgets = {'template_name': forms.Select(choices=choices)}
    return model_forms.modelform_factory(self.__class__, fields=field_names, widgets=widgets)
|
|
|
|
def get_extra_manager_context(self):
    """Return extra context for the manager; empty in this implementation."""
    return dict()
|
|
|
|
def mark_as_invalid(self, reason_code, force=True):
    """Record that the cell is invalid because of *reason_code*.

    A ValidityInfo object is created when the cell has none.  An existing
    one is left untouched when *force* is false and it already has an
    ``invalid_since`` value, or when it already holds the same reason code;
    otherwise its reason code is replaced and ``invalid_since`` is reset to
    the current time.
    """
    validity_info, created = ValidityInfo.objects.get_or_create(
        content_type=ContentType.objects.get_for_model(self),
        object_id=self.pk,
        defaults={'invalid_reason_code': reason_code, 'invalid_since': now()},
    )
    keep_existing = (
        # just created with the right values
        created
        # don't overwrite invalid reason already set
        or (not force and validity_info.invalid_since is not None)
        # don't overwrite invalid_since if same reason already set
        or validity_info.invalid_reason_code == reason_code
    )
    if keep_existing:
        return

    validity_info.invalid_reason_code = reason_code
    validity_info.invalid_since = now()
    validity_info.save()
|
|
|
|
def mark_as_valid(self):
    """Delete the validity info of the cell, if there is one."""
    validity_info = self.get_validity_info()
    if validity_info is not None:
        validity_info.delete()
|
|
|
|
def get_validity_info(self):
    """Return the first validity info of the cell, or None.

    The ``prefetched_validity_info`` list is used when it has been set on the
    cell; the ``validity_info`` relation is queried otherwise.
    """
    try:
        prefetched = self.prefetched_validity_info
    except AttributeError:
        return self.validity_info.all().first()
    return prefetched[0] if prefetched else None
|
|
|
|
def set_validity_from_url(self, resp, not_found_code='url_not_found', invalid_code='url_invalid'):
    """Update the validity of the cell from the HTTP response *resp*.

    A 404 marks the cell as invalid with *not_found_code*, any other 4xx
    with *invalid_code*.  In all other cases the cell is marked as valid: no
    response or 5xx error (data can not be retrieved, the cell is not
    reported as invalid), 2xx or 3xx.
    """
    if resp is not None:
        status_code = resp.status_code
        if status_code == 404:
            self.mark_as_invalid(not_found_code)
            return
        if 400 <= status_code < 500:
            self.mark_as_invalid(invalid_code)
            return
    self.mark_as_valid()
|
|
|
|
def get_invalid_reason(self):
    """Return the reason why the cell is invalid, or None when it is not.

    The reason is looked up in ``invalid_reason_codes``; the raw reason code
    is returned when it is not found there.
    """
    validity_info = self.get_validity_info()
    if validity_info is None or not validity_info.invalid_since:
        return None
    reason_code = validity_info.invalid_reason_code
    return self.invalid_reason_codes.get(reason_code, reason_code)
|
|
|
|
def is_placeholder_active(self):
    """Tell whether the placeholder of the cell exists in its page.

    A cell without placeholder is never active; a placeholder whose name
    starts with ``_`` is always active.  Otherwise the placeholder is
    searched among the placeholders of the page, which are computed once and
    kept on the page object.
    """
    placeholder_key = self.placeholder
    if not placeholder_key:
        return False
    if placeholder_key.startswith('_'):
        return True

    request = RequestFactory().get('/')
    page = self.page
    if not hasattr(page, '_placeholders'):
        page._placeholders = page.get_placeholders(request, traverse_cells=True)
    return any(placeholder.key == placeholder_key for placeholder in page._placeholders)
|
|
|
|
def is_hidden_because_invalid(self):
    """Tell whether the cell must be hidden because of its invalidity.

    The result is true when the validity info of the cell has an
    ``invalid_datetime`` that is not in the future, and false (or the empty
    ``invalid_datetime`` value) otherwise.
    """
    validity_info = self.get_validity_info()
    if validity_info is None:
        return False
    if not validity_info.invalid_datetime:
        return validity_info.invalid_datetime
    return validity_info.invalid_datetime <= now()
|
|
|
|
def is_visible(self, user=None, check_validity_info=True):
    """Tell whether the cell is visible for *user*.

    When *check_validity_info* is true, a cell hidden because of its
    invalidity is not visible, whoever the user is.
    """
    hidden = check_validity_info and self.is_hidden_because_invalid()
    if hidden:
        return False
    return element_is_visible(self, user=user)
|
|
|
|
def is_relevant(self, context):
    """Return whether it's relevant to render this cell in the page context.

    This implementation considers the cell as always relevant.
    """
    return True
|
|
|
|
def is_user_dependant(self, context=None):
    """Return whether the cell content varies from user to user.

    This is the value of the ``user_dependant`` attribute; *context* is not
    used here.
    """
    user_dependant = self.user_dependant
    return user_dependant
|
|
|
|
def get_concerned_user(self, context):
    """Return the user selected in the context, or the user of the request.

    None is returned when there is neither a ``selected_user`` nor a request
    with a ``user`` attribute in the context.
    """
    selected_user = context.get('selected_user')
    if selected_user:
        return selected_user
    return getattr(context.get('request'), 'user', None)
|
|
|
|
def get_cell_extra_context(self, context):
    """Return the variables to add to the context to render the cell: the cell itself."""
    return dict(cell=self)
|
|
|
|
def get_template_label(self):
    """Return the label of the template selected for the cell, or None.

    The label comes from the templates configured in settings for this cell
    type.
    """
    configured_templates = settings.COMBO_CELL_TEMPLATES.get(self.get_cell_type_str()) or {}
    template_infos = configured_templates.get(self.template_name) or {}
    return template_infos.get('label')
|
|
|
|
def get_template_extra_css_classes(self):
    """Return the extra CSS classes of the template selected for the cell.

    They come from the templates configured in settings for this cell type;
    an empty string is returned when there is none.
    """
    configured_templates = settings.COMBO_CELL_TEMPLATES.get(self.get_cell_type_str()) or {}
    template_infos = configured_templates.get(self.template_name) or {}
    css_classes = template_infos.get('extra-css-classes')
    return css_classes if css_classes else ''
|
|
|
|
def render(self, context):
    """Render the cell with the first existing template among its candidates.

    The context is first updated with the extra context of the cell.
    Candidates are, by decreasing priority: the template specific to the
    slug of the cell, the template selected with ``template_name``, the
    default template of the class and the template named after the model.
    """
    context.update(self.get_cell_extra_context(context))
    model_template_name = self._meta.model_name + '.html'
    # candidates are collected by increasing priority
    candidates = ['combo/' + model_template_name]
    base_template_name = model_template_name
    if self.default_template_name:
        base_template_name = os.path.basename(self.default_template_name)
        candidates.append(self.default_template_name)
    if self.template_name:
        configured_templates = settings.COMBO_CELL_TEMPLATES.get(self.get_cell_type_str()) or {}
        template_infos = configured_templates.get(self.template_name) or {}
        if 'template' in template_infos:
            candidates.append(template_infos.get('template'))
    if self.slug:
        candidates.append('combo/cells/%s/%s' % (self.slug, base_template_name))
    tmpl = template.loader.select_template(candidates[::-1])
    return tmpl.render(context, context.get('request'))
|
|
|
|
def render_for_search(self):
    """Return the content of the cell as plain text, for indexation.

    An empty string is returned when the cell is not enabled, depends on the
    user or is not relevant.  Otherwise the cell is rendered synchronously
    with a request on the URL of its page and no user; tags are stripped
    from the result and HTML entities are unescaped.
    """
    if not self.is_enabled() or self.is_user_dependant():
        return ''
    request = RequestFactory().get(self.page.get_online_url())
    request.user = None  # compat
    site_base = request.build_absolute_uri('/')[:-1]
    context = {
        'page': self.page,
        'render_skeleton': False,
        'request': request,
        'site_base': site_base,
        'synchronous': True,
        'user': None,  # compat
        'absolute_uri': request.build_absolute_uri,
    }
    if self.is_relevant(context):
        return html.unescape(strip_tags(self.render(context)))
    return ''
|
|
|
|
def get_external_links_data(self):
    """Return the data of the external links of the cell; none in this implementation."""
    return list()
|
|
|
|
@classmethod
def prepare_serialized_data(cls, cell_data):
    """Return the serialized data of a cell; *cell_data* is returned unchanged here."""
    prepared_data = cell_data
    return prepared_data
|
|
|
|
def export_subobjects(self):
    """Return the serialized objects depending on the cell; none in this implementation."""
    return dict()
|
|
|
|
def import_subobjects(self, cell_json):
    """Accept the serialized data of a cell; this implementation imports nothing from it."""
    return None
|
|
|
|
def duplicate(self, page_target=None, placeholder=None, reset_slug=False, set_order=False):
    """Save and return a copy of the cell.

    Parameters:
      page_target: page of the new cell; the page of this cell when empty.
      placeholder: placeholder of the new cell; the one of this cell when
        empty.
      reset_slug: when true, the new cell gets an empty slug.
      set_order: when true, the order of the new cell is the order of this
        cell plus one.

    Groups are copied to the new cell once it is saved, then
    ``duplicate_m2m(new_cell)`` is called if this cell has such a method.
    """
    # a deep copy without primary key is saved as a new object
    clone = copy.deepcopy(self)
    clone.pk = None
    clone.page = page_target or self.page
    clone.placeholder = placeholder or clone.placeholder
    if reset_slug:
        clone.slug = ''
    if set_order:
        clone.order = self.order + 1
    clone.save()

    # many-to-many relations need the new cell to be saved
    clone.groups.set(self.groups.all())

    if hasattr(self, 'duplicate_m2m'):
        self.duplicate_m2m(clone)

    return clone
|
|
|
|
|
|
@register_cell_class
class TextCell(CellBase):
    """Cell displaying a rich text."""

    text = RichTextField(_('Text'), blank=True, null=True)

    default_template_name = 'combo/text-cell.html'

    class Meta:
        verbose_name = _('Text')

    def is_relevant(self, context):
        """Tell whether the cell has a text to display."""
        return True if self.text else False

    def get_additional_label(self):
        """Return the ellipsized text of the cell, or None when it has no text."""
        return utils.ellipsize(self.text) if self.text else None

    @classmethod
    def get_cell_types(cls):
        """Return the cell types of the parent class, the first one getting -1 as order."""
        cell_types = super().get_cell_types()
        cell_types[0]['order'] = -1
        return cell_types

    def get_cell_extra_context(self, context):
        """Add the text to display to the extra context of the cell.

        The values of the href and src attributes found in the text are
        evaluated as templated URLs.  They are then made absolute with the
        request of the context when ``force_absolute_url`` or
        ``render_skeleton`` is set in the context (href values starting with
        ``#`` are kept as is).
        """
        extra_context = super().get_cell_extra_context(context)
        text = self.text or ''
        force_absolute_url = context.get('force_absolute_url') or context.get('render_skeleton')

        def render_templated_attribute(match):
            attribute, url = match.group(1), match.group(2)
            try:
                url = utils.get_templated_url(url, context=context)
            except utils.TemplateError as e:
                # keep the original value when it can not be evaluated
                logging.getLogger(__name__).warning('error in templated URL (%s): %s', url, e)
            return '%s="%s"' % (attribute, url)

        text = re.sub(r'(href|src)="(.*?)"', render_templated_attribute, text)

        if force_absolute_url:
            request = context.get('request')

            def absolutize_src(match):
                return 'src="%s"' % request.build_absolute_uri(match.group(1))

            def absolutize_href(match):
                url = match.group(1)
                if not url.startswith('#'):
                    url = request.build_absolute_uri(url)
                return 'href="%s"' % url

            text = re.sub(r'src="(.*?)"', absolutize_src, text)
            text = re.sub(r'href="(.*?)"', absolutize_href, text)
        extra_context['text'] = mark_safe(text)
        return extra_context
|
|
|
|
|
|
@register_cell_class
class FortuneCell(CellBase):
    """Cell displaying the output of the ``fortune`` command.

    It is only enabled when settings.DEBUG is set and the command can be run.
    """

    ajax_refresh = 30

    class Meta:
        verbose_name = _('Fortune')

    def render(self, context):
        """Return the output of the ``fortune`` command, as text.

        The output of the command is decoded, so a string is returned like
        other cells do, not bytes.
        """
        return subprocess.check_output(['fortune']).decode('utf-8', 'replace')

    @classmethod
    def is_enabled(cls):
        """Tell whether the cell can be used: DEBUG mode and working ``fortune`` command."""
        if not settings.DEBUG:
            # result is known, do not run the command for nothing
            return False
        try:
            subprocess.check_output(['fortune'])
        except (OSError, subprocess.CalledProcessError):
            # command is missing or failing
            return False
        return settings.DEBUG
|
|
|
|
|
|
@register_cell_class
class UnlockMarkerCell(CellBase):
    """Marks an 'acquired' placeholder as unlocked."""

    # XXX: this is kept to smooth transitions, it should be removed once all
    # sites have been migrated to ParentContentCell

    visible = False

    class Meta:
        verbose_name = _('Unlock Marker')

    def render(self, context):
        """Render nothing, the cell is only a marker."""
        return ''
|
|
|
|
|
|
@register_cell_class
class MenuCell(CellBase):
    """Cell displaying a menu of pages."""

    depth = models.PositiveIntegerField(
        _('Depth'),
        choices=[(i, i) for i in range(1, 3)],
        default=1,
        null=False,
    )
    initial_level = models.IntegerField(
        _('Initial Level'),
        choices=[(-1, _('Same as page'))] + [(i, i) for i in range(1, 3)],
        default=-1,
        null=False,
    )
    root_page = models.ForeignKey(
        Page,
        on_delete=models.CASCADE,
        related_name='root_page',
        null=True,
        blank=True,
        verbose_name=_('Root Page'),
    )

    default_template_name = 'combo/menu-cell.html'
    exclude_from_search = True

    class Meta:
        verbose_name = _('Menu')

    def get_default_form_class(self):
        """Return the form class dedicated to menu cells."""
        from .forms import MenuCellForm as form_class

        return form_class

    def get_cell_extra_context(self, context):
        """Add the menu, rendered according to the cell settings, to the extra context."""
        from combo.public.menu import render_menu

        menu_options = {
            'level': self.initial_level,
            'root_page': self.root_page,
            'depth': self.depth,
            'ignore_visibility': False,
        }
        extra_context = super().get_cell_extra_context(context)
        extra_context['menu'] = render_menu(context, **menu_options)
        return extra_context
|
|
|
|
|
|
@register_cell_class
class LinkCell(CellBase):
    """Cell displaying a link, to a page of the site or to an URL."""

    title = models.CharField(_('Title'), max_length=150, blank=True)
    url = models.CharField(_('URL'), max_length=200, blank=True, validators=[django_template_validator])
    link_page = models.ForeignKey(
        'data.Page',
        on_delete=models.CASCADE,
        related_name='link_cell',
        blank=True,
        null=True,
        verbose_name=_('Internal link'),
    )
    anchor = models.CharField(_('Anchor'), max_length=150, blank=True)

    default_template_name = 'combo/link-cell.html'
    add_as_link_label = _('add a link')
    add_link_label = _('New link')
    edit_link_label = _('Edit link')
    add_as_link_code = 'link'

    invalid_reason_codes = {
        'data_url_not_defined': _('No link set'),
        'data_url_not_found': _('URL seems to unexist'),
        'data_url_invalid': _('URL seems to be invalid'),
    }

    class Meta:
        verbose_name = _('Link')

    def save(self, *args, **kwargs):
        """Save the cell, then check its validity unless ``update_fields`` is given."""
        result = super().save(*args, **kwargs)
        if 'update_fields' not in kwargs:
            self.check_validity()
        return result

    def get_additional_label(self):
        """Return the ellipsized title of the link, or None when there is no title.

        The title of the linked page is used when the cell has no title.
        """
        title = self.title
        if not title:
            title = self.link_page.title if self.link_page else None
        return utils.ellipsize(title) if title else None

    def get_slug_for_asset(self):
        """Return the slug used for the assets of the cell.

        This is None for a cell in a placeholder whose name starts with
        ``_``, the slug of the linked page if any, the slug of the cell
        otherwise.
        """
        in_private_placeholder = self.placeholder and self.placeholder.startswith('_')
        if in_private_placeholder:
            return None
        return self.link_page.slug if self.link_page else self.slug

    def get_label_for_asset(self):
        """Return the label used for the assets of the cell.

        A cell linking to a page is only described by its string
        representation.
        """
        if not self.link_page:
            return super().get_label_for_asset()
        return str(self)

    def get_url(self, context=None):
        """Return the URL of the link, with its anchor if any.

        This is the online URL of the linked page, or the URL of the cell
        evaluated as a templated URL with *context*.
        """
        context = context or {}
        if self.link_page:
            url = self.link_page.get_online_url()
        else:
            url = utils.get_templated_url(self.url, context=context)
        return url + '#' + self.anchor if self.anchor else url

    def get_cell_extra_context(self, context):
        """Add the title, the URL and, for a linked page, the description to the extra context.

        The URL is made absolute with the request of the context when
        ``force_absolute_url`` or ``render_skeleton`` is set and the URL has
        no network location.
        """
        force_absolute_url = context.get('force_absolute_url') or context.get('render_skeleton')
        request = context.get('request')
        extra_context = super().get_cell_extra_context(context)
        linked_page = self.link_page
        if linked_page:
            extra_context['title'] = self.title or linked_page.title
            extra_context['description'] = linked_page.description
        else:
            extra_context['title'] = self.title or self.url
        url = self.get_url(context)
        if force_absolute_url and not urllib.parse.urlparse(url).netloc:
            # create full URL when used in a skeleton
            url = request.build_absolute_uri(url)
        extra_context['url'] = url
        return extra_context

    def get_default_form_class(self):
        """Return the form class dedicated to link cells."""
        from .forms import LinkCellForm as form_class

        return form_class

    def render_for_search(self):
        """Return an empty string: links are not indexed."""
        return ''

    def get_external_links_data(self):
        """Return the data of the link in a list, when the cell has an URL.

        The list is empty when the cell has no URL, or when its extra context
        lacks a title or an URL.
        """
        if self.url:
            link_data = self.get_cell_extra_context({})
            if link_data.get('title') and link_data.get('url'):
                return [link_data]
        return []

    def check_validity(self):
        """Mark the cell as valid or invalid according to its target.

        A link to a page, to an anchor only, or to an URL with more than one
        template variable is valid; a cell without any target is invalid.
        Other URLs are requested and the validity is set from the response.
        """
        if self.link_page_id:
            self.mark_as_valid()
            return
        if not self.url:
            if self.anchor:
                # link to anchor on same page, ok.
                self.mark_as_valid()
            else:
                self.mark_as_invalid('data_url_not_defined')
            return
        if self.url.count('{{') > 1:
            self.mark_as_valid()
            return

        # response is kept even when its status raises an error
        response = None
        try:
            response = requests.get(self.get_url(), timeout=settings.REQUESTS_TIMEOUT)
            response.raise_for_status()
        except requests.exceptions.RequestException:
            pass

        self.set_validity_from_url(
            response, not_found_code='data_url_not_found', invalid_code='data_url_invalid'
        )
|
|
|
|
|
|
@register_cell_class
class LinkListCell(CellBase):
    """Cell displaying a list of links.

    Links are cells stored in the page of the cell, in a placeholder whose
    name is built from ``children_placeholder_prefix`` and the cell id.
    """

    title = models.CharField(_('Title'), max_length=150, blank=True)
    limit = models.PositiveSmallIntegerField(_('Limit'), null=True, blank=True)

    default_template_name = 'combo/link-list-cell.html'
    manager_form_template = 'combo/manager/link-list-cell-form.html'
    children_placeholder_prefix = '_linkslist:'
    exclude_from_search = True

    invalid_reason_codes = {
        'data_link_invalid': _('Invalid link'),
    }

    class Meta:
        verbose_name = _('List of links')

    @property
    def link_placeholder(self):
        """Name of the placeholder holding the links of the cell."""
        return self.children_placeholder_prefix + str(self.pk)

    def get_items(self, prefetch_validity_info=False):
        """Return the link cells of the list.

        Only cells whose class has an ``add_as_link_label`` attribute are
        considered.
        """

        def is_link_class(klass):
            return hasattr(klass, 'add_as_link_label')

        return CellBase.get_cells(
            page=self.page,
            placeholder=self.link_placeholder,
            cell_filter=is_link_class,
            prefetch_validity_info=prefetch_validity_info,
        )

    def get_items_with_prefetch(self):
        """Return the link cells of the list, with their validity info prefetched."""
        return self.get_items(prefetch_validity_info=True)

    def get_additional_label(self):
        """Return the ellipsized title of the list, or None when there is no title."""
        return utils.ellipsize(self.title) if self.title else None

    def get_cell_extra_context(self, context):
        """Add the title and the links to the extra context.

        Links are the extra contexts of the link cells found in the
        ``page_cells`` of the context.  When the cell has a limit, links
        beyond the limit are given in ``more_links``.
        """
        extra_context = super().get_cell_extra_context(context)
        links = [
            cell.get_cell_extra_context(context)
            for cell in context.get('page_cells', [])
            if hasattr(cell, 'add_as_link_label') and cell.placeholder == self.link_placeholder
        ]
        extra_context['links'] = links
        extra_context['more_links'] = []
        if self.limit:
            extra_context['more_links'] = links[self.limit :]
            extra_context['links'] = links[: self.limit]
        extra_context['title'] = self.title
        return extra_context

    def get_link_cell_classes(self):
        """Return the cell classes that can be used as links of the list."""
        return CellBase.get_cell_classes(lambda x: hasattr(x, 'add_as_link_label'))

    def get_default_form_class(self):
        """Return the form class dedicated to link list cells."""
        from .forms import LinkListCellForm as form_class

        return form_class

    def export_subobjects(self):
        """Return the serialized links of the list.

        Primary key, placeholder and page are removed from the serialization.
        """
        serialized_links = serializers.serialize(
            'json', self.get_items(), use_natural_foreign_keys=True, use_natural_primary_keys=True
        )
        links = json.loads(serialized_links)
        for link in links:
            link.pop('pk')
            link['fields'].pop('placeholder')
            link['fields'].pop('page')
        return {'links': links}

    def import_subobjects(self, cell_json):
        """Create the links serialized in *cell_json*, in the placeholder of the cell."""
        serialized_links = cell_json['links']
        for serialized_link in serialized_links:
            serialized_link['fields']['placeholder'] = self.link_placeholder
            serialized_link['fields']['page'] = self.page_id
        for link in serializers.deserialize('json', json.dumps(serialized_links), ignorenonexistent=True):
            link.save()

    def duplicate_m2m(self, new_cell):
        """Duplicate the links of the list into the placeholder of *new_cell*."""
        # duplicate also link items
        for link in self.get_items():
            link.duplicate(page_target=new_cell.page, placeholder=new_cell.link_placeholder)

    def is_visible(self, check_validity_info=True, **kwargs):
        """Tell whether the cell is visible, whatever its validity info is."""
        # cell is visible even if items are invalid
        return super().is_visible(check_validity_info=False, **kwargs)

    def check_validity(self):
        """Mark the cell as invalid if one of its links has validity info, as valid otherwise."""
        has_invalid_link = any(
            link.get_validity_info() is not None for link in self.get_items(prefetch_validity_info=True)
        )
        if has_invalid_link:
            self.mark_as_invalid('data_link_invalid')
        else:
            self.mark_as_valid()
|
|
|
|
|
|
@register_cell_class
class FeedCell(CellBase):
    """Cell displaying the entries of a RSS/Atom feed."""

    title = models.CharField(_('Title'), max_length=150, blank=True)
    url = models.CharField(_('URL'), blank=True, max_length=200)
    limit = models.PositiveSmallIntegerField(_('Maximum number of entries'), null=True, blank=True)

    manager_form_factory_kwargs = {'field_classes': {'url': TemplatableURLField}}
    default_template_name = 'combo/feed-cell.html'

    invalid_reason_codes = {
        'data_url_not_defined': _('No URL set'),
        'data_url_not_found': _('URL seems to unexist'),
        'data_url_invalid': _('URL seems to be invalid'),
    }

    class Meta:
        verbose_name = _('RSS/Atom Feed')

    def is_visible(self, **kwargs):
        """Tell whether the cell is visible; a cell without URL never is."""
        return bool(self.url) and super().is_visible(**kwargs)

    def save(self, *args, **kwargs):
        """Save the cell and mark it as valid, unless ``update_fields`` is given."""
        result = super().save(*args, **kwargs)
        if 'update_fields' not in kwargs:
            # always mark cell as valid when it is saved, it will be checked
            # for real when it is rendered
            self.mark_as_valid()
        return result

    def get_cell_extra_context(self, context):
        """Add the parsed feed to the extra context, under the ``feed`` key.

        The content of the feed is taken from the cache, or retrieved from
        the URL of the cell and cached for 600 seconds; the validity of the
        cell is updated when the URL is requested.  Entries are limited to
        ``limit`` when it is set, and the title of the feed is set on the
        cell object when it has no title.  Nothing is retrieved when the
        context is in ``placeholder_search_mode``.
        """
        extra_context = super().get_cell_extra_context(context)

        if not self.url:
            self.mark_as_invalid('data_url_not_defined')
            return extra_context

        if context.get('placeholder_search_mode'):
            # don't call webservices when we're just looking for placeholders
            return extra_context

        cache_key = hashlib.md5(smart_bytes(self.url)).hexdigest()
        feed_content = cache.get(cache_key)
        if not feed_content:
            feed_response = None
            try:
                # a timeout is required, or a server not answering would
                # block the rendering forever; timeout errors are
                # RequestException, handled like other request errors.
                feed_response = requests.get(
                    utils.get_templated_url(self.url), timeout=settings.REQUESTS_TIMEOUT
                )
                feed_response.raise_for_status()
            except (requests.exceptions.RequestException):
                pass
            else:
                if feed_response.status_code == 200:
                    feed_content = feed_response.content
                    cache.set(cache_key, feed_content, 600)
            self.set_validity_from_url(
                feed_response, not_found_code='data_url_not_found', invalid_code='data_url_invalid'
            )
        if feed_content:
            extra_context['feed'] = feedparser.parse(feed_content)
            if self.limit:
                extra_context['feed']['entries'] = extra_context['feed']['entries'][: self.limit]
            if not self.title:
                try:
                    self.title = extra_context['feed']['feed'].title
                except (KeyError, AttributeError):
                    pass
        return extra_context

    def render(self, context):
        """Render the cell.

        NothingInCacheException is raised when the content of the feed is
        not in cache and the context is not ``synchronous``.
        """
        cache_key = hashlib.md5(smart_bytes(self.url)).hexdigest()
        feed_content = cache.get(cache_key)
        if not context.get('synchronous') and feed_content is None:
            raise NothingInCacheException()
        return super().render(context)
|
|
|
|
|
|
@register_cell_class
class ParentContentCell(CellBase):
    """Cell standing for the cells of the same placeholder in the parent page."""

    class Meta:
        verbose_name = _('Same as parent')

    def get_parents_cells(self, hierarchy):
        """Return the cells of the placeholder, parent content cells being expanded.

        *hierarchy* is the list of pages down to the current page; the index
        page (page with 'index' as slug and no parent) is added in front of
        it when it exists.  Starting from the cells of the last page, the
        first ParentContentCell found is replaced by the cells of the
        previous page, and so on up the page chain as long as a
        ParentContentCell is found.
        """
        try:
            pages = [Page.objects.get(slug='index', parent=None)]
        except Page.DoesNotExist:
            pages = []
        pages.extend(hierarchy)
        if not pages:
            return []
        if len(pages) > 1 and pages[0].id == pages[1].id:
            # don't duplicate index cells for real children of the index page.
            pages = pages[1:]
        cells_by_page = {page.id: [] for page in pages}
        # get cells from placeholder + cells in private placeholders that may
        # be used by actual cells.
        placeholder_filter = Q(placeholder=self.placeholder)
        for klass in CellBase.get_cell_classes(lambda x: bool(x.children_placeholder_prefix)):
            placeholder_filter |= Q(placeholder__startswith=klass.children_placeholder_prefix)
        for cell in CellBase.get_cells(page__in=pages, extra_filter=placeholder_filter):
            cells_by_page[cell.page_id].append(cell)

        *ancestors, current_page = pages
        cells = cells_by_page[current_page.id]
        for ancestor in reversed(ancestors):
            marker_index = next(
                (i for i, cell in enumerate(cells) if isinstance(cell, ParentContentCell)), None
            )
            if marker_index is None:
                # no more ParentContentCell, stop following the parent page chain
                break
            cells[marker_index : marker_index + 1] = cells_by_page[ancestor.id]
        return cells

    def render(self, context):
        """Render nothing, the cell is only a marker."""
        return ''
|
|
|
|
|
|
class JsonCellBase(CellBase):
|
|
url = None
|
|
cache_duration = 60
|
|
template_string = None
|
|
varnames = None
|
|
force_async = False
|
|
log_errors = True
|
|
timeout = None
|
|
make_global = False
|
|
actions = {}
|
|
additional_data = None
|
|
# [
|
|
# {'key': ...,
|
|
# 'url': ...,
|
|
# 'cache_duration': ... (optional)
|
|
# },
|
|
# ...
|
|
# ]
|
|
first_data_key = 'json'
|
|
|
|
_json_content = None
|
|
|
|
invalid_reason_codes = {
|
|
'data_url_not_found': _('URL seems to unexist'),
|
|
'data_url_invalid': _('URL seems to be invalid'),
|
|
}
|
|
|
|
class Meta:
|
|
abstract = True
|
|
|
|
def save(self, *args, **kwargs):
    """Save the cell and mark it as valid, unless ``update_fields`` is given."""
    result = super().save(*args, **kwargs)
    partial_update = 'update_fields' in kwargs
    if not partial_update:
        # always mark cell as valid when it is saved, it will be checked
        # for real when it is rendered
        self.mark_as_valid()
    return result
|
|
|
|
def is_visible(self, **kwargs):
    """Tell whether the cell is visible; a cell without URL never is."""
    if not self.url:
        return False
    return super().is_visible(**kwargs)
|
|
|
|
def is_user_dependant(self, context=None):
    """Return whether the cell content varies from user to user.

    This is the case when the URL of the cell, or the URL of one of its
    additional data, contains ``user_nameid`` or ``user_email``.
    """
    urls = [self.url]
    urls.extend(data['url'] for data in self.additional_data or [])
    return any(url and ('user_nameid' in url or 'user_email' in url) for url in urls)
|
|
|
|
def get_cell_parameters_context(self):
    """Return the context variables holding the cell parameters; none in this implementation."""
    return dict()
|
|
|
|
def get_cell_extra_context(self, context, invalidate_cache=False):
    """Return the extra context of the cell, with the data retrieved from its URLs.

    The URL of the cell (stored under the ``first_data_key`` key) then the
    URLs of ``additional_data`` are evaluated as templated URLs and
    requested.  For each of them the extra context gets, ``<key>`` being the
    key of the data:

    * ``<key>``: the decoded JSON content of a 2xx response, None otherwise,
    * ``<key>_url``: the evaluated URL,
    * ``<key>_status``: the status code of the response, -1 on request error,
    * ``<key>_error``: the request error, ``'invalid_json'``, or the JSON
      content of an error response,
    * ``<key>_exception``: the exception raised by the request.

    *context* is modified: it gets the request parameters listed in
    ``varnames``, the cell parameters, ``concerned_user`` and the data of
    each key, so they can be used in the following templated URLs.

    The validity of the cell is updated from the status codes, and the data
    of the first key is kept in ``_json_content``.  Nothing is requested
    when the context is in ``placeholder_search_mode``.  *invalidate_cache*
    is passed as is to the requests.
    """
    extra_context = super().get_cell_extra_context(context)
    if context.get('placeholder_search_mode'):
        # don't call webservices when we're just looking for placeholders
        return extra_context
    if self.varnames and context.get('request'):
        # copy expected query string parameters into the context
        for varname in self.varnames:
            if varname in context['request'].GET:
                context[varname] = context['request'].GET[varname]
    self._json_content = None

    extra_context.update(self.get_cell_parameters_context())
    context.update(self.get_cell_parameters_context())
    context['concerned_user'] = self.get_concerned_user(context)

    # main URL of the cell comes first, then additional data
    data_urls = [
        {
            'key': self.first_data_key,
            'url': self.url,
            'cache_duration': self.cache_duration,
            'log_errors': self.log_errors,
            'timeout': self.timeout,
        }
    ]
    data_urls.extend(self.additional_data or [])

    # make sure all data keys exist in the extra context, whatever happens
    for data_url_dict in data_urls:
        extra_context[data_url_dict['key']] = None

    for data_url_dict in data_urls:
        data_key = data_url_dict['key']
        log_errors = data_url_dict.get('log_errors', self.log_errors)
        try:
            url = utils.get_templated_url(data_url_dict['url'], context)
        except utils.TemplateError as e:
            logger = logging.getLogger(__name__)
            # only syntax errors are logged as errors
            if log_errors and e.msg.startswith('syntax error'):
                logger.error('error in templated URL (%s): %s', data_url_dict['url'], e)
            else:
                logger.debug('error in templated URL (%s): %s', data_url_dict['url'], e)
            continue
        extra_context[data_key + '_url'] = url
        if not url:
            continue
        try:
            json_response = utils.requests.get(
                url,
                headers={'Accept': 'application/json'},
                remote_service='auto',
                cache_duration=data_url_dict.get('cache_duration', self.cache_duration),
                without_user=True,
                raise_if_not_cached=not (context.get('synchronous')),
                invalidate_cache=invalidate_cache,
                log_errors=log_errors,
                timeout=data_url_dict.get('timeout', self.timeout),
                django_request=context.get('request'),
            )
        except requests.RequestException as e:
            extra_context[data_key + '_status'] = -1
            extra_context[data_key + '_error'] = force_text(e)
            extra_context[data_key + '_exception'] = e
            logger = logging.getLogger(__name__)
            if log_errors:
                logger.warning('error on request %r: %s', url, force_text(e))
            else:
                logger.debug('error on request %r: %s', url, force_text(e))
            continue
        extra_context[data_key + '_status'] = json_response.status_code
        if json_response.status_code // 100 == 2:
            if json_response.status_code != 204:  # 204 = No Content
                try:
                    extra_context[data_key] = json_response.json()
                except ValueError:
                    extra_context[data_key + '_error'] = 'invalid_json'
                    logger = logging.getLogger(__name__)
                    if log_errors:
                        logger.error('invalid json content (%s)', url)
                    else:
                        logger.debug('invalid json content (%s)', url)
                    continue
        elif json_response.headers.get('content-type') == 'application/json':
            # error response with a JSON content, keep it as error
            try:
                extra_context[data_key + '_error'] = json_response.json()
            except ValueError:
                extra_context[data_key + '_error'] = 'invalid_json'

        # update context with data key so it can be used in future
        # templated URLs
        context[data_key] = extra_context[data_key]

    if not self._meta.abstract:
        # collect the status codes to update the validity of the cell
        returns = []
        for data_url in data_urls:
            if data_url['url'].count('{{') > 1:
                # ignore returns of url with more than one variable
                continue
            returns.append(extra_context.get(data_url['key'] + '_status'))
        returns = {s for s in returns if s is not None}
        if returns and 200 not in returns:  # not a single valid answer
            if 404 in returns:
                self.mark_as_invalid('data_url_not_found')
            elif any([400 <= r < 500 for r in returns]):
                # at least 4xx errors, report the cell as invalid
                self.mark_as_invalid('data_url_invalid')
            else:
                # 2xx or 3xx: cell is valid
                # 5xx error: can not retrieve data, don't report cell as invalid
                self.mark_as_valid()
        else:
            self.mark_as_valid()

    # keep cache of first response as it may be used to find the
    # appropriate template.
    self._json_content = extra_context[self.first_data_key]

    return extra_context
|
|
|
|
def modify_global_context(self, context, request):
    """Add the data of the cell to *context*, when ``make_global`` is set.

    The data of the first key is computed synchronously and stored in the
    context under the name given by ``make_global``; the ``synchronous``
    value of the context is restored afterwards.
    """
    if not self.make_global:
        return
    previous_synchronous = context.get('synchronous')
    context['synchronous'] = True
    try:
        preloaded_data = self.get_cell_extra_context(context)
        context[self.make_global] = preloaded_data.get(self.first_data_key)
    finally:
        context['synchronous'] = previous_synchronous
|
|
|
|
@property
def default_template_name(self):
    """Name of the default template, chosen from the data of the first URL.

    This is the error template when there is no data, the list template when
    the data is a dict whose ``data`` is a list starting with a dict having
    ``url`` and ``text`` keys, the generic template otherwise.
    """
    content = self._json_content
    if content is None:
        return 'combo/json-error-cell.html'
    data = content.get('data') if isinstance(content, dict) else None
    if data and isinstance(data, list):
        # look at the first element only
        first_element = data[0]
        if isinstance(first_element, dict) and 'url' in first_element and 'text' in first_element:
            return 'combo/json-list-cell.html'
    return 'combo/json-cell.html'
|
|
|
|
def post(self, request):
    """Run one of the cell's configured actions against its remote URL.

    The action name is read from ``request.POST['action']`` and looked up
    in ``self.actions``.  Every other POST field is forwarded as the JSON
    body of the outbound request; fields whose name ends in ``[]`` are
    sent as lists under the name without the suffix.

    Returns the remote response object when the action is configured with
    ``response: raw``, and ``None`` otherwise (the cell then renders
    itself).

    Raises ``PermissionDenied`` when no action or an unknown action is
    posted, and ``PostException`` (carrying the action's ``error-message``)
    when the URL template cannot be rendered, the request fails at the
    transport level, or the remote answer is not 2xx.
    """
    if 'action' not in request.POST:
        raise PermissionDenied()
    action = request.POST['action']
    if action not in self.actions:
        raise PermissionDenied()

    action_config = self.actions[action]
    error_message = action_config.get('error-message')
    timeout = action_config.get('timeout', self.timeout)
    method = action_config.get('method', 'POST')
    logger = logging.getLogger(__name__)

    content = {}
    for key, value in request.POST.items():
        if key == 'action':
            continue
        if key.endswith('[]'):  # array
            content[key[:-2]] = request.POST.getlist(key)
        else:
            content[key] = value

    # the URL template sees the posted values, the cell parameters, and
    # the request itself.
    context = copy.copy(content)
    context.update(self.get_cell_parameters_context())
    context['request'] = request
    context['synchronous'] = True

    try:
        url = utils.get_templated_url(action_config['url'], context)
    except utils.TemplateError as e:
        logger.warning('error in templated URL (%s): %s', action_config['url'], e)
        raise PostException(error_message) from e

    try:
        json_response = utils.requests.request(
            method,
            url,
            headers={'Accept': 'application/json'},
            remote_service='auto',
            json=content,
            without_user=True,
            django_request=request,
            timeout=timeout,
        )
    except requests.RequestException as e:
        # timeouts and connection failures are reported to the user with
        # the configured error message, like a non-2xx answer.
        logger.error('error POSTing data to URL (%s): %s', url, e)
        raise PostException(error_message) from e

    if json_response.status_code // 100 != 2:  # 2xx
        logger.error('error POSTing data to URL (%s)', url)
        raise PostException(error_message)

    if self.cache_duration:
        # re-run the data fetch with invalidate_cache=True so the next
        # rendering does not reuse data from before the action.
        self.get_cell_extra_context(context, invalidate_cache=True)

    if action_config.get('response', 'cell') == 'raw':
        # response: raw in the config will get the response directly sent
        # to the client.
        return json_response

    # default behaviour, the cell will render itself.
    return None
|
|
|
|
def render(self, context):
    """Render the cell, using its own template string when it has one.

    Raises ``NothingInCacheException`` when ``force_async`` is set and the
    context is not flagged ``synchronous``.  Without a ``template_string``
    the parent class rendering is used.
    """
    if self.force_async and not context.get('synchronous'):
        raise NothingInCacheException()
    if not self.template_string:
        return super().render(context)
    # the template is compiled before the data is fetched, so a broken
    # template fails before get_cell_extra_context is called.
    compiled = engines['django'].from_string(self.template_string)
    extra = self.get_cell_extra_context(context)
    context.update(extra)
    return compiled.render(context, context.get('request'))
|
|
|
|
|
|
@register_cell_class
class JsonCell(JsonCellBase):
    """JSON cell whose URL, template and fetch options are stored on the model."""

    title = models.CharField(
        _('Title'),
        max_length=150,
        blank=True,
    )
    url = models.CharField(
        _('URL'),
        blank=True,
        max_length=500,
    )
    template_string = models.TextField(
        _('Display Template'),
        blank=True,
        null=True,
        validators=[django_template_validator],
    )
    cache_duration = models.PositiveIntegerField(
        _('Cache duration'),
        default=60,
        help_text=_('In seconds.'),
    )
    force_async = models.BooleanField(
        _('Force asynchronous mode'),
        default=JsonCellBase.force_async,
    )
    varnames_str = models.CharField(
        _('Variable names'),
        max_length=200,
        blank=True,
        help_text=_(
            'Comma separated list of query-string variables '
            'to be copied in template context'
        ),
    )
    timeout = models.PositiveIntegerField(
        _('Request timeout'),
        default=0,
        help_text=_('In seconds. Use 0 for default system timeout'),
    )

    # the 'url' form field uses TemplatableURLField in the manager form
    manager_form_factory_kwargs = {'field_classes': {'url': TemplatableURLField}}

    class Meta:
        verbose_name = _('JSON Prototype')

    @property
    def varnames(self):
        """Return the names listed in ``varnames_str``, trimmed, blanks dropped."""
        trimmed = (chunk.strip() for chunk in self.varnames_str.split(','))
        return [name for name in trimmed if name]
|
|
|
|
|
|
class ConfigJsonCellManager(models.Manager):
    """Manager that hides cells whose type is not declared in the settings."""

    def get_queryset(self):
        """Return only rows whose ``key`` is a key of ``settings.JSON_CELL_TYPES``."""
        return super().get_queryset().filter(key__in=settings.JSON_CELL_TYPES.keys())
|
|
|
|
|
|
@register_cell_class
class ConfigJsonCell(JsonCellBase):
    """JSON cell whose behaviour comes from ``settings.JSON_CELL_TYPES``.

    The model only stores ``key`` (the entry of ``JSON_CELL_TYPES`` to use)
    and ``parameters`` (per-cell values); every other option is read from
    the settings entry through the properties below.
    """

    objects = ConfigJsonCellManager()

    key = models.CharField(max_length=50)
    parameters = JSONField(blank=True, default=dict)

    def __str__(self):
        return force_text(_('%s (JSON Cell)') % self.get_label())

    @classmethod
    def get_cell_types(cls):
        """Return one descriptor dict per configured cell type, sorted by name."""
        cell_types = [
            {
                'name': definition['name'],
                'variant': key,
                'group': _('Extra'),
                'cell_type_str': cls.get_cell_type_str(),
            }
            for key, definition in settings.JSON_CELL_TYPES.items()
        ]
        cell_types.sort(key=lambda x: x.get('name'))
        return cell_types

    @property
    def _cell_type_definition(self):
        """Settings entry for this cell's ``key``; KeyError if it is not configured."""
        return settings.JSON_CELL_TYPES[self.key]

    def get_label(self):
        """Return the configured ``name`` of this cell type."""
        return self._cell_type_definition['name']

    def set_variant(self, variant):
        """Store ``variant`` as the cell type key."""
        self.key = variant

    @property
    def css_class_names(self):
        # parent class names plus the cell type key
        return super().css_class_names + ' ' + self.key

    @property
    def ajax_refresh(self):
        return self._cell_type_definition.get('auto_refresh', None)

    @property
    def make_global(self):
        return self._cell_type_definition.get('make_global', False)

    @property
    def repeat(self):
        return self._cell_type_definition.get('repeat')

    @property
    def url(self):
        # 'url' is the only option read without a fallback
        return self._cell_type_definition['url']

    @property
    def cache_duration(self):
        return self._cell_type_definition.get('cache_duration', JsonCellBase.cache_duration)

    @property
    def varnames(self):
        return self._cell_type_definition.get('varnames')

    @property
    def force_async(self):
        return self._cell_type_definition.get('force_async', JsonCellBase.force_async)

    @property
    def log_errors(self):
        return self._cell_type_definition.get('log_errors', JsonCellBase.log_errors)

    @property
    def timeout(self):
        return self._cell_type_definition.get('timeout', JsonCellBase.timeout)

    @property
    def actions(self):
        return self._cell_type_definition.get('actions', JsonCellBase.actions)

    @property
    def additional_data(self):
        return self._cell_type_definition.get('additional-data')

    @property
    def default_template_name(self):
        # falls back to a template named after the cell type key
        return self._cell_type_definition.get('template-name', 'combo/json/%s.html' % self.key)

    @property
    def loading_message(self):
        return self._cell_type_definition.get('loading-message', CellBase.loading_message)

    def get_default_form_class(self):
        """Build the form class for this cell type, or return None.

        None is returned when the settings entry has no (or an empty)
        ``form`` definition.
        """
        formdef = self._cell_type_definition.get('form')
        if not formdef:
            return None
        from .forms import ConfigJsonForm

        # create a subclass of ConfigJsonForm with 'formdef' (the list of
        # fields) as an attribute.
        return type('%sConfigClass' % self.key, (ConfigJsonForm,), {'formdef': formdef})

    def get_cell_parameters_context(self):
        """Return a shallow copy of the parameters, plus them under ``parameters``."""
        context = copy.copy(self.parameters or {})
        context['parameters'] = self.parameters
        return context

    def render(self, context):
        """Render the cell, honouring a per-cell ``template_string`` parameter.

        The parameter is only used when the cell type's form definition
        declares a field whose ``varname`` is ``template_string``.
        """
        # tolerate empty/None parameters, as get_cell_parameters_context does
        parameters = self.parameters or {}
        settings_varnames = [f.get('varname') for f in self._cell_type_definition.get('form') or []]
        if 'template_string' in parameters and 'template_string' in settings_varnames:
            self.template_string = parameters['template_string']
        return super().render(context)
|
|
|
|
|
|
@receiver(pre_save, sender=Page)
def create_redirects(sender, instance, raw, **kwargs):
    """Record redirects for a page and its descendants before it moves.

    Nothing is done for raw saves, pages without an id, pages with a
    ``snapshot_id``, saves limited to the ``related_cells`` field, or when
    neither the slug nor the parent changed.  Otherwise a ``Redirect`` is
    saved for the stored version of the page and for every page below it,
    using the URL each one has before the save.
    """
    if raw or not instance.id or instance.snapshot_id:
        return
    if kwargs.get('update_fields') == frozenset({'related_cells'}):
        return

    try:
        stored_page = Page.objects.get(id=instance.id)
    except Page.DoesNotExist:
        return

    slug_unchanged = stored_page.slug == instance.slug
    parent_unchanged = stored_page.parent_id == instance.parent_id
    if slug_unchanged and parent_unchanged:
        return

    # walk the tree one level at a time, collecting all descendants; an
    # empty level ends the walk.
    affected_pages = [stored_page]
    current_level = [stored_page]
    while current_level:
        parent_ids = [parent.id for parent in current_level]
        current_level = Page.objects.filter(parent_id__in=parent_ids).select_related('parent')
        affected_pages.extend(current_level)

    for page in affected_pages:
        Redirect(page=page, old_url=page.get_online_url()).save()
|
|
|
|
|
|
@receiver(post_save)
@receiver(post_delete)
def cell_maintain_page_cell_cache(sender, instance=None, **kwargs):
    """Keep a page in step with changes to its cells.

    Connected to ``post_save`` and ``post_delete`` for every sender;
    senders that are not ``CellBase`` subclasses and cells without a
    ``page_id`` are ignored.
    """
    if not issubclass(sender, CellBase) or not instance.page_id:
        return
    page = instance.page
    if kwargs.get('created') is False:
        # don't build the cache on update, but update page's last_update_timestamp
        page.save(update_fields=['last_update_timestamp'])
    else:
        # 'created' is True or absent: rebuild the page's cell cache
        page.build_cell_cache()
|
|
|
|
|
|
class SiteSettings(models.Model):
|
|
initial_login_page_path = models.CharField(
|
|
_('Initial login page path'),
|
|
help_text=_('Page to redirect to the first time user logs in.'),
|
|
max_length=100,
|
|
)
|
|
welcome_page_path = models.CharField(
|
|
_('Welcome page path'),
|
|
help_text=_('Page to redirect to on the first visit, to suggest user to log in.'),
|
|
max_length=100,
|
|
)
|