This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
themis.search/themis/search/indexer.py

338 lines
10 KiB
Python

from DateTime import DateTime
from plone.indexer import indexer
from five import grok
import plone.dexterity.interfaces
from Products.CMFCore.utils import getToolByName
from ZODB.POSException import ConflictError
from themis.fields.vocabs import ContactsSource
@indexer(plone.dexterity.interfaces.IDexterityItem)
def mailDateIndexer(obj):
    """Index the date of a mail item.

    Only objects whose ``portal_type`` is ``courrier_entrant`` or
    ``courrier_sortant`` are handled.  The first non-empty attribute among
    ``date_reelle_courrier`` and ``date_reception`` is converted to a
    ``DateTime`` through its ISO representation.  Returns ``None`` for any
    other object, or when neither attribute holds a value.
    """
    if obj.portal_type in ('courrier_entrant', 'courrier_sortant'):
        for field_name in ('date_reelle_courrier', 'date_reception'):
            if hasattr(obj, field_name) and getattr(obj, field_name):
                # First populated date wins; later candidates are ignored.
                return DateTime(getattr(obj, field_name).isoformat())
    return None

grok.global_adapter(mailDateIndexer, name="mailDate")
@indexer(plone.dexterity.interfaces.IDexterityItem)
def docDateIndexer(obj):
    """Index the date of a document.

    Only objects whose ``Type()`` contains ``(D)`` are handled.  The
    candidate attributes are tried in a fixed order and the first non-empty
    one is converted to a ``DateTime`` through its ISO representation.
    Returns ``None`` otherwise.
    """
    if '(D)' not in obj.Type():
        return None
    candidates = (
        'date_document_imprime',
        'date_du_document',
        'date_de_publication',
        'date_du_rapport',
    )
    for field_name in candidates:
        if hasattr(obj, field_name) and getattr(obj, field_name):
            return DateTime(getattr(obj, field_name).isoformat())
    return None

grok.global_adapter(docDateIndexer, name="docDate")
@indexer(plone.dexterity.interfaces.IDexterityItem)
def docMeetingDateIndexer(obj):
    """Index the meeting date of a document.

    Only objects whose ``Type()`` contains ``(D)`` are handled.  The first
    non-empty attribute among ``date_seance``, ``date_de_la_commission`` and
    ``date_seance_ou_commission`` is converted to a ``DateTime`` through its
    ISO representation.  Returns ``None`` otherwise.
    """
    if '(D)' not in obj.Type():
        return None
    candidates = (
        'date_seance',
        'date_de_la_commission',
        'date_seance_ou_commission',
    )
    for field_name in candidates:
        if hasattr(obj, field_name) and getattr(obj, field_name):
            return DateTime(getattr(obj, field_name).isoformat())
    return None

grok.global_adapter(docMeetingDateIndexer, name="docMeetingDate")
@indexer(plone.dexterity.interfaces.IDexterityItem)
def mailCategoryTxtIndexer(obj):
    """Index the first category of a mail item.

    Only objects whose ``portal_type`` is ``courrier_entrant`` or
    ``courrier_sortant`` are handled.  Returns the first element of the
    ``categorie_de_courrier`` attribute when it is present and non-empty,
    ``None`` otherwise.
    """
    if obj.portal_type in ('courrier_entrant', 'courrier_sortant'):
        for field_name in ('categorie_de_courrier',):
            if hasattr(obj, field_name) and getattr(obj, field_name):
                # Only the first entry of the sequence is indexed.
                return getattr(obj, field_name)[0]
    return None

grok.global_adapter(mailCategoryTxtIndexer, name="mailCategoryTxt")
@indexer(plone.dexterity.interfaces.IDexterityItem)
def mailRelatedDocsTxtIndexer(obj):
    """Index the titles of the documents related to a mail item.

    Only objects whose ``portal_type`` is ``courrier_entrant`` or
    ``courrier_sortant`` are handled.  The titles of the targets listed in
    the ``docs_related`` attribute are joined with ``', '``.  Returns
    ``None`` when the object is not a mail item or when no title could be
    collected.
    """
    if obj.portal_type not in ('courrier_entrant', 'courrier_sortant'):
        return None
    titles = []
    for attr in ('docs_related',):
        if not hasattr(obj, attr):
            continue
        for relation in getattr(obj, attr) or ():
            target = relation.to_object
            if target is None:
                # A relation whose target cannot be resolved any more has no
                # title to contribute; skip it instead of failing on
                # ``None.Title()`` and losing the remaining titles.
                continue
            titles.append(target.Title())
    if titles:
        return ', '.join(titles)
    return None

grok.global_adapter(mailRelatedDocsTxtIndexer, name="mailRelatedDocsTxt")
@indexer(plone.dexterity.interfaces.IDexterityItem)
def docSessionIndexer(obj):
    """Index the ``session`` attribute of a document.

    Only objects whose ``Type()`` contains ``(D)`` are handled.  Returns the
    value of ``session`` when present and non-empty, ``None`` otherwise.
    """
    if '(D)' not in obj.Type():
        return None
    for field_name in ('session',):
        if hasattr(obj, field_name) and getattr(obj, field_name):
            return getattr(obj, field_name)
    return None

grok.global_adapter(docSessionIndexer, name='docSession')
def get_data_to_index(obj, data):
    """Return the content of the file *data* as text for indexing.

    *data* is expected to expose ``contentType``, ``data`` and ``filename``;
    it is converted to ``text/plain`` with the ``portal_transforms`` tool
    acquired from *obj*.

    Returns ``obj.title`` when no conversion to ``text/plain`` is possible.
    Otherwise returns the converted text decoded from UTF-8; if decoding
    fails, only the prefix preceding the first undecodable byte is kept,
    and an empty string is returned if even that prefix cannot be decoded.
    Errors raised by the transform itself are propagated to the caller.
    """
    transforms = getToolByName(obj, 'portal_transforms')
    # if there is no path to text/plain, do nothing
    if not transforms._findPath(data.contentType, 'text/plain'):
        return obj.title
    # convert it to text/plain
    datastream = transforms.convertTo(
        'text/plain', data.data, mimetype=data.contentType,
        filename=data.filename)
    if datastream is None:
        # The transform engine produced nothing; fall back the same way as
        # when no transform path exists instead of failing on
        # ``None.getData()``.
        return obj.title
    raw = datastream.getData()
    try:
        return unicode(raw, 'utf-8')
    except UnicodeDecodeError as e:
        try:
            # Keep what decoded cleanly before the offending byte.
            return unicode(raw[:e.start], 'utf-8')
        except UnicodeDecodeError:
            # ok, forget it
            return ''
@indexer(plone.dexterity.interfaces.IDexterityItem)
def mail_dynamic_searchable_text_indexer(obj):
    """Build the searchable text of a mail item.

    Only objects whose ``portal_type`` is ``courrier_entrant`` or
    ``courrier_sortant`` are handled; ``None`` is returned for anything
    else.  The result is the title alone when ``obj.fichier`` is empty or
    has size zero, otherwise the title followed by a space and the text
    returned by ``get_data_to_index`` for that file.
    """
    if obj.portal_type not in ('courrier_entrant', 'courrier_sortant'):
        return None
    attachment = obj.fichier
    if attachment and attachment.getSize() != 0:
        return obj.title + ' ' + get_data_to_index(obj, attachment)
    # Nothing to extract text from: index the title only.
    return obj.title

grok.global_adapter(mail_dynamic_searchable_text_indexer, name='mailSearchableText')
@indexer(plone.dexterity.interfaces.IDexterityItem)
def doc_dynamic_searchable_text_indexer(obj):
    """Build the searchable text of a document.

    Only objects whose ``Type()`` contains ``(D)`` are handled; ``None`` is
    returned for anything else.  The first non-empty attribute among
    ``fichier`` and ``document_imprime`` is used as the file to extract
    text from.  The result is the title alone when no such file exists or
    its size is zero, otherwise the title followed by a space and the text
    returned by ``get_data_to_index`` for that file.
    """
    if '(D)' not in obj.Type():
        return None
    data = None
    # NOTE: the comma matters -- without it the two literals are
    # concatenated into a single string and the loop walks its characters.
    for attr in ('fichier', 'document_imprime'):
        if not hasattr(obj, attr):
            continue
        if getattr(obj, attr):
            data = getattr(obj, attr)
            break
    if not data or data.getSize() == 0:
        return obj.title
    return obj.title + ' ' + get_data_to_index(obj, data)

grok.global_adapter(doc_dynamic_searchable_text_indexer, name='docSearchableText')
@indexer(plone.dexterity.interfaces.IDexterityItem)
def contactIndexer(obj):
    """Index the contact of a mail item.

    Only objects whose ``portal_type`` is ``courrier_entrant`` or
    ``courrier_sortant`` are handled.  The attributes ``expediteur`` and
    ``destinataire`` are tried in that order; for the first one that yields
    a result, only its first entry is considered:

    * an entry containing ``':'`` is treated as a token and resolved to a
      title through ``ContactsSource.fastGetTitleByToken`` (decoded from
      UTF-8 when it is not already unicode); an unknown token (``KeyError``)
      makes the indexer move on to the next attribute;
    * any other entry is returned unchanged.

    Returns ``None`` when nothing could be resolved.
    """
    if obj.portal_type not in ('courrier_entrant', 'courrier_sortant'):
        return None
    for attr in ('expediteur', 'destinataire'):
        if not hasattr(obj, attr):
            continue
        contacts = getattr(obj, attr)
        if not contacts:
            continue
        first = contacts[0]
        if ':' not in first:
            return first
        src = ContactsSource()
        try:
            title = src.fastGetTitleByToken(obj, first)
        except KeyError:
            continue
        # isinstance() also accepts unicode subclasses, which must not be
        # passed through unicode(..., 'utf-8') again.
        if not isinstance(title, unicode):
            title = unicode(title, 'utf-8')
        return title
    return None

grok.global_adapter(contactIndexer, name="mailContact")
@indexer(plone.dexterity.interfaces.IDexterityItem)
def contactFuzzyIndexer(obj):
    """Index all contacts of a mail item as one space-separated string.

    Only objects whose ``portal_type`` is ``courrier_entrant`` or
    ``courrier_sortant`` are handled.  The attributes ``expediteur`` and
    ``destinataire`` are tried in that order and the first one producing at
    least one name is used.  Each entry is handled as follows:

    * an entry containing ``':'`` is treated as a token and resolved to a
      title through ``ContactsSource.fastGetTitleByToken`` (decoded from
      UTF-8 when it is not already unicode); when the token is unknown
      (``KeyError``) the text after the first ``':'`` is used instead;
    * any other entry is used unchanged.

    Returns ``None`` when nothing could be collected.
    """
    if obj.portal_type not in ('courrier_entrant', 'courrier_sortant'):
        return None
    for attr in ('expediteur', 'destinataire'):
        if not hasattr(obj, attr):
            continue
        contacts = getattr(obj, attr)
        if not contacts:
            continue
        # it may happen there are several items, merge all of them into a
        # single value
        result = []
        for contact in contacts:
            if ':' not in contact:
                result.append(contact)
                continue
            src = ContactsSource()
            try:
                title = src.fastGetTitleByToken(obj, contact)
            except KeyError:
                # Append a string, not the list returned by a bare slice of
                # split(), otherwise the final join() fails.
                result.append(contact.split(':', 1)[1])
                continue
            if not isinstance(title, unicode):
                title = unicode(title, 'utf-8')
            result.append(title)
        if result:
            return u' '.join(result)
    return None

grok.global_adapter(contactFuzzyIndexer, name="mailContactFuzzy")
@indexer(plone.dexterity.interfaces.IDexterityItem)
def docNumberIndexer(obj):
    """Index the number of a document.

    Only objects whose ``Type()`` contains ``(D)`` are handled.  Returns the
    first non-empty value among the attributes ``numero_biq``,
    ``numero_document`` and ``numero_bqr``, or ``None``.
    """
    if '(D)' not in obj.Type():
        return None
    for field_name in ('numero_biq', 'numero_document', 'numero_bqr'):
        if hasattr(obj, field_name) and getattr(obj, field_name):
            return getattr(obj, field_name)
    return None

grok.global_adapter(docNumberIndexer, name="docNumber")
@indexer(plone.dexterity.interfaces.IDexterityItem)
def docCommissionsIndexer(obj):
    """Index the commissions attached to a document.

    Only objects whose ``Type()`` contains ``(D)`` are handled.  Values are
    gathered from every commission-related attribute that is present and
    non-empty: plain lists are merged in, single strings are appended, and
    values of any other type are ignored.  Returns the collected list, or
    ``None`` when it is empty.
    """
    if '(D)' not in obj.Type():
        return None
    candidates = (
        'commissions',
        'commission_qui_examine',
        'commission',
        'examine_en',
        'commisions__examine',
    )
    collected = []
    for field_name in candidates:
        if not hasattr(obj, field_name):
            continue
        value = getattr(obj, field_name)
        if not value:
            continue
        # Exact type checks on purpose: only plain lists and strings count.
        if type(value) is list:
            collected.extend(value)
        elif type(value) in (unicode, str):
            collected.append(value)
    return collected or None

grok.global_adapter(docCommissionsIndexer, name="docCommissions")
@indexer(plone.dexterity.interfaces.IDexterityItem)
def docCategoryIndexer(obj):
    """Index the categories of a document.

    Only objects whose ``Type()`` contains ``(D)`` are handled.  The result
    always starts with the type title stripped of its ``(D)`` marker and
    surrounding whitespace, followed by the values of ``type_de_projet`` and
    then ``type_de_proposition`` when those attributes are present and
    non-empty (plain lists are merged in, other values appended as is).
    """
    type_title = obj.Type()
    if '(D)' not in type_title:
        return None
    categories = [type_title.replace('(D)', '').strip()]
    for field_name in ('type_de_projet', 'type_de_proposition'):
        if not (hasattr(obj, field_name) and getattr(obj, field_name)):
            continue
        extra = getattr(obj, field_name)
        if type(extra) is list:
            categories.extend(extra)
        elif extra:
            categories.append(extra)
    return categories

grok.global_adapter(docCategoryIndexer, name="docCategory")
def get_doc_persons(obj):
    """Collect the names of the persons referenced by a document.

    Every person-related attribute of *obj* that is present and non-empty is
    scanned; a value that is not a plain list is treated as a single entry.
    An entry containing ``':'`` is treated as a token and resolved to a
    title through ``ContactsSource.fastGetTitleByToken`` (decoded from UTF-8
    when it is not already unicode), and dropped when the token is unknown
    (``KeyError``).  Any other entry is kept unchanged.

    Returns the list of names in the order they were found.
    """
    names = []
    source = ContactsSource()
    person_fields = (
        'auteur', 'auteurs', 'rapporteurs', 'orateurs_seance',
        'orateurs', 'orateurs_en_commission', 'orateurs__en_seanceprop',
        'orateurs_rapportcom', 'orateurs_seance_reponse_orale',
        'ministres_concernes',
    )
    for field_name in person_fields:
        if not hasattr(obj, field_name):
            continue
        entries = getattr(obj, field_name)
        if not entries:
            continue
        if type(entries) is not list:
            entries = [entries]
        for entry in entries:
            if ':' not in entry:
                names.append(entry)
                continue
            try:
                title = source.fastGetTitleByToken(obj, entry)
            except KeyError:
                # Unknown token: leave this person out.
                continue
            if type(title) is not unicode:
                title = unicode(title, 'utf-8')
            names.append(title)
    return names
@indexer(plone.dexterity.interfaces.IDexterityItem)
def personsFuzzyIndexer(obj):
    """Index the persons of a document as one space-separated string.

    Returns ``None`` unless ``obj.Type()`` contains ``(D)``; otherwise the
    names returned by ``get_doc_persons`` joined with single spaces.
    """
    if '(D)' in obj.Type():
        names = get_doc_persons(obj)
        return ' '.join(names)
    return None

grok.global_adapter(personsFuzzyIndexer, name='docPersonsFuzzy')
@indexer(plone.dexterity.interfaces.IDexterityItem)
def personsStrIndexer(obj):
    """Index the persons of a document as one comma-separated string.

    Returns ``None`` unless ``obj.Type()`` contains ``(D)``; otherwise the
    names returned by ``get_doc_persons`` joined with ``', '``.
    """
    if '(D)' in obj.Type():
        names = get_doc_persons(obj)
        return ', '.join(names)
    return None

grok.global_adapter(personsStrIndexer, name='docPersonsStr')
@indexer(plone.dexterity.interfaces.IDexterityItem)
def statusTitleIndexer(obj):
    """Index the title of the current workflow state.

    Handles objects whose ``portal_type`` is ``courrier_entrant`` or
    ``courrier_sortant`` and objects whose ``Type()`` contains ``(D)``;
    returns ``None`` for anything else.

    The ``review_state`` is read from the status of the first workflow in
    the object's chain and translated to its label using
    ``portal_workflow.listWFStatesByTitle()``.  When no label matches, the
    raw state id is returned.  Returns ``None`` when the object has no
    workflow chain or no recorded status.
    """
    is_mail = obj.portal_type in ('courrier_entrant', 'courrier_sortant')
    if not is_mail and '(D)' not in obj.Type():
        return None
    portal_workflow = getToolByName(obj, 'portal_workflow')
    chain = portal_workflow.getChainFor(obj)
    if not chain:
        # No workflow is bound to this object, so there is no state to index.
        return None
    status = portal_workflow.getStatusOf(chain[0], obj)
    if not status:
        # No status recorded for this workflow.
        return None
    current_state = status.get('review_state')
    for label, state_id in portal_workflow.listWFStatesByTitle():
        if current_state == state_id:
            return label
    return current_state

grok.global_adapter(statusTitleIndexer, name='statusTitle')