This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
tabellio.documents/tabellio/documents/indexer.py

243 lines
7.7 KiB
Python

import logging
from zope.interface import Interface
from ZODB.POSException import ConflictError
from five import grok
from Products.CMFCore.utils import getToolByName
from plone.dexterity.utils import iterSchemata
from plone.indexer import indexer
from plone.z3cform import z2
from z3c.form.field import Field
from z3c.form.interfaces import DISPLAY_MODE, IFieldWidget
from z3c.form.interfaces import IContextAware, IFormLayer, IField
from zope import schema
from zope.component import getAdapters, getMultiAdapter
from zope.interface import alsoProvides
from DateTime import DateTime
from document import IDocument
from dossier import IDossier
from question import IQuestion
# Module-level logger; records from this module are emitted under the
# 'Plone' logger name.
log = logging.getLogger('Plone')
def dateIndexer(obj):
    """Return ``obj.date`` as a Zope ``DateTime``, or None when it is unset.

    The conversion goes through the ISO 8601 string produced by the value's
    ``isoformat()`` method, so any object offering that method is accepted.
    """
    value = obj.date
    if value is not None:
        return DateTime(value.isoformat())
    return None
# Wrap dateIndexer once per content interface (document, dossier, question)
# with plone.indexer's ``indexer`` and register every wrapper as a global
# adapter named "dateDoc".
# NOTE(review): with plone.indexer the adapter name is expected to match the
# catalog index (or metadata column) name -- confirm against the catalog setup.
documentDateIndexer = indexer(IDocument)(dateIndexer)
dossierDateIndexer = indexer(IDossier)(dateIndexer)
questionDateIndexer = indexer(IQuestion)(dateIndexer)
grok.global_adapter(documentDateIndexer, name="dateDoc")
grok.global_adapter(dossierDateIndexer, name="dateDoc")
grok.global_adapter(questionDateIndexer, name="dateDoc")
def polgroupsIndexer(obj):
    """Return the ids of the objects referenced by ``obj.polgroups``.

    Each entry of ``obj.polgroups`` is expected to expose its target through
    a ``to_object`` attribute. Entries whose target cannot be resolved
    (``to_object`` is None, i.e. a broken relation) are skipped instead of
    aborting the whole index computation with an AttributeError.

    Returns None when there is nothing to index (no entries at all, or only
    broken ones), otherwise the list of target ids in their original order.
    """
    relations = obj.polgroups
    if not relations:
        return None
    # Resolve every target exactly once, then drop the unresolved ones.
    targets = (relation.to_object for relation in relations)
    ids = [target.id for target in targets if target is not None]
    return ids or None
# Wrap polgroupsIndexer once per content interface (document, dossier,
# question) and register every wrapper as a global adapter named
# "polgroupsDoc".
# NOTE(review): the adapter name is expected to match the catalog index name
# -- confirm against the catalog setup.
documentPolgroupsIndexer = indexer(IDocument)(polgroupsIndexer)
dossierPolgroupsIndexer = indexer(IDossier)(polgroupsIndexer)
questionPolgroupsIndexer = indexer(IQuestion)(polgroupsIndexer)
grok.global_adapter(documentPolgroupsIndexer, name="polgroupsDoc")
grok.global_adapter(dossierPolgroupsIndexer, name="polgroupsDoc")
grok.global_adapter(questionPolgroupsIndexer, name="polgroupsDoc")
def authorsIndexer(obj):
    """Return the ids of the objects referenced by ``obj.authors``.

    Each entry of ``obj.authors`` is expected to expose its target through a
    ``to_object`` attribute. Entries whose target cannot be resolved
    (``to_object`` is None, i.e. a broken relation) are skipped instead of
    aborting the whole index computation with an AttributeError.

    Returns None when there is nothing to index (no entries at all, or only
    broken ones), otherwise the list of target ids in their original order.
    """
    relations = obj.authors
    if not relations:
        return None
    # Resolve every target exactly once, then drop the unresolved ones.
    targets = (relation.to_object for relation in relations)
    ids = [target.id for target in targets if target is not None]
    return ids or None
# Wrap authorsIndexer once per content interface (document, dossier,
# question) and register every wrapper as a global adapter named
# "authorsDoc".
# NOTE(review): the adapter name is expected to match the catalog index name
# -- confirm against the catalog setup.
documentAuthorsIndexer = indexer(IDocument)(authorsIndexer)
dossierAuthorsIndexer = indexer(IDossier)(authorsIndexer)
questionAuthorsIndexer = indexer(IQuestion)(authorsIndexer)
grok.global_adapter(documentAuthorsIndexer, name="authorsDoc")
grok.global_adapter(dossierAuthorsIndexer, name="authorsDoc")
grok.global_adapter(questionAuthorsIndexer, name="authorsDoc")
def interveningPolgroupsIndexer(obj):
    """Placeholder indexer: always returns None, whatever *obj* is.

    No value is computed from *obj*; the function only exists so that the
    "interveningPolgroupsDoc" adapters can be registered.
    """
    return None
# Wrap interveningPolgroupsIndexer (currently a stub returning None) once per
# content interface and register every wrapper as a global adapter named
# "interveningPolgroupsDoc".
documentInterveningPolgroupsIndexer = indexer(IDocument)(interveningPolgroupsIndexer)
dossierInterveningPolgroupsIndexer = indexer(IDossier)(interveningPolgroupsIndexer)
questionInterveningPolgroupsIndexer = indexer(IQuestion)(interveningPolgroupsIndexer)
grok.global_adapter(documentInterveningPolgroupsIndexer, name="interveningPolgroupsDoc")
grok.global_adapter(dossierInterveningPolgroupsIndexer, name="interveningPolgroupsDoc")
grok.global_adapter(questionInterveningPolgroupsIndexer, name="interveningPolgroupsDoc")
def interveningPersonsIndexer(obj):
    """Return the ids of all authors found on the lines of ``obj.histolines``.

    Every line may carry an ``authors`` collection whose entries expose their
    target through ``to_object``. Lines without authors are ignored, and
    entries whose target cannot be resolved (``to_object`` is None, i.e. a
    broken relation) are skipped instead of aborting the whole computation
    with an AttributeError.

    Returns None when ``obj.histolines`` is empty or unset; otherwise a list
    of ids in line order. The list may be empty and may contain the same id
    several times when a person appears on more than one line.
    """
    histolines = obj.histolines
    if not histolines:
        return None
    person_ids = []
    for line in histolines:
        for relation in line.authors or ():
            # Resolve the target once; a broken relation yields None.
            person = relation.to_object
            if person is not None:
                person_ids.append(person.id)
    return person_ids
# Wrap interveningPersonsIndexer for dossiers and questions and register both
# wrappers as global adapters named "interveningPersonsDoc". Unlike the
# indexers above, there is no IDocument registration here.
dossierInterveningPersonsIndexer = indexer(IDossier)(interveningPersonsIndexer)
questionInterveningPersonsIndexer = indexer(IQuestion)(interveningPersonsIndexer)
grok.global_adapter(dossierInterveningPersonsIndexer, name="interveningPersonsDoc")
grok.global_adapter(questionInterveningPersonsIndexer, name="interveningPersonsDoc")
def commissionsIndexer(obj):
    """Return the ids of the objects referenced by ``obj.commissions``.

    Each entry of ``obj.commissions`` is expected to expose its target
    through a ``to_object`` attribute. Entries whose target cannot be
    resolved (``to_object`` is None, i.e. a broken relation) are skipped
    instead of aborting the whole index computation with an AttributeError.

    Returns None when there is nothing to index (no entries at all, or only
    broken ones), otherwise the list of target ids in their original order.
    """
    relations = obj.commissions
    if not relations:
        return None
    # Resolve every target exactly once, then drop the unresolved ones.
    targets = (relation.to_object for relation in relations)
    ids = [target.id for target in targets if target is not None]
    return ids or None
# Wrap commissionsIndexer for dossiers only and register the wrapper as a
# global adapter named "commissionsDoc".
dossierCommissionsIndexer = indexer(IDossier)(commissionsIndexer)
grok.global_adapter(dossierCommissionsIndexer, name="commissionsDoc")
class IDocumentIndexer(Interface):
    """Dexterity behavior interface for enabling the dynamic SearchableText
    indexer on Document objects.
    """
class IAuthorsIndexer(Interface):
    """Dexterity behavior interface for enabling the dynamic SearchableText
    indexer on objects that carry an authors field.
    """
class FakeView(object):
    """Minimal stand-in for a view, used to switch z3c.form's z2 mode on.

    It does nothing but keep hold of the context and the request it was
    created with.
    """

    def __init__(self, context, request):
        self.context, self.request = context, request
def raw_author_dynamic_searchable_text_indexer(obj):
    """Build the searchable text of *obj* from its title and its authors.

    The ``title`` and ``authors`` attributes are read in that order; missing
    or empty attributes are ignored. A list (or tuple) value is treated as a
    collection of relations and contributes the ``title`` of every target
    reached through ``to_object``; any other value contributes itself.
    Unicode values are encoded to UTF-8.

    Relations whose target cannot be resolved (``to_object`` is None) and
    targets without a title are skipped, so one broken relation no longer
    prevents the whole text from being computed.

    Returns the collected pieces joined with single spaces (an empty string
    when nothing was collected).
    """
    # We need to make sure that we have z2 mode switched on for z3c form.
    # Since we do not really have any view to do this on, we just use
    # a fake view. For switching z2 mode on, it's only necessary that
    # there is a view.request.
    view = FakeView(obj, obj.REQUEST)
    z2.switch_on(view, request_layer=IFormLayer)

    indexed = []
    for fieldname in ('title', 'authors'):
        value = getattr(obj, fieldname, None)
        if not value:
            continue

        if isinstance(value, (list, tuple)):
            # Resolve each relation once and drop the broken ones.
            targets = (item.to_object for item in value)
            texts = [target.title for target in targets if target is not None]
        else:
            texts = [value]

        for text in texts:
            if not text:
                # A None or empty title has nothing to contribute and a None
                # would make the final join fail.
                continue
            # be sure that it is utf-8 encoded
            if isinstance(text, unicode):
                text = text.encode('utf-8')
            indexed.append(text)

    return ' '.join(indexed)
@indexer(IAuthorsIndexer)
def author_dynamic_searchable_text_indexer(obj):
    """Searchable text for objects providing IAuthorsIndexer.

    Delegates entirely to raw_author_dynamic_searchable_text_indexer and
    returns its result unchanged.
    """
    searchable_text = raw_author_dynamic_searchable_text_indexer(obj)
    return searchable_text
# Register the IAuthorsIndexer indexer as the global adapter named
# 'SearchableText'.
grok.global_adapter(author_dynamic_searchable_text_indexer,
name='SearchableText')
@indexer(IDocumentIndexer)
def document_dynamic_searchable_text_indexer(obj):
    """Searchable text for objects providing IDocumentIndexer.

    The text always starts with the title/authors text computed by
    raw_author_dynamic_searchable_text_indexer. When ``obj.file`` holds
    non-empty data that portal_transforms can convert to ``text/plain``, the
    converted text is appended after a single space.

    Whenever the file text cannot be obtained (no file, no transform path,
    conversion failure, empty conversion result) the title/authors text alone
    is returned, so the object stays findable by title and authors.

    Raises:
        ConflictError, KeyboardInterrupt: always re-raised from the
            conversion step; every other conversion error is logged and
            swallowed.
    """
    title_and_authors = raw_author_dynamic_searchable_text_indexer(obj)

    data = obj.file
    if not data or data.getSize() == 0:
        return title_and_authors

    # if there is no path to text/plain, do nothing
    transforms = getToolByName(obj, 'portal_transforms')
    if not transforms._findPath(data.contentType, 'text/plain'):
        log.info(' limiting return to title and authors (2) (%s)',
                 data.contentType)
        return title_and_authors

    # convert it to text/plain
    try:
        datastream = transforms.convertTo(
            'text/plain', data.data, mimetype=data.contentType,
            filename=data.filename)
        # convertTo may hand back None instead of a data stream.
        file_text = None
        if datastream is not None:
            file_text = datastream.getData()
    except (ConflictError, KeyboardInterrupt):
        # Never swallow these: they must reach the caller untouched.
        raise
    except Exception:
        log.exception('could not convert %r (%s) to text/plain, '
                      'limiting return to title and authors',
                      data.filename, data.contentType)
        file_text = None

    if not file_text:
        return title_and_authors
    return title_and_authors + ' ' + file_text
# Register the IDocumentIndexer indexer as the global adapter named
# 'SearchableText'.
grok.global_adapter(document_dynamic_searchable_text_indexer,
name='SearchableText')
def get_field_widget(obj, field):
    """Return the display-mode widget of *field*, bound to *obj*, without
    going through any form.

    *field* must be a z3c.form field (it has to provide ``IField``), not a
    bare zope.schema field; the wrapped schema field is taken from
    ``field.field``. A widget factory registered on the field for display
    mode is preferred; otherwise the widget is looked up as a multi-adapter
    of the schema field and ``obj.REQUEST``.

    The returned widget has already been updated.
    """
    assert IField.providedBy(field), 'field is not a form field'

    custom_factory = field.widgetFactory.get(DISPLAY_MODE)
    if custom_factory is None:
        widget = getMultiAdapter((field.field, obj.REQUEST), IFieldWidget)
    else:
        widget = custom_factory(field.field, obj.REQUEST)

    # The widget name is the bare field name: no form prefix is needed.
    widget.name = '' + field.__name__
    widget.id = widget.name.replace('.', '-')

    # Bind the widget to the object and mark it as context aware before
    # updating it in display mode.
    widget.context = obj
    alsoProvides(widget, IContextAware)
    widget.mode = DISPLAY_MODE
    widget.update()
    return widget
def get_searchable_contexts_and_fields(obj):
    """Yield ``(storage, fields)`` pairs for the searchable fields of *obj*.

    Every schema reported by ``iterSchemata(obj)`` is inspected for the
    ``title`` and ``authors`` fields. A schema declaring at least one of them
    yields a tuple made of the storage object (the schema adapted to *obj*)
    and the list of the fields found, in that order; schemata declaring
    neither are skipped.
    """
    wanted = ('title', 'authors')
    for schemata in iterSchemata(obj):
        declared = schema.getFields(schemata)
        candidates = (declared.get(name) for name in wanted)
        fields = [field for field in candidates if field is not None]
        if not fields:
            continue
        yield schemata(obj), fields