# combo - content management system # Copyright (C) 2014 Entr'ouvert # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import collections import copy import feedparser import hashlib import json import logging import os import re import requests import subprocess from django.apps import apps from django.conf import settings from django.contrib.auth.models import Group from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist, ValidationError, PermissionDenied from django.core import serializers from django.db import models, transaction from django.db.models.base import ModelBase from django.db.models.signals import pre_save, post_save, post_delete from django.db.models import Max from django.dispatch import receiver from django.forms import models as model_forms from django import forms from django import template from django.utils import six from django.utils.encoding import python_2_unicode_compatible, force_text, smart_bytes from django.utils.html import strip_tags from django.utils.safestring import mark_safe from django.utils.six.moves.urllib import parse as urlparse from django.utils.text import slugify from django.utils.translation import ugettext_lazy as _ from django.forms.widgets import MediaDefiningClass from django.template import Context, engines, TemplateDoesNotExist, TemplateSyntaxError from django.test.client import RequestFactory from .fields import RichTextField, TemplatableURLField from jsonfield import JSONField from .library import register_cell_class, get_cell_classes, get_cell_class from combo import utils from combo.utils import NothingInCacheException class PostException(Exception): pass def element_is_visible(element, user=None): if element.public: if getattr(element, 'restricted_to_unlogged', None) is True: return (user is None or user.is_anonymous) return True if user is None or user.is_anonymous: return False if user.is_superuser: return True page_groups = element.groups.all() if not page_groups: groups_ok = True else: groups_ok = len(set(page_groups).intersection(user.groups.all())) > 0 if getattr(element, 'restricted_to_unlogged', None) is True: return not(groups_ok) return groups_ok class Placeholder(object): def __init__(self, key, name=None, acquired=False, render=True, cell=None, force_synchronous=False): self.key = key self.name = name self.acquired = acquired self.render = render self.cell = cell self.force_synchronous = force_synchronous def get_name(self): if self.cell: return '%s / %s' % (self.cell.get_label(), self.name) return self.name class PageManager(models.Manager): snapshots = False def __init__(self, *args, **kwargs): self.snapshots = kwargs.pop('snapshots', False) super(PageManager, self).__init__(*args, **kwargs) def get_by_natural_key(self, path): parts = [x for x in path.strip('/').split('/') if x] or ['index'] return self.get(slug=parts[-1]) def get_queryset(self): queryset = super(PageManager, self).get_queryset() if self.snapshots: return queryset.filter(snapshot__isnull=False) else: return queryset.filter(snapshot__isnull=True) @python_2_unicode_compatible class Page(models.Model): objects = PageManager() snapshots = PageManager(snapshots=True) title = models.CharField(_('Title'), max_length=150) slug = models.SlugField(_('Slug')) sub_slug = models.CharField(_('Sub Slug'), max_length=150, blank=True, help_text=_('Regular expression to create variadic subpages. ' 'Matching named groups are exposed as context variables.' )) description = models.TextField(_('Description'), blank=True) template_name = models.CharField(_('Template'), max_length=50) parent = models.ForeignKey('self', on_delete=models.CASCADE, null=True, blank=True) order = models.PositiveIntegerField() exclude_from_navigation = models.BooleanField(_('Exclude from navigation'), default=False) redirect_url = models.CharField(_('Redirect URL'), max_length=200, blank=True) public = models.BooleanField(_('Public'), default=True) groups = models.ManyToManyField(Group, verbose_name=_('Groups'), blank=True) last_update_timestamp = models.DateTimeField(auto_now=True) picture = models.ImageField(_('Picture'), upload_to='page-pictures/', null=True) # mark temporarily restored snapshots, it is required to save objects # (pages and cells) for real for viewing past snapshots as many cells are # asynchronously loaded and must refer to a real Page object. snapshot = models.ForeignKey('PageSnapshot', on_delete=models.CASCADE, null=True, related_name='temporary_page') # keep a cached list of cell types that are used in the pages. related_cells = JSONField(blank=True) _level = None class Meta: ordering = ['order'] def __str__(self): return self.title def natural_key(self): return (self.get_online_url().strip('/'), ) def picture_extension(self): if not self.picture: return None return os.path.splitext(self.picture.name)[-1] def save(self, *args, **kwargs): if not self.id: self.related_cells = {'cell_types': []} if not self.order: max_order = Page.objects.all().aggregate(Max('order')).get('order__max') or 0 self.order = max_order + 1 if not self.slug: if not Page.objects.exists(): slug = 'index' else: base_slug = slugify(self.title)[:40] slug = base_slug.strip('-') i = 1 while Page.objects.filter(slug=slug, parent_id=self.parent_id).exists(): i += 1 slug = '%s-%s' % (base_slug, i) self.slug = slug if not self.template_name: self.template_name = settings.COMBO_DEFAULT_PUBLIC_TEMPLATE return super(Page, self).save(*args, **kwargs) def get_parents_and_self(self): pages = [self] page = self while page.parent_id: page = page._parent if hasattr(page, '_parent') else page.parent pages.append(page) return list(reversed(pages)) def get_online_url(self): parts = [x.slug for x in self.get_parents_and_self()] if parts[0] == 'index': parts = parts[1:] if not parts: return '/' return '/' + '/'.join(parts) + '/' def get_page_of_level(self, level): '''Return page of given level in the page hierarchy.''' parts = [self] page = self while page.parent_id: page = page._parent if hasattr(page, '_parent') else page.parent parts.append(page) parts.reverse() try: return parts[level] except IndexError: return None def get_siblings(self): if hasattr(self, '_parent'): if self._parent: return self._parent._children return Page.objects.filter(parent_id=self.parent_id) def get_children(self): if hasattr(self, '_children'): return self._children return Page.objects.filter(parent_id=self.id) def has_children(self): if hasattr(self, '_children'): return bool(self._children) return Page.objects.filter(parent_id=self.id).exists() def get_template_display_name(self): try: return settings.COMBO_PUBLIC_TEMPLATES[self.template_name]['name'] except KeyError: return _('Unknown (%s)') % self.template_name def missing_template(self): template_name = settings.COMBO_PUBLIC_TEMPLATES.get(self.template_name, {}).get('template') if not template_name: return True try: template.loader.select_template([template_name]) except TemplateDoesNotExist: return True return False def get_placeholders(self, request, traverse_cells=False, template_name=None): placeholders = [] page_template = settings.COMBO_PUBLIC_TEMPLATES.get(template_name or self.template_name, {}) if page_template.get('placeholders'): # manual declaration for key, options in page_template['placeholders'].items(): placeholders.append(Placeholder(key=key, **options)) return placeholders template_names = [] if page_template.get('template'): template_names.append(page_template['template']) template_names.append('combo/page_template.html') tmpl = template.loader.select_template(template_names) request = RequestFactory(SERVER_NAME=request.get_host()).get(self.get_online_url()) request.user = None context = { 'page': self, 'request': request, 'synchronous': True, 'placeholder_search_mode': True, 'placeholders': placeholders, 'traverse_cells': traverse_cells, } tmpl.render(context, request) return placeholders def get_next_page(self, user=None, check_visibility=True): pages = Page.get_as_reordered_flat_hierarchy(Page.objects.all()) this_page = [x for x in pages if x.id == self.id][0] pages = pages[pages.index(this_page)+1:] for page in pages: if not check_visibility or page.is_visible(user): return page return None def get_previous_page(self, user=None, check_visibility=True): pages = Page.get_as_reordered_flat_hierarchy(Page.objects.all()) pages.reverse() this_page = [x for x in pages if x.id == self.id][0] pages = pages[pages.index(this_page)+1:] for page in pages: if not check_visibility or page.is_visible(user): return page return None @classmethod def get_as_reordered_flat_hierarchy(cls, object_list): reordered = [] parenting = collections.defaultdict(list) for page in object_list: parenting[page.parent_id].append(page) def fill_list(object_sublist, level=0, parent=None): for page in object_sublist: if page.parent == parent: page.level = level reordered.append(page) if page.id in parenting: fill_list(object_sublist, level=level+1, parent=page) fill_list(object_list) return reordered @staticmethod @utils.cache_during_request def get_with_hierarchy_attributes(): pages = Page.objects.all() pages_by_id = {} for page in pages: pages_by_id[page.id] = page page._parent = None page._children = [] for page in pages: page._parent = pages_by_id[page.parent_id] if page.parent_id else None if page._parent: page._parent._children.append(page) for page in pages: page._children.sort(key=lambda x: x.order) return pages_by_id def visibility(self): if self.public: return _('Public') groups = self.groups.all() groupnames = ', '.join([x.name for x in groups]) if groups else _('logged users') return _('Private (%s)') % groupnames def is_visible(self, user=None): return element_is_visible(self, user=user) def get_cells(self): return CellBase.get_cells(page=self) def build_cell_cache(self): cells = CellBase.get_cells(page=self, skip_cell_cache=True) cell_types = set() for cell in cells: cell_types.add(cell.get_cell_type_str()) if cell_types != set(self.related_cells.get('cell_types', [])): self.related_cells['cell_types'] = list(cell_types) self.save() def get_serialized_page(self): cells = [x for x in self.get_cells() if x.placeholder and not x.placeholder.startswith('_')] serialized_page = json.loads(serializers.serialize('json', [self], use_natural_foreign_keys=True, use_natural_primary_keys=True))[0] del serialized_page['model'] if 'snapshot' in serialized_page: del serialized_page['snapshot'] if 'related_cells' in serialized_page['fields']: del serialized_page['fields']['related_cells'] serialized_page['cells'] = json.loads(serializers.serialize('json', cells, use_natural_foreign_keys=True, use_natural_primary_keys=True)) for index, cell in enumerate(cells): serialized_page['cells'][index].update(cell.export_subobjects()) serialized_page['fields']['groups'] = [x[0] for x in serialized_page['fields']['groups']] for cell in serialized_page['cells']: del cell['pk'] del cell['fields']['page'] cell['fields']['groups'] = [x[0] for x in cell['fields']['groups']] for key in list(cell['fields'].keys()): if key.startswith('cached_'): del cell['fields'][key] return serialized_page @classmethod def load_serialized_page(cls, json_page, snapshot=None): json_page['model'] = 'data.page' json_page['fields']['groups'] = [[x] for x in json_page['fields']['groups'] if isinstance(x, six.string_types)] page, created = Page.objects.get_or_create(slug=json_page['fields']['slug'], snapshot=snapshot) json_page['pk'] = page.id page = [x for x in serializers.deserialize('json', json.dumps([json_page]))][0] page.object.snapshot = snapshot page.save() for cell in json_page.get('cells'): cell['fields']['groups'] = [[x] for x in cell['fields']['groups'] if isinstance(x, six.string_types)] if snapshot: cell['fields']['page'] = page.object.id else: cell['fields']['page'] = page.object.natural_key() # if there were cells, remove them for cell in CellBase.get_cells(page_id=page.object.id): cell.delete() return page.object # get page out of deserialization object @classmethod def load_serialized_cells(cls, cells): # load new cells for index, cell in enumerate(serializers.deserialize('json', json.dumps(cells))): cell.save() # will populate cached_* attributes cell.object.save() cell.object.import_subobjects(cells[index]) @classmethod def load_serialized_pages(cls, json_site): cells = [] for json_page in json_site: cls.load_serialized_page(json_page) cells.extend(json_page.get('cells')) cls.load_serialized_cells(cells) # 2nd pass to set parents for json_page in json_site: if json_page.get('parent_slug'): page = Page.objects.get(slug=json_page['fields']['slug']) page.parent = Page.objects.get(slug=json_page.get('parent_slug')) page.save() @classmethod def export_all_for_json(cls): ordered_pages = Page.get_as_reordered_flat_hierarchy(cls.objects.all()) return [x.get_serialized_page() for x in ordered_pages] def get_redirect_url(self, context=None): return utils.get_templated_url(self.redirect_url, context=context) def get_last_update_time(self): cells = CellBase.get_cells(page_id=self.id) return max([self.last_update_timestamp] + [x.last_update_timestamp for x in cells]) def duplicate(self): # clone current page new_page = copy.deepcopy(self) new_page.pk = None # set title new_page.title = _('Copy of %s') % self.title # reset slug new_page.slug = None # reset snapshot new_page.snapshot = None # set order new_page.order = self.order + 1 # store new page new_page.save() # set groups new_page.groups.set(self.groups.all()) for cell in self.get_cells(): if cell.placeholder and cell.placeholder.startswith('_'): continue cell.duplicate(page_target=new_page) return new_page class PageSnapshot(models.Model): page = models.ForeignKey(Page, on_delete=models.SET_NULL, null=True) timestamp = models.DateTimeField(auto_now_add=True) user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, null=True) comment = models.TextField(blank=True, null=True) serialization = JSONField(blank=True) class Meta: ordering = ('-timestamp',) @classmethod def take(cls, page, request=None, comment=None, deletion=False): snapshot = cls(page=page, comment=comment) if request and not request.user.is_anonymous: snapshot.user = request.user if not deletion: snapshot.serialization = page.get_serialized_page() else: snapshot.serialization = {} snapshot.comment = comment or _('deletion') snapshot.save() def get_page(self): try: # try reusing existing page return Page.snapshots.get(snapshot=self) except Page.DoesNotExist: page = Page.load_serialized_page(self.serialization, snapshot=self) page.load_serialized_cells(self.serialization['cells']) return page def restore(self): with transaction.atomic(): page = Page.load_serialized_page(self.serialization) page.load_serialized_cells(self.serialization['cells']) return page class Redirect(models.Model): old_url = models.CharField(max_length=512) page = models.ForeignKey(Page, on_delete=models.CASCADE) creation_timestamp = models.DateTimeField(auto_now_add=True) class Meta: ordering = ('creation_timestamp',) class CellMeta(MediaDefiningClass, ModelBase): pass @python_2_unicode_compatible class CellBase(six.with_metaclass(CellMeta, models.Model)): page = models.ForeignKey(Page, on_delete=models.CASCADE) placeholder = models.CharField(max_length=20) order = models.PositiveIntegerField() slug = models.SlugField(_('Slug'), blank=True) extra_css_class = models.CharField(_('Extra classes for CSS styling'), max_length=100, blank=True) public = models.BooleanField(_('Public'), default=True) # restricted_to_unlogged is actually an invert switch, it is used for mark # a cell as only visibile to unlogged users but also, when groups are set, # to mark the cell as visible to all but those groups. restricted_to_unlogged = models.BooleanField( _('Restrict to unlogged users'), default=False) groups = models.ManyToManyField(Group, verbose_name=_('Groups'), blank=True) last_update_timestamp = models.DateTimeField(auto_now=True) default_form_class = None manager_form_factory_kwargs = {} manager_form_template = 'combo/cell_form.html' visible = True user_dependant = False template_name = None # get_badge(self, context); set to None so cell types can be skipped easily get_badge = None # get_asset_slots(self); set to None so cell types can be skipped easily get_asset_slots = None # message displayed when the cell is loaded asynchronously loading_message = _('Loading...') # modify_global_context(self, context, request=None) # Apply changes to the template context that must visible to all cells in the page modify_global_context = None # if set, automatically refresh cell every n seconds ajax_refresh = None class Meta: abstract = True def __str__(self): label = self.get_verbose_name() additional_label = self.get_additional_label() if label and additional_label: return u'%s (%s)' % (label, re.sub(r'\r?\n', u' ', force_text(additional_label))) else: return force_text(label) @classmethod def get_verbose_name(cls): return cls._meta.verbose_name def get_additional_label(self): return '' @property def class_name(self): return self.__class__.__name__.lower() @property def css_class_names(self): return self.class_name + ' ' + self.extra_css_class @classmethod def get_cell_classes(cls, class_filter=lambda x: True): for klass in get_cell_classes(): if not class_filter(klass): continue if not klass.is_enabled(): continue if klass.visible is False: continue yield klass @classmethod def get_all_cell_types(cls, *args, **kwargs): cell_types = [] for klass in cls.get_cell_classes(*args, **kwargs): cell_types.extend(klass.get_cell_types()) return cell_types @classmethod def get_cells(cls, cell_filter=None, skip_cell_cache=False, **kwargs): """Returns the list of cells of various classes matching **kwargs""" cells = [] pages = [] if 'page' in kwargs: pages = [kwargs['page']] elif 'page__in' in kwargs: pages = kwargs['page__in'] else: # if there are not explicit page, limit to non-snapshot pages kwargs['page__snapshot__isnull'] = True cell_classes = get_cell_classes() if pages and not skip_cell_cache: # if there's a request for some specific pages, limit cell types # to those that are actually in use in those pages. cell_types = set() for page in pages: if page.related_cells and 'cell_types' in page.related_cells: cell_types |= set(page.related_cells['cell_types']) else: break else: cell_classes = [get_cell_class(x) for x in cell_types] for klass in cell_classes: if klass is None: continue if cell_filter and not cell_filter(klass): continue cells.extend(klass.objects.filter(**kwargs)) cells.sort(key=lambda x: x.order) return cells def get_reference(self): "Returns a string that can serve as a unique reference to a cell""" return str('%s-%s' % (self.get_cell_type_str(), self.id)) @classmethod def get_cell(cls, reference, **kwargs): """Returns the cell matching reference, and eventual **kwargs""" content_id, cell_id = reference.split('-') klass = get_cell_class(content_id) if klass is None: raise ObjectDoesNotExist() return klass.objects.get(id=cell_id, **kwargs) @classmethod def get_cell_type_str(cls): return '%s_%s' % (cls._meta.app_label, cls._meta.model_name) @classmethod def get_cell_type_group(cls): return apps.get_app_config(cls._meta.app_label).verbose_name @classmethod def get_cell_types(cls): return [{ 'name': cls.get_verbose_name(), 'cell_type_str': cls.get_cell_type_str(), 'group': cls.get_cell_type_group(), 'variant': 'default', 'order': 0, }] @classmethod def is_enabled(cls): """Defines if the cell type is enabled for the given site; this is used to selectively enable cells from extension modules.""" return True def set_variant(self, variant): pass def get_label(self): return self.get_verbose_name() def get_default_form_class(self): if self.default_form_class: return self.default_form_class fields = [x.name for x in self._meta.local_concrete_fields if x.name not in ('id', 'page', 'placeholder', 'order', 'public', 'groups', 'slug', 'extra_css_class', 'last_update_timestamp', 'restricted_to_unlogged')] if not fields: return None return model_forms.modelform_factory(self.__class__, fields=fields, **self.manager_form_factory_kwargs) def get_options_form_class(self): return model_forms.modelform_factory(self.__class__, fields=['slug', 'extra_css_class']) def get_extra_manager_context(self): return {} def is_visible(self, user=None): return element_is_visible(self, user=user) def is_relevant(self, context): '''Return whether it's relevant to render this cell in the page context.''' return True def is_user_dependant(self, context): '''Return whether the cell content varies from user to user.''' return self.user_dependant def get_concerned_user(self, context): '''Return user from UserSearch cell, or connected user.''' return context.get('selected_user') or getattr(context.get('request'), 'user', None) def get_cell_extra_context(self, context): return {'cell': self} def render(self, context): context.update(self.get_cell_extra_context(context)) template_names = ['combo/' + self._meta.model_name + '.html'] base_template_name = self._meta.model_name + '.html' if self.template_name: base_template_name = os.path.basename(self.template_name) template_names.append(self.template_name) if self.slug: template_names.append('combo/cells/%s/%s' % (self.slug, base_template_name)) template_names.reverse() tmpl = template.loader.select_template(template_names) return tmpl.render(context, context.get('request')) def render_for_search(self): if not self.is_enabled(): return '' if self.user_dependant: return '' if not self.page.is_visible(user=None): return '' if not self.is_visible(user=None): return '' request = RequestFactory().get(self.page.get_online_url()) request.user = None # compat context = { 'page': self.page, 'render_skeleton': False, 'request': request, 'site_base': request.build_absolute_uri('/')[:-1], 'synchronous': True, 'user': None, # compat 'absolute_uri': request.build_absolute_uri, } if not self.is_relevant(context): return '' from django.utils.six.moves.html_parser import HTMLParser return HTMLParser().unescape(strip_tags(self.render(context))) def get_external_links_data(self): return [] def export_subobjects(self): return {} def import_subobjects(self, cell_json): pass def duplicate(self, page_target=None, placeholder=None): # clone current cell new_cell = copy.deepcopy(self) new_cell.pk = None # set page new_cell.page = page_target or self.page # set placeholder new_cell.placeholder = placeholder or new_cell.placeholder # store new cell new_cell.save() # set groups new_cell.groups.set(self.groups.all()) if hasattr(self, 'duplicate_m2m'): self.duplicate_m2m(new_cell) return new_cell @register_cell_class class TextCell(CellBase): text = RichTextField(_('Text'), blank=True, null=True) template_name = 'combo/text-cell.html' class Meta: verbose_name = _('Text') def is_relevant(self, context): return bool(self.text) def get_additional_label(self): if not self.text: return None return utils.ellipsize(self.text) @classmethod def get_cell_types(cls): d = super(TextCell, cls).get_cell_types() d[0]['order'] = -1 return d def get_cell_extra_context(self, context): extra_context = super(TextCell, self).get_cell_extra_context(context) text = self.text or '' render_skeleton = context.get('render_skeleton') def sub_variadic_url(match): attribute = match.group(1) url = match.group(2) try: url = utils.get_templated_url(url, context=context) except utils.TemplateError as e: logger = logging.getLogger(__name__) logger.warning('error in templated URL (%s): %s' % (url, e)) return '%s="%s"' % (attribute, url) text = re.sub(r'(href|src)="(.*?)"', sub_variadic_url, text) if render_skeleton: request = context.get('request') def sub_src(match): url = request.build_absolute_uri(match.group(1)) return 'src="%s"' % url def sub_href(match): url = match.group(1) if url.startswith('#'): pass else: url = request.build_absolute_uri(url) return 'href="%s"' % url text = re.sub(r'src="(.*?)"', sub_src, text) text = re.sub(r'href="(.*?)"', sub_href, text) extra_context['text'] = mark_safe(text) return extra_context @register_cell_class class FortuneCell(CellBase): ajax_refresh = 30 class Meta: verbose_name = _('Fortune') def render(self, context): return subprocess.check_output(['fortune']) @classmethod def is_enabled(cls): try: subprocess.check_output(['fortune']) except OSError: return False return settings.DEBUG @register_cell_class class UnlockMarkerCell(CellBase): # XXX: this is kept to smooth transitions, it should be removed once all # sites # have been migrated to ParentContentCell """Marks an 'acquired' placeholder as unlocked.""" visible = False class Meta: verbose_name = _('Unlock Marker') def render(self, context): return '' @register_cell_class class MenuCell(CellBase): depth = models.PositiveIntegerField(_('Depth'), choices=[(i, i) for i in range(1, 3)], default=1, null=False) initial_level = models.IntegerField(_('Initial Level'), choices=[(-1, _('Same as page'))] + [(i, i) for i in range(1, 3)], default=-1, null=False) root_page = models.ForeignKey(Page, on_delete=models.CASCADE, related_name='root_page', null=True, blank=True, verbose_name=_('Root Page')) template_name = 'combo/menu-cell.html' class Meta: verbose_name = _('Menu') def get_default_form_class(self): from .forms import MenuCellForm return MenuCellForm def get_cell_extra_context(self, context): from combo.public.menu import render_menu ctx = super(MenuCell, self).get_cell_extra_context(context) ctx['menu'] = render_menu(context, level=self.initial_level, root_page=self.root_page, depth=self.depth, ignore_visibility=False) return ctx def render_for_search(self): return '' @register_cell_class class LinkCell(CellBase): title = models.CharField(_('Title'), max_length=150, blank=True) url = models.CharField(_('URL'), max_length=200, blank=True) link_page = models.ForeignKey( 'data.Page', on_delete=models.CASCADE, related_name='link_cell', blank=True, null=True, verbose_name=_('Internal link')) anchor = models.CharField(_('Anchor'), max_length=150, blank=True) template_name = 'combo/link-cell.html' add_as_link_label = _('add a link') add_link_label = _('New link') edit_link_label = _('Edit link') add_as_link_code = 'link' class Meta: verbose_name = _('Link') def get_additional_label(self): title = self.title if not title and self.link_page: title = self.link_page.title if not title: return None return utils.ellipsize(title) def get_cell_extra_context(self, context): render_skeleton = context.get('render_skeleton') request = context.get('request') extra_context = super(LinkCell, self).get_cell_extra_context(context) if self.link_page: extra_context['url'] = self.link_page.get_online_url() extra_context['title'] = self.title or self.link_page.title else: extra_context['url'] = utils.get_templated_url(self.url, context=context) extra_context['title'] = self.title or self.url if self.anchor: extra_context['url'] += '#' + self.anchor if render_skeleton and not urlparse.urlparse(extra_context['url']).netloc: # create full URL when used in a skeleton extra_context['url'] = request.build_absolute_uri(extra_context['url']) return extra_context def get_default_form_class(self): from .forms import LinkCellForm return LinkCellForm def render_for_search(self): return '' def get_external_links_data(self): if not self.url: return [] link_data = self.get_cell_extra_context({}) if link_data.get('title') and link_data.get('url'): return [link_data] return [] @register_cell_class class LinkListCell(CellBase): title = models.CharField(_('Title'), max_length=150, blank=True) template_name = 'combo/link-list-cell.html' manager_form_template = 'combo/manager/link-list-cell-form.html' class Meta: verbose_name = _('List of links') @property def link_placeholder(self): return '_linkslist:{}'.format(self.pk) def get_items(self): return CellBase.get_cells( page=self.page, placeholder=self.link_placeholder, cell_filter=lambda x: hasattr(x, 'add_as_link_label')) def get_additional_label(self): title = self.title if not title: return None return utils.ellipsize(title) def get_cell_extra_context(self, context): extra_context = super(LinkListCell, self).get_cell_extra_context(context) links = [] for cell in context.get('page_cells', []): if not hasattr(cell, 'add_as_link_label'): continue if not cell.placeholder == self.link_placeholder: continue links.append(cell.get_cell_extra_context(context)) extra_context['links'] = links extra_context['title'] = self.title return extra_context def get_link_cell_classes(self): return CellBase.get_cell_classes(lambda x: hasattr(x, 'add_as_link_label')) def get_default_form_class(self): from .forms import LinkListCellForm return LinkListCellForm def render_for_search(self): return '' def export_subobjects(self): return {'links': json.loads( serializers.serialize( 'json', self.get_items(), use_natural_foreign_keys=True, use_natural_primary_keys=True) )} def import_subobjects(self, cell_json): for link in cell_json['links']: link['fields']['placeholder'] = self.link_placeholder for link in serializers.deserialize('json', json.dumps(cell_json['links'])): link.save() def duplicate_m2m(self, new_cell): # duplicate also link items for link in self.get_items(): link.duplicate(page_target=new_cell.page, placeholder=new_cell.link_placeholder) @register_cell_class class FeedCell(CellBase): title = models.CharField(_('Title'), max_length=150, blank=True) url = models.CharField(_('URL'), blank=True, max_length=200) limit = models.PositiveSmallIntegerField(_('Maximum number of entries'), null=True, blank=True) manager_form_factory_kwargs = {'field_classes': {'url': TemplatableURLField}} template_name = 'combo/feed-cell.html' class Meta: verbose_name = _('RSS/Atom Feed') def is_visible(self, user=None): return bool(self.url) and super(FeedCell, self).is_visible(user=user) def get_cell_extra_context(self, context): extra_context = super(FeedCell, self).get_cell_extra_context(context) if context.get('placeholder_search_mode'): # don't call webservices when we're just looking for placeholders return extra_context cache_key = hashlib.md5(smart_bytes(self.url)).hexdigest() feed_content = cache.get(cache_key) if not feed_content: try: feed_response = requests.get(utils.get_templated_url(self.url)) feed_response.raise_for_status() except (requests.exceptions.RequestException): pass else: if feed_response.status_code == 200: feed_content = feed_response.content cache.set(cache_key, feed_content, 600) if feed_content: extra_context['feed'] = feedparser.parse(feed_content) if self.limit: extra_context['feed']['entries'] = extra_context['feed']['entries'][:self.limit] if not self.title: try: self.title = extra_context['feed']['feed'].title except (KeyError, AttributeError): pass return extra_context def render(self, context): cache_key = hashlib.md5(smart_bytes(self.url)).hexdigest() feed_content = cache.get(cache_key) if not context.get('synchronous') and feed_content is None: raise NothingInCacheException() return super(FeedCell, self).render(context) @register_cell_class class ParentContentCell(CellBase): class Meta: verbose_name = _('Same as parent') def get_parents_cells(self, hierarchy): try: pages = [Page.objects.get(slug='index', parent=None)] except Page.DoesNotExist: pages = [] pages.extend(hierarchy) if not pages: return [] if len(pages) > 1 and pages[0].id == pages[1].id: # don't duplicate index cells for real children of the index page. pages = pages[1:] cells_by_page = {} for page in pages: cells_by_page[page.id] = [] for cell in list(CellBase.get_cells(placeholder=self.placeholder, page__in=pages)): cells_by_page[cell.page_id].append(cell) cells = cells_by_page[pages[-1].id] for page in reversed(pages[:-1]): for i, cell in enumerate(cells): if isinstance(cell, ParentContentCell): cells[i:i+1] = cells_by_page[page.id] break else: # no more ParentContentCell, stop folloing the parent page chain break return cells def render(self, context): return '' class JsonCellBase(CellBase): url = None cache_duration = 60 template_string = None varnames = None force_async = False log_errors = True timeout = None actions = {} additional_data = None # [ # {'key': ..., # 'url': ..., # 'cache_duration': ... (optional) # }, # ... # ] first_data_key = 'json' _json_content = None class Meta: abstract = True def is_visible(self, user=None): return bool(self.url) and super(JsonCellBase, self).is_visible(user=user) def get_cell_parameters_context(self): return {} def get_cell_extra_context(self, context, invalidate_cache=False): extra_context = super(JsonCellBase, self).get_cell_extra_context(context) if context.get('placeholder_search_mode'): # don't call webservices when we're just looking for placeholders return extra_context if self.varnames and context.get('request'): for varname in self.varnames: if varname in context['request'].GET: context[varname] = context['request'].GET[varname] self._json_content = None extra_context.update(self.get_cell_parameters_context()) context.update(self.get_cell_parameters_context()) context['concerned_user'] = self.get_concerned_user(context) data_urls = [{'key': self.first_data_key, 'url': self.url, 'cache_duration': self.cache_duration, 'log_errors': self.log_errors, 'timeout': self.timeout}] data_urls.extend(self.additional_data or []) for data_url_dict in data_urls: extra_context[data_url_dict['key']] = None for data_url_dict in data_urls: data_key = data_url_dict['key'] log_errors = data_url_dict.get('log_errors', self.log_errors) try: url = utils.get_templated_url(data_url_dict['url'], context) except utils.TemplateError as e: logger = logging.getLogger(__name__) logger.warning('error in templated URL (%s): %s', data_url_dict['url'], e) continue extra_context[data_key + '_url'] = url if not url: continue try: json_response = utils.requests.get(url, headers={'Accept': 'application/json'}, remote_service='auto', cache_duration=data_url_dict.get('cache_duration', self.cache_duration), without_user=True, raise_if_not_cached=not(context.get('synchronous')), invalidate_cache=invalidate_cache, log_errors=log_errors, timeout=data_url_dict.get('timeout', self.timeout), django_request=context.get('request'), ) except requests.RequestException as e: extra_context[data_key + '_status'] = -1 extra_context[data_key + '_error'] = force_text(e) extra_context[data_key + '_exception'] = e logger = logging.getLogger(__name__) if log_errors: logger.warning(u'error on request %r: %s', url, force_text(e)) else: logger.debug(u'error on request %r: %s', url, force_text(e)) continue extra_context[data_key + '_status'] = json_response.status_code if json_response.status_code // 100 == 2: if json_response.status_code != 204: # 204 = No Content try: extra_context[data_key] = json_response.json() except ValueError: extra_context[data_key + '_error'] = 'invalid_json' logger = logging.getLogger(__name__) if log_errors: logger.error('invalid json content (%s)', url) else: logger.debug('invalid json content (%s)', url) continue elif json_response.headers.get('content-type') == 'application/json': try: extra_context[data_key + '_error'] = json_response.json() except ValueError: extra_context[data_key + '_error'] = 'invalid_json' # update context with data key so it can be used in future # templated URLs context[data_key] = extra_context[data_key] # keep cache of first response as it may be used to find the # appropriate template. self._json_content = extra_context[self.first_data_key] return extra_context @property def template_name(self): json_content = self._json_content if json_content is None: return 'combo/json-error-cell.html' if json_content.get('data'): if isinstance(json_content['data'], list): first_element = json_content['data'][0] if isinstance(first_element, dict): if 'url' in first_element and 'text' in first_element: return 'combo/json-list-cell.html' return 'combo/json-cell.html' def post(self, request): if not 'action' in request.POST: raise PermissionDenied() action = request.POST['action'] if not action in self.actions: raise PermissionDenied() error_message = self.actions[action].get('error-message') timeout = self.actions[action].get('timeout', self.timeout) method = self.actions[action].get('method', 'POST') logger = logging.getLogger(__name__) content = {} for key, value in request.POST.items(): if key == 'action': continue if key.endswith('[]'): # array content[key[:-2]] = request.POST.getlist(key) else: content[key] = value context = copy.copy(content) context.update(self.get_cell_parameters_context()) context['request'] = request context['synchronous'] = True try: url = utils.get_templated_url(self.actions[action]['url'], context) except utils.TemplateError as e: logger.warning('error in templated URL (%s): %s', self.actions[action]['url'], e) raise PostException(error_message) json_response = utils.requests.request( method, url, headers={'Accept': 'application/json'}, remote_service='auto', json=content, without_user=True, django_request=request, timeout=timeout, ) if json_response.status_code // 100 != 2: # 2xx logger.error('error POSTing data to URL (%s)', url) raise PostException(error_message) if self.cache_duration: self.get_cell_extra_context(context, invalidate_cache=True) if self.actions[action].get('response', 'cell') == 'raw': # response: raw in the config will get the response directly sent # to the client. return json_response # default behaviour, the cell will render itself. return None def render(self, context): if self.force_async and not context.get('synchronous'): raise NothingInCacheException() if self.template_string: tmpl = engines['django'].from_string(self.template_string) context.update(self.get_cell_extra_context(context)) return tmpl.render(context, context.get('request')) return super(JsonCellBase, self).render(context) def django_template_validator(value): try: tmpl = engines['django'].from_string(value) except TemplateSyntaxError as e: raise ValidationError(_('syntax error: %s') % e) @register_cell_class class JsonCell(JsonCellBase): title = models.CharField(_('Title'), max_length=150, blank=True) url = models.CharField(_('URL'), blank=True, max_length=500) template_string = models.TextField(_('Display Template'), blank=True, null=True, validators=[django_template_validator]) cache_duration = models.PositiveIntegerField( _('Cache duration'), default=60) force_async = models.BooleanField(_('Force asynchronous mode'), default=JsonCellBase.force_async) varnames_str = models.CharField(_('Variable names'), max_length=200, blank=True, help_text=_('Comma separated list of query-string variables ' 'to be copied in template context')) timeout = models.PositiveIntegerField(_('Request timeout'), default=0, help_text=_('In seconds. Use 0 for default system timeout')) manager_form_factory_kwargs = {'field_classes': {'url': TemplatableURLField}} class Meta: verbose_name = _('JSON Prototype') @property def varnames(self): return [vn.strip() for vn in self.varnames_str.split(',') if vn.strip()] class ConfigJsonCellManager(models.Manager): def get_queryset(self): queryset = super(ConfigJsonCellManager, self).get_queryset() return queryset.filter(key__in=settings.JSON_CELL_TYPES.keys()) @register_cell_class class ConfigJsonCell(JsonCellBase): objects = ConfigJsonCellManager() key = models.CharField(max_length=50) parameters = JSONField(blank=True) @classmethod def get_cell_types(cls): l = [] for key, definition in settings.JSON_CELL_TYPES.items(): l.append({ 'name': definition['name'], 'variant': key, 'group': _('Extra'), 'cell_type_str': cls.get_cell_type_str(), }) l.sort(key=lambda x: x.get('name')) return l def get_label(self): return settings.JSON_CELL_TYPES[self.key]['name'] def set_variant(self, variant): self.key = variant @property def css_class_names(self): return super(ConfigJsonCell, self).css_class_names + ' ' + self.key @property def ajax_refresh(self): return settings.JSON_CELL_TYPES[self.key].get('auto_refresh', None) @property def url(self): return settings.JSON_CELL_TYPES[self.key]['url'] @property def cache_duration(self): return settings.JSON_CELL_TYPES[self.key].get('cache_duration', JsonCellBase.cache_duration) @property def varnames(self): return settings.JSON_CELL_TYPES[self.key].get('varnames') @property def force_async(self): return settings.JSON_CELL_TYPES[self.key].get('force_async', JsonCellBase.force_async) @property def log_errors(self): return settings.JSON_CELL_TYPES[self.key].get('log_errors', JsonCellBase.log_errors) @property def timeout(self): return settings.JSON_CELL_TYPES[self.key].get('timeout', JsonCellBase.timeout) @property def actions(self): return settings.JSON_CELL_TYPES[self.key].get('actions', JsonCellBase.actions) @property def additional_data(self): return settings.JSON_CELL_TYPES[self.key].get('additional-data') @property def template_name(self): return 'combo/json/%s.html' % self.key @property def loading_message(self): return settings.JSON_CELL_TYPES[self.key].get('loading-message', CellBase.loading_message) def get_default_form_class(self): formdef = settings.JSON_CELL_TYPES[self.key].get('form') if not formdef: return None from .forms import ConfigJsonForm # create a subclass of ConfigJsonForm with 'formdef' (the list of # fields) as an attribute. config_form_class = type(str('%sConfigClass' % self.key), (ConfigJsonForm,), {'formdef': formdef}) return config_form_class def get_cell_parameters_context(self): context = copy.copy(self.parameters or {}) context['parameters'] = self.parameters return context class ExternalLinkSearchItem(models.Model): # Link to an external site. # # Those are automatically collected during by the "update_index" command, # that calls get_external_links_data from all available cells, to be used # by the general search engine. title = models.CharField(_('Title'), max_length=150) text = models.TextField(blank=True) url = models.CharField(_('URL'), max_length=200, blank=True) last_update_timestamp = models.DateTimeField(auto_now=True) @receiver(pre_save, sender=Page) def create_redirects(sender, instance, raw, **kwargs): if raw or not instance.id or instance.snapshot_id: return try: old_page = Page.objects.get(id=instance.id) except Page.DoesNotExist: return if old_page.slug == instance.slug and old_page.parent_id == instance.parent_id: return affected_pages = level_pages = [old_page] while True: level_pages = Page.objects.filter(parent_id__in=[x.id for x in level_pages]).select_related('parent') if len(level_pages) == 0: break affected_pages.extend(level_pages) for page in affected_pages: Redirect(page=page, old_url=page.get_online_url()).save() @receiver(post_save) @receiver(post_delete) def cell_maintain_page_cell_cache(sender, instance=None, **kwargs): if not issubclass(sender, CellBase): return if not instance.page_id: return page = instance.page page.build_cell_cache()