# combo/combo/apps/search/models.py
#
# 284 lines
# 12 KiB
# Python

# combo - content management system
# Copyright (C) 2014-2017 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.contrib.auth.models import Group
from django.contrib.contenttypes import fields
from django.contrib.contenttypes.models import ContentType
from django.db import models
from django.utils.translation import ugettext_lazy as _
from django import template
from django.http import HttpResponse
from django.core.exceptions import PermissionDenied
from django.utils.functional import cached_property
from django.utils.http import quote
from django.shortcuts import get_object_or_404
from django.template import RequestContext, Template
from jsonfield import JSONField
from combo.utils import requests
from combo.data.models import CellBase, Page
from combo.data.library import register_cell_class
from combo.utils import get_templated_url
from . import engines
def get_root_page_and_children(service_slug):
    """Return the pages targeted by a ``_text_page_<page slug>`` service.

    The page slug is what follows the ``_text_page_`` prefix of
    ``service_slug``.  The page with that slug and an empty ``sub_slug`` is
    looked up and the result of its ``get_descendants_and_me()`` is returned.

    Returns ``None`` when ``service_slug`` does not carry the prefix, or when
    the lookup matches no page or more than one page.
    """
    prefix = '_text_page_'
    if not service_slug.startswith(prefix):
        return None
    # Strip the leading prefix only: str.replace() would also remove any
    # later occurrence of the marker inside the page slug itself and make the
    # lookup target a different (or nonexistent) page.
    page_slug = service_slug[len(prefix):]
    try:
        root_page = Page.objects.get(slug=page_slug, sub_slug='')
    except (Page.DoesNotExist, Page.MultipleObjectsReturned):
        return None
    return root_page.get_descendants_and_me()
@register_cell_class
class SearchCell(CellBase):
    """Cell rendering a search box bound to one or several search services.

    The configuration lives in ``_search_services``, a JSON dictionary whose
    ``data`` key lists the slugs of the selected services and whose optional
    ``options`` key maps a service slug to its options.  Both keys may be
    missing or null; every reader below treats that as "nothing configured".
    """

    template_name = 'combo/search-cell.html'
    manager_form_template = 'combo/manager/search-cell-form.html'

    exclude_from_search = True

    _search_services = JSONField(_('Search Services'), default=dict, blank=True)
    autofocus = models.BooleanField(_('Autofocus'), default=False)
    input_placeholder = models.CharField(_('Placeholder'), max_length=64, default="", blank=True)

    class Meta:
        verbose_name = _('Search')

    def is_visible(self, **kwargs):
        """Hide the cell when no search service is selected, else defer to the parent."""
        if not self._search_services.get('data'):
            return False
        return super(SearchCell, self).is_visible(**kwargs)

    def get_default_form_class(self):
        """Return the manager form class (imported lazily, inside the method)."""
        from .forms import SearchCellForm

        return SearchCellForm

    @property
    def varname(self):
        """Name derived from the cell slug, or ``''`` when the cell has no slug."""
        if self.slug:
            # no hyphen in varname, could be used in context and templates
            return self.slug.replace('-', '_')
        return ''

    @cached_property
    def search_services(self):
        """Return the engine definitions matching the selected service slugs.

        Each selected slug is resolved through ``engines.get()``:

        * ``_text_page_<slug>`` entries use the ``_text`` engine;
        * ``cards:`` entries are split on ``:``; the third part may end with
          ``__without-user__`` (flagging the service and its label) and a
          fourth part, when present, selects a custom view.

        Only services having a ``url`` or a ``function`` are kept; each one
        gets its ``slug`` and ``options`` set from the cell configuration.
        """
        configured = self._search_services
        # 'options' may be absent or stored as null
        options_by_slug = configured.get('options') or {}
        without_user_suffix = '__without-user__'

        services = []
        for service_slug in configured.get('data') or []:
            service = engines.get(service_slug)
            if service_slug.startswith('_text_page_'):
                service = engines.get('_text')
            if service_slug.startswith('cards:'):
                parts = service_slug.split(':')
                without_user = False
                # guard against short slugs ("cards:foo") before looking at
                # the third part
                if len(parts) > 2 and parts[2].endswith(without_user_suffix):
                    without_user = True
                    # drop the trailing marker only
                    parts[2] = parts[2][:-len(without_user_suffix)]
                if not service:
                    # retry with cleaned parts
                    service = engines.get(':'.join(parts[:3]))
                if not service:
                    # still not found
                    continue
                if len(parts) > 3:
                    custom_views_by_id = {c['id']: c['text'] for c in service.get('custom_views') or []}
                    service['label'] = '%s - %s' % (service['label'], custom_views_by_id.get(parts[3]))
                    service['selected_custom_view'] = parts[3]
                if without_user:
                    service['without_user'] = True
                    service['label'] = '%s (%s)' % (service['label'], _('logged-in user ignored'))
            if service and (service.get('url') or service.get('function')):
                service['slug'] = service_slug
                service['options'] = options_by_slug.get(service_slug)
                services.append(service)
        return services

    @cached_property
    def available_engines(self):
        """Return the engines that can still be added to this cell.

        Engines already selected are left out, except the ``_text`` engine
        and ``cards:`` engines, which always remain selectable.
        """
        all_engines = engines.get_engines()
        # never hide the _text engine: we can add search on page and sub pages
        current_engines = {
            e['slug']
            for e in self.search_services
            if e['slug'] != '_text' and not e['slug'].startswith('cards:')
        }
        return {k: v for k, v in all_engines.items() if k not in current_engines}

    def get_search_services_for_display(self):
        """Return ``(slug, label, options)`` tuples for the selected services.

        Services bound to a page (``_text_page_<slug>``) get a label built
        from the page title when the page is found.
        """
        prefix = '_text_page_'
        # get pages for _text engines; strip the leading prefix only so a
        # page slug containing the marker is not corrupted
        page_slugs = [
            e['slug'][len(prefix):] for e in self.search_services if e['slug'].startswith(prefix)
        ]
        pages = (
            Page.objects
            .filter(snapshot__isnull=True, sub_slug='', slug__in=page_slugs)
            .values('slug', 'title')
        )
        pages_by_slug = {'_text_page_%s' % p['slug']: p['title'] for p in pages}

        services = []
        for service in self.search_services:
            label = service['label']
            if service['slug'] in pages_by_slug:
                label = _('Page "%(page)s" and sub pages Contents') % {'page': pages_by_slug[service['slug']]}
            services.append((service['slug'], label, service['options']))
        return services

    @property
    def has_multiple_search_services(self):
        """True when more than one service slug is selected."""
        return len(self._search_services.get('data') or []) > 1

    @classmethod
    def get_cells_by_search_service(cls, search_service):
        """Yield every cell whose selected slugs include ``search_service``."""
        for cell in cls.objects.all():
            if search_service in (cell._search_services.get('data') or []):
                yield cell

    def modify_global_context(self, context, request):
        """Copy the ``varname`` query-string value into the shared context."""
        # if self.varname is in the query string (of the page),
        # add it to the global context; so it can be used by others cells
        # for example by a JsonCell with ...[self.varname]... in its URL
        if self.varname and self.varname in request.GET:
            context[self.varname] = request.GET.get(self.varname)

    def get_cell_extra_context(self, context):
        """Add ``initial_q`` and ``initial_query_string`` to the cell context.

        Both stay ``None`` when the context holds no request.
        """
        extra_context = super(SearchCell, self).get_cell_extra_context(context)
        # if there is a q_<slug> in query_string, send it to the template (to be
        # used as an initial query) and remove it from query_string
        initial_q = None
        initial_query_string = None
        if context.get('request'):
            request_get = context['request'].GET.copy()
            if self.varname:
                q_varname = 'q_%s' % self.varname
                if q_varname in request_get:
                    initial_q = request_get[q_varname]
                    del request_get[q_varname]
            initial_query_string = request_get.urlencode()
        extra_context.update({
            'initial_q': initial_q,
            'initial_query_string': initial_query_string,
        })
        return extra_context

    @classmethod
    def ajax_results_view(cls, request, cell_pk, service_slug):
        """Render the results of one search service as an HTML fragment.

        The query is read from the ``q`` GET parameter.  Services defining a
        ``function`` are called directly; the others are queried over HTTP
        at their templated ``url`` and must answer with a JSON object.

        Raises ``Http404`` (through ``get_object_or_404``) when the cell does
        not exist and ``PermissionDenied`` when the cell or its page is not
        visible to the requesting user.  An unknown service, an empty query,
        an empty URL or an unusable remote payload all produce an empty
        result rendering.
        """
        cell = get_object_or_404(cls, pk=cell_pk)
        if not cell.is_visible(user=request.user) or not cell.page.is_visible(request.user):
            raise PermissionDenied

        query = request.GET.get('q')

        def render_response(service=None, results=None, pages=None):
            # render the results template, a cell-specific one taking
            # precedence when the cell has a slug
            service = service or {}
            results = results or {'err': 0, 'data': []}
            template_names = ['combo/search-cell-results.html']
            if cell.slug:
                template_names.insert(0, 'combo/cells/%s/search-cell-results.html' % cell.slug)
            tmpl = template.loader.select_template(template_names)
            service_label = service.get('label')
            if pages:
                service_label = _('Page "%(page)s" and sub pages Contents') % {'page': pages[0].title}
            try:
                # optional label defined with engine
                if service['options']['title']:
                    service_label = service['options']['title']
            except (KeyError, TypeError):
                # no options, or options without a title
                pass
            context = {
                'cell': cell,
                'results': results,
                'search_service': service,
                'search_service_label': service_label,
                'query': query,
            }
            return HttpResponse(tmpl.render(context, request), content_type='text/html')

        for service in cell.search_services:
            if service.get('slug') == service_slug:
                break
        else:
            # service_slug is not selected on this cell
            return render_response()

        if not query:
            return render_response(service)

        pages = None
        if service.get('function'):  # internal search engine
            pages = get_root_page_and_children(service_slug)
            results = {'data': service['function'](request, query, pages=pages)}
        else:
            url = get_templated_url(
                service['url'],
                context={'request': request, 'q': query, 'search_service': service},
            )
            if '%(q)s' in url:
                # escape percent signs by doubling them, then restore %(q)s
                url = url.replace('%', '%%').replace('%%(q)s', '%(q)s')
                url = url % {'q': quote(query.encode('utf-8'))}
            if url.startswith('/'):
                url = request.build_absolute_uri(url)

            if not url:
                return render_response(service)

            kwargs = {}
            kwargs['cache_duration'] = service.get('cache_duration', 0)
            kwargs['remote_service'] = 'auto' if service.get('signature') else None
            # don't automatically add user info to query string, if required it can
            # be set explicitly in the URL template in the engine definition (via
            # {{user_nameid}} or {{user_email}}).
            kwargs['without_user'] = True
            # don't send error traces on HTTP errors
            kwargs['log_errors'] = 'warn'
            response = requests.get(url, **kwargs)
            try:
                results = response.json()
            except ValueError:
                return render_response(service)
            if not isinstance(results, dict):
                # a JSON array/scalar cannot carry a 'data' key; treat it as
                # an empty answer instead of failing further down
                return render_response(service)

            if service.get('data_key'):
                results['data'] = results.get(service['data_key']) or []

        # optional per-hit templates overriding url/text/description
        hit_templates = {}
        if service.get('hit_url_template'):
            hit_templates['url'] = Template(service['hit_url_template'])
        if service.get('hit_label_template'):
            hit_templates['text'] = Template(service['hit_label_template'])
        if service.get('hit_description_template'):
            hit_templates['description'] = Template(service['hit_description_template'])
        if hit_templates:
            for hit in results.get('data') or []:
                for k, v in hit_templates.items():
                    hit[k] = v.render(RequestContext(request, hit))

        return render_response(service, results, pages=pages)

    def has_text_search_service(self):
        """True when a selected service slug starts with ``_text``."""
        # "or []" also covers a 'data' key stored as null
        return any(key.startswith('_text') for key in self._search_services.get('data') or [])

    def missing_index(self):
        """True when the ``IndexedCell`` table holds no row at all."""
        return not IndexedCell.objects.exists()
class IndexedCell(models.Model):
    """One row of the search index, pointing at a cell of any type.

    NOTE(review): the access-related fields (``public_access``,
    ``restricted_groups``, ``excluded_groups``) presumably mirror the
    visibility rules of the indexed content -- confirm against the indexing
    code, which is not part of this module.
    """

    # generic relation to the indexed cell: content type + primary key
    cell_type = models.ForeignKey(to=ContentType, on_delete=models.CASCADE)
    cell_pk = models.PositiveIntegerField(null=True)
    cell = fields.GenericForeignKey(ct_field='cell_type', fk_field='cell_pk')

    # optional page reference; rows are deleted together with the page
    page = models.ForeignKey(to=Page, null=True, blank=True, on_delete=models.CASCADE)

    # optional textual data
    url = models.CharField(null=True, blank=True, max_length=500)
    title = models.CharField(null=True, blank=True, max_length=500)
    indexed_text = models.TextField(null=True, blank=True)

    public_access = models.BooleanField(default=False)
    # related_name='+' : no reverse accessor is created on Group
    restricted_groups = models.ManyToManyField(to=Group, related_name='+', blank=True)
    excluded_groups = models.ManyToManyField(to=Group, related_name='+', blank=True)

    # refreshed automatically each time the row is saved
    last_update_timestamp = models.DateTimeField(auto_now=True)