combo/combo/apps/search/utils.py

112 lines
4.5 KiB
Python

# combo - content management system
# Copyright (C) 2014-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.contrib.postgres.search import SearchQuery, SearchRank, SearchVector
from combo.data.models import CellBase
from django.db import connection
from django.db.models import Q
from django.db.transaction import atomic
from .models import IndexedCell
def set_cell_access(indexed_cell, cell):
    """Copy the visibility rules of ``cell`` onto its search index entry.

    The entry is public only when both the cell and its page are public.
    Otherwise the entry is restricted to the cell's groups, and the page's
    groups are either excluded (cell restricted to unlogged users) or added
    to the restricted groups.  The entry is saved before returning.
    """
    page = cell.page
    is_public = bool(page.public and cell.public)
    indexed_cell.public_access = is_public

    # always start from a clean slate, whatever was recorded previously
    excluded = indexed_cell.excluded_groups
    restricted = indexed_cell.restricted_groups
    excluded.clear()
    restricted.clear()

    if is_public:
        # public entries carry no group information at all
        indexed_cell.save()
        return

    restricted.set(cell.groups.all())
    page_groups = page.groups.all()
    if cell.restricted_to_unlogged:
        excluded.set(page_groups)
    else:
        for page_group in page_groups:
            restricted.add(page_group)
    indexed_cell.save()
@atomic
def index_site():
    """Rebuild the whole search index inside a single transaction.

    All existing ``IndexedCell`` rows are deleted, then every cell of every
    cell class is visited (cells on snapshot pages and cells whose
    placeholder starts with ``_`` are skipped):

    * when the cell renders to a non-empty search text, an entry pointing to
      the cell's page is created;
    * every external link reported by the cell gets its own entry, shared
      between all cells pointing to the same URL.

    A cell whose ``render_for_search()`` raises is skipped entirely.
    """
    IndexedCell.objects.all().delete()
    # external link URL -> IndexedCell already created for that URL
    external_urls = {}
    for klass in CellBase.get_cell_classes():
        cells = klass.objects.filter(page__snapshot__isnull=True).exclude(placeholder__startswith='_')
        for cell in cells:
            cell_type = ContentType.objects.get_for_model(cell)
            indexed_cell = IndexedCell(cell_type=cell_type, cell_pk=cell.id)
            try:
                indexed_cell.indexed_text = cell.render_for_search()
            except Exception:  # ignore rendering error
                continue
            if indexed_cell.indexed_text:
                indexed_cell.page_id = cell.page_id
                indexed_cell.url = cell.page.get_online_url()
                indexed_cell.title = cell.page.title
                # the entry must exist in database before its group
                # relations can be filled in
                indexed_cell.save()
                set_cell_access(indexed_cell, cell)

            for link_data in cell.get_external_links_data():
                # index external links
                link_url = link_data['url']
                link_text = link_data.get('text') or ''
                # look the link up by its own URL (not by the URL of the
                # page entry or of the previously handled link)
                link_cell = external_urls.get(link_url)
                if link_cell is None:
                    # create an entry for that link.
                    link_cell = IndexedCell(cell_type=cell_type, cell_pk=cell.id)
                    link_cell.save()
                    set_cell_access(link_cell, cell)
                    link_cell.url = link_url
                    link_cell.title = link_data['title']
                    link_cell.indexed_text = link_text
                    external_urls[link_url] = link_cell
                else:
                    # if that link already exists, add detailed texts
                    link_cell.indexed_text += ' ' + link_data['title']
                    # link_text is already normalised to a string, so a
                    # missing or None text cannot break the concatenation
                    link_cell.indexed_text += ' ' + link_text
                link_cell.save()
def search_site(request, query):
    """Search the index and return at most ten hits visible to the user.

    On PostgreSQL a ranked full-text search is run over the title and the
    indexed text, keeping results with a rank of at least 0.3, best first.
    On other database backends a case-insensitive substring match is used.

    Anonymous users only get entries flagged as public.  Other users get
    entries that have no restricted group or share a restricted group with
    them, minus entries excluding one of their groups.

    Returns a list of dicts with ``text`` (entry title), ``rank`` (``None``
    when no ranking is available) and ``url`` keys; each URL appears once.
    """
    if connection.vendor == 'postgresql':
        config = settings.POSTGRESQL_FTS_SEARCH_CONFIG
        vector = SearchVector('title', config=config, weight='A') + SearchVector(
            'indexed_text', config=config, weight='A'
        )
        # parse the query with the same configuration as the vectors so
        # both sides are normalised the same way
        search_query = SearchQuery(query, config=config)
        qs = (
            IndexedCell.objects.annotate(rank=SearchRank(vector, search_query))
            .filter(rank__gte=0.3)
            .order_by('-rank')
        )
    else:
        qs = IndexedCell.objects.filter(Q(indexed_text__icontains=query) | Q(title__icontains=query))

    if request.user.is_anonymous:
        qs = qs.exclude(public_access=False)
    else:
        user_groups = request.user.groups.all()
        qs = qs.filter(Q(restricted_groups=None) | Q(restricted_groups__in=user_groups))
        qs = qs.exclude(excluded_groups__in=user_groups)

    hits = []
    # several entries (or several joined rows of one entry) can share a URL;
    # only the first one met is reported
    seen_urls = set()
    for hit in qs:
        if hit.url in seen_urls:
            continue
        seen_urls.add(hit.url)
        hits.append(
            {
                'text': hit.title,
                'rank': getattr(hit, 'rank', None),
                'url': hit.url,
            }
        )
        if len(hits) == 10:
            break
    return hits