# combo/combo/apps/search/utils.py
#
# 154 lines
# 6.1 KiB
# Python

# combo - content management system
# Copyright (C) 2014-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.contrib.postgres.search import SearchQuery, SearchRank, SearchVector
from django.db import connection
from django.db.models import Q, Prefetch
from django.db.transaction import atomic
from combo.data.models import Page, CellBase, ValidityInfo
from .models import IndexedCell
def set_cell_groups(indexed_cell, cell):
    """Attach the access-control groups of ``cell`` to ``indexed_cell``.

    Nothing is attached when ``indexed_cell.public_access`` is true.
    Otherwise the cell's own prefetched groups always go to
    ``indexed_cell.restricted_groups``; the prefetched groups of the cell's
    page go to ``indexed_cell.excluded_groups`` when the cell is restricted
    to unlogged users, and to ``indexed_cell.restricted_groups`` otherwise.

    ``cell.prefetched_groups`` and ``cell.page.prefetched_groups`` are only
    read: this function is called several times for the same cell during
    indexing, so it must not grow those lists as a side effect.
    """
    restricted_groups = []
    excluded_groups = []
    if not indexed_cell.public_access:
        # Work on a copy: extending the prefetched list itself would add the
        # page groups to cell.prefetched_groups again on every call.
        restricted_groups = list(cell.prefetched_groups)
        page_groups = cell.page.prefetched_groups
        if cell.restricted_to_unlogged:
            excluded_groups = list(page_groups)
        else:
            restricted_groups.extend(page_groups)
    # only touch the relations when there is something to add
    if restricted_groups:
        indexed_cell.restricted_groups.add(*restricted_groups)
    if excluded_groups:
        indexed_cell.excluded_groups.add(*excluded_groups)
@atomic
def index_site():
    """Rebuild the search index from scratch, inside one transaction.

    All existing ``IndexedCell`` rows are deleted. Then, for every cell class
    not flagged ``exclude_from_search``, each cell that sits on a page with
    no snapshot and an empty ``sub_slug``, in a placeholder whose name does
    not start with ``_``, and whose placeholder is active, is rendered with
    ``render_for_search()``:

    * if the rendered text is not empty, an ``IndexedCell`` pointing at the
      page is saved and its groups are set with ``set_cell_groups``;
    * every external link reported by ``get_external_links_data()`` gets one
      ``IndexedCell`` per distinct URL; later occurrences of the same URL
      only append their title and text to the existing entry.

    A cell whose rendering raises is skipped entirely (external links too).
    """
    cell_classes = list(CellBase.get_cell_classes())
    # populate ContentType cache
    ContentType.objects.get_for_models(*cell_classes)
    IndexedCell.objects.all().delete()

    # external link URL -> IndexedCell already created for that URL
    external_urls = {}

    # Group validity infos once by (model class, object id) so that each cell
    # gets its own with a dict lookup instead of rescanning the whole list.
    validity_infos_by_cell = {}
    for validity_info in ValidityInfo.objects.select_related('content_type'):
        key = (validity_info.content_type.model_class(), validity_info.object_id)
        validity_infos_by_cell.setdefault(key, []).append(validity_info)

    # pages are loaded once, with their groups, and shared by all cells
    pages_by_pk = {
        page.pk: page
        for page in Page.objects.prefetch_related(Prefetch('groups', to_attr='prefetched_groups'))
    }

    for klass in cell_classes:
        if getattr(klass, 'exclude_from_search', False) is True:
            # do not load cells marked as excluded from search (example: MenuCell, SearchCell, ...)
            continue
        queryset = (
            klass.objects.filter(page__snapshot__isnull=True, page__sub_slug='')
            .exclude(placeholder__startswith='_')
            .prefetch_related(Prefetch('groups', to_attr='prefetched_groups'))
        )
        for cell in queryset:
            # reuse the preloaded page (and its prefetched groups)
            cell.page = pages_by_pk.get(cell.page_id)
            # exclude cells with an inactive placeholder
            if not cell.is_placeholder_active():
                continue
            cell.prefetched_validity_info = list(
                validity_infos_by_cell.get((cell.__class__, cell.pk), ())
            )
            cell_type = ContentType.objects.get_for_model(cell)
            indexed_cell = IndexedCell(cell_type=cell_type, cell_pk=cell.id)
            try:
                indexed_cell.indexed_text = cell.render_for_search()
            except Exception:  # ignore rendering error
                continue
            if indexed_cell.indexed_text:
                indexed_cell.public_access = bool(cell.page.public and cell.public)
                indexed_cell.page_id = cell.page_id
                indexed_cell.url = cell.page.get_online_url()
                indexed_cell.title = cell.page.title
                # save first: groups are added after the row exists
                indexed_cell.save()
                set_cell_groups(indexed_cell, cell)
            for link_data in cell.get_external_links_data():
                # index external links
                indexed_cell = external_urls.get(link_data.get('url'))
                if indexed_cell is None:
                    # create an entry for that link.
                    indexed_cell = IndexedCell(
                        cell_type=cell_type,
                        cell_pk=cell.id,
                        public_access=bool(cell.page.public and cell.public),
                        url=link_data['url'],
                        title=link_data['title'],
                        indexed_text=link_data.get('text') or '',
                    )
                    indexed_cell.save()
                    set_cell_groups(indexed_cell, cell)
                    external_urls[indexed_cell.url] = indexed_cell
                else:
                    # if that link already exists, add detailed texts
                    indexed_cell.indexed_text += ' ' + link_data['title']
                    indexed_cell.indexed_text += ' ' + (link_data.get('text') or '')
                    indexed_cell.save()
def search_site(request, query, pages=None, with_description=None):
    """Search the indexed cells and return at most 10 hits, one per URL.

    On PostgreSQL the search is a ranked full-text query (title weighted
    'A', indexed text weighted 'B', using
    ``settings.POSTGRESQL_FTS_SEARCH_CONFIG``), keeping ranks >= 0.2 in
    descending rank order. On other databases it is a case-insensitive
    substring match on the indexed text or the title.

    Anonymous users only get entries with public access. Other users get
    entries with no restricting group or with a restricting group they
    belong to, minus entries excluding one of their groups.

    :param request: current request; only ``request.user`` is used.
    :param query: the search string.
    :param pages: optional pages to restrict the search to; ignored if empty.
    :param with_description: when exactly ``True``, each hit carries the
        description of its page (empty string when there is no page).
    :return: list of dicts with ``text``, ``rank``, ``url`` and
        ``description`` keys; ``rank`` is ``None`` outside PostgreSQL.
    """
    pages = pages or []
    if connection.vendor == 'postgresql':
        config = settings.POSTGRESQL_FTS_SEARCH_CONFIG
        vector = SearchVector('title', config=config, weight='A') + SearchVector(
            'indexed_text', config=config, weight='B'
        )
        query = SearchQuery(query, config=config)
        qs = (
            IndexedCell.objects.annotate(rank=SearchRank(vector, query))
            .filter(rank__gte=0.2)
            .order_by('-rank')
        )
    else:
        qs = IndexedCell.objects.filter(Q(indexed_text__icontains=query) | Q(title__icontains=query))

    if request.user.is_anonymous:
        qs = qs.exclude(public_access=False)
    else:
        qs = qs.filter(Q(restricted_groups=None) | Q(restricted_groups__in=request.user.groups.all()))
        qs = qs.exclude(excluded_groups__in=request.user.groups.all())
    if pages:
        qs = qs.filter(page__in=pages)

    hits = []
    seen_urls = set()
    for hit in qs:
        # the same URL can come back several times; keep the first occurrence
        if hit.url in seen_urls:
            continue
        # Check the flag first so the related page is only accessed when a
        # description was actually requested.
        if with_description is True and hit.page:
            description = hit.page.description
        else:
            description = ''
        hits.append(
            {
                'text': hit.title,
                'rank': getattr(hit, 'rank', None),
                'url': hit.url,
                'description': description,
            }
        )
        seen_urls.add(hit.url)
        if len(hits) == 10:
            break
    return hits