This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
tabellio.searchform/tabellio/searchform/form.py

581 lines
22 KiB
Python

# -*- coding: utf-8 -*-
import re
import time
from five import grok
from plone.memoize import instance, ram
from zope import interface, schema, component
from z3c.form import form, field, button
from plone.z3cform.layout import wrap_form
from Products.CMFCore.utils import getToolByName
from zope.interface import implements
from z3c.form.ptcompat import ViewPageTemplateFile
import z3c.form.interfaces
from z3c.form.browser import text
from z3c.form import widget
from z3c.form.interfaces import ITextWidget
from z3c.relationfield.schema import RelationChoice, RelationList
from plone.formwidget.contenttree import ObjPathSourceBinder
from tabellio.searchform.interfaces import MessageFactory as _
from Products.Five import BrowserView
from Products.Five.browser.pagetemplatefile import ViewPageTemplateFile
from zope.schema.interfaces import IContextSourceBinder
from zope.schema.vocabulary import SimpleVocabulary, SimpleTerm
from plone.registry.interfaces import IRegistry
from tabellio.config.interfaces import ITabellioSettings
import tabellio.config.utils
from tabellio.documents.typenames import MAPPING
class ListAuthorsView(BrowserView):
    """Plain-text lookup endpoint for deputies.

    With an ``id`` request parameter the view returns the title of the
    matching object in the deputies folder (``'XXX'`` when the folder has
    no such attribute).  Otherwise it returns one ``Title|id`` line for
    every deputy whose id or lower-cased title contains the ``q``
    request parameter.
    """

    # Lazily resolved by the ``deputies_folder`` property.
    _deputies_folder = None

    def get_folder_at_path(self, path):
        """Return the object reached from the portal by following ``path``.

        ``path`` is split on ``/``; empty segments are skipped and every
        other segment is resolved with ``getattr`` on the previous object.
        """
        current = self.portal
        for part in path.split('/'):
            if part:
                current = getattr(current, part)
        return current

    @property
    def deputies_folder(self):
        """Folder located at ``settings.deputiesPath`` (resolved once)."""
        # Compare against None instead of testing truthiness so that a
        # folder object that evaluates as false is still cached.
        if self._deputies_folder is None:
            self._deputies_folder = self.get_folder_at_path(
                self.settings.deputiesPath)
        return self._deputies_folder

    def __call__(self):
        """Answer the request as ``text/plain`` (see class docstring)."""
        self.portal = getToolByName(self.context, 'portal_url').getPortalObject()
        self.settings = component.getUtility(IRegistry).forInterface(
            ITabellioSettings, False)
        self.request.response.setHeader('Content-type', 'text/plain')

        deputy_id = self.request.form.get('id')
        if deputy_id:
            # NOTE(review): ``deputy_id`` comes straight from the request,
            # so getattr may resolve any attribute of the folder, not only
            # contained deputies.
            try:
                deputy = getattr(self.deputies_folder, deputy_id)
            except AttributeError:
                return 'XXX'
            return deputy.Title()

        # A missing ``q`` parameter is handled like an empty query instead
        # of failing on ``None.lower()``.
        query = (self.request.form.get('q') or '').lower()
        matches = []
        for deputy in self.deputies_folder.objectValues():
            if deputy.portal_type != 'themis.datatypes.deputy':
                continue
            if query in deputy.id or query in deputy.Title().lower():
                matches.append(deputy)
        return '\n'.join(['%s|%s' % (x.Title(), x.id) for x in matches])
class ListPolgroupsView(BrowserView):
    """Plain-text lookup endpoint for political groups.

    With an ``id`` request parameter the view returns the title of the
    matching object in the political groups folder (``'XXX'`` when the
    folder has no such attribute).  Otherwise it returns one ``Title|id``
    line for every political group whose id or lower-cased title contains
    the ``q`` request parameter.
    """

    # Lazily resolved by the ``polgroups_folder`` property.
    _polgroups_folder = None

    def get_folder_at_path(self, path):
        """Return the object reached from the portal by following ``path``.

        ``path`` is split on ``/``; empty segments are skipped and every
        other segment is resolved with ``getattr`` on the previous object.
        """
        current = self.portal
        for part in path.split('/'):
            if part:
                current = getattr(current, part)
        return current

    @property
    def polgroups_folder(self):
        """Folder located at ``settings.polgroupsPath`` (resolved once)."""
        # Compare against None instead of testing truthiness so that a
        # folder object that evaluates as false is still cached.
        if self._polgroups_folder is None:
            self._polgroups_folder = self.get_folder_at_path(
                self.settings.polgroupsPath)
        return self._polgroups_folder

    def __call__(self):
        """Answer the request as ``text/plain`` (see class docstring)."""
        self.portal = getToolByName(self.context, 'portal_url').getPortalObject()
        self.settings = component.getUtility(IRegistry).forInterface(
            ITabellioSettings, False)
        self.request.response.setHeader('Content-type', 'text/plain')

        polgroup_id = self.request.form.get('id')
        if polgroup_id:
            # NOTE(review): ``polgroup_id`` comes straight from the request,
            # so getattr may resolve any attribute of the folder, not only
            # contained political groups.
            try:
                polgroup = getattr(self.polgroups_folder, polgroup_id)
            except AttributeError:
                return 'XXX'
            return polgroup.Title()

        # A missing ``q`` parameter is handled like an empty query instead
        # of failing on ``None.lower()``.
        query = (self.request.form.get('q') or '').lower()
        matches = []
        for polgroup in self.polgroups_folder.objectValues():
            if polgroup.portal_type != 'themis.datatypes.polgroup':
                continue
            if query in polgroup.id or query in polgroup.Title().lower():
                matches.append(polgroup)
        return '\n'.join(['%s|%s' % (x.Title(), x.id) for x in matches])
class ListTopicsView(BrowserView):
    """Plain-text lookup endpoint for topics.

    Topics come from ``tabellio.config.utils.get_topics_dict()``; each
    value is indexed here as ``value[0]`` (the text that is output) and
    ``value[1]`` (the text the query terms are matched against).
    """

    def __call__(self):
        """Answer the request as ``text/plain``.

        With an ``id`` request parameter, return ``value[0]`` of that topic
        or ``'XXX'`` when the id is unknown.  Otherwise return sorted
        ``value[0]|key`` lines for topics whose ``value[1]`` contains every
        normalized term of the ``q`` parameter.
        """
        from plone.i18n.normalizer.fr import normalizer
        topics = tabellio.config.utils.get_topics_dict()
        self.request.response.setHeader('Content-type', 'text/plain')

        topic_id = self.request.form.get('id')
        if topic_id:
            # Index the mapping directly: the previous
            # ``topics.get(id, id)[0]`` returned the first character of an
            # unknown id and never reached the KeyError handler.
            try:
                return '%s' % topics[topic_id][0]
            except KeyError:
                return 'XXX'

        # A missing ``q`` parameter is handled like an empty query instead
        # of failing on ``None.split()``.
        query = self.request.form.get('q') or ''
        query_terms = [normalizer.normalize(x).lower() for x in query.split()]

        results = []
        for key, value in topics.items():
            if all(term in value[1] for term in query_terms):
                results.append('%s|%s' % (value[0], key))
                # Collection stops once more than 30 matches were gathered.
                if len(results) > 30:
                    break
        return '\n'.join(sorted(results))
class IAuthorsWidget(ITextWidget):
    """Text widget interface provided by ``AuthorsWidget``."""
class AuthorsWidget(text.TextWidget):
    """Text widget for the authors field, tagged with the ``authors`` class."""

    implements(IAuthorsWidget)

    # Value of the widget's ``klass`` attribute.
    klass = u'authors'
def FieldAuthorsWidget(field, request):
    """Widget factory: wrap a new ``AuthorsWidget`` as the widget of ``field``."""
    authors_widget = AuthorsWidget(request)
    return widget.FieldWidget(field, authors_widget)
class IPolgroupsWidget(ITextWidget):
    """Text widget interface provided by ``PolgroupsWidget``."""
class PolgroupsWidget(text.TextWidget):
    """Text widget for the political groups field, tagged ``polgroups``."""

    implements(IPolgroupsWidget)

    # Value of the widget's ``klass`` attribute.
    klass = u'polgroups'
def FieldPolgroupsWidget(field, request):
    """Widget factory: wrap a new ``PolgroupsWidget`` as the widget of ``field``."""
    polgroups_widget = PolgroupsWidget(request)
    return widget.FieldWidget(field, polgroups_widget)
class ITopicsWidget(ITextWidget):
    """Text widget interface provided by ``TopicsWidget``."""
class TopicsWidget(text.TextWidget):
    """Text widget for the topics field, tagged with the ``topics`` class."""

    implements(ITopicsWidget)

    # Value of the widget's ``klass`` attribute.
    klass = u'topics'
def FieldTopicsWidget(field, request):
    """Widget factory: wrap a new ``TopicsWidget`` as the widget of ``field``."""
    topics_widget = TopicsWidget(request)
    return widget.FieldWidget(field, topics_widget)
class IFolderWithDocuments(interface.Interface):
    """Interface without attributes or methods (usable as a marker)."""
class IFolderWithPfbDocuments(interface.Interface):
    """Interface without attributes or methods (usable as a marker)."""
@grok.provider(IContextSourceBinder)
def possible_document_types(context):
    """Build a vocabulary from the catalog's unique ``doctype`` values.

    Empty values are skipped.  Each term uses the ASCII-encoded value
    (non-ASCII characters replaced) as both value and token, and the
    ``MAPPING`` entry for the raw value (or the raw value itself) as title.
    """
    portal_catalog = getToolByName(context, 'portal_catalog')
    raw_values = portal_catalog.uniqueValuesFor('doctype')
    # Pair each non-empty raw value with its ASCII-safe identifier.
    identified = ((raw.encode('ascii', 'replace'), raw)
                  for raw in raw_values if raw)
    return SimpleVocabulary([
        SimpleVocabulary.createTerm(ident, ident, MAPPING.get(raw, raw))
        for ident, raw in identified])
@grok.provider(IContextSourceBinder)
def possible_dossier_types(context):
    """Build a vocabulary from the catalog's unique ``dostype`` values.

    Empty values are skipped.  Each term uses the ASCII-encoded value
    (non-ASCII characters replaced) as both value and token, and the
    ``MAPPING`` entry for the raw value (or the raw value itself) as title.
    """
    portal_catalog = getToolByName(context, 'portal_catalog')
    raw_values = portal_catalog.uniqueValuesFor('dostype')
    # Pair each non-empty raw value with its ASCII-safe identifier.
    identified = ((raw.encode('ascii', 'replace'), raw)
                  for raw in raw_values if raw)
    return SimpleVocabulary([
        SimpleVocabulary.createTerm(ident, ident, MAPPING.get(raw, raw))
        for ident, raw in identified])
@grok.provider(IContextSourceBinder)
def possible_question_types(context):
    """Build a vocabulary from the catalog's unique ``questype`` values.

    Empty values are skipped.  Each term uses the ASCII-encoded value
    (non-ASCII characters replaced) as both value and token, and the
    ``MAPPING`` entry for the raw value (or the raw value itself) as title.
    """
    portal_catalog = getToolByName(context, 'portal_catalog')
    raw_values = portal_catalog.uniqueValuesFor('questype')
    # Pair each non-empty raw value with its ASCII-safe identifier.
    identified = ((raw.encode('ascii', 'replace'), raw)
                  for raw in raw_values if raw)
    return SimpleVocabulary([
        SimpleVocabulary.createTerm(ident, ident, MAPPING.get(raw, raw))
        for ident, raw in identified])
@grok.provider(IContextSourceBinder)
def possible_sessions(context):
    """Build a vocabulary from ``get_legisl_and_sessions()``.

    Each entry becomes a term whose value and token are the ASCII-encoded
    entry (non-ASCII characters replaced) and whose title is the entry
    itself.  ``context`` is not used.
    """
    entries = tabellio.config.utils.get_legisl_and_sessions()
    identified = ((entry.encode('ascii', 'replace'), entry)
                  for entry in entries)
    return SimpleVocabulary([
        SimpleVocabulary.createTerm(ident, ident, entry)
        for ident, entry in identified])
class IDocumentSearch(interface.Interface):
    """Schema of the document search form (all fields optional)."""

    # Marker field defaulting to u'1'; DocumentSearchForm renders it hidden.
    search_type_is_document = schema.TextLine(
        title=u'Search Type',
        default=u'1',
        required=False)

    nodoc = schema.TextLine(
        title=_(u'Document Number'),
        required=False)

    nosuite = schema.TextLine(
        title=_(u'Suite Number'),
        required=False)

    doctype = schema.Choice(
        title=_(u'Type'),
        required=False,
        source=possible_document_types)

    ttitle = schema.TextLine(
        title=_(u'Title'),
        required=False)

    text = schema.TextLine(
        title=_(u'Text'),
        required=False)

    authors = schema.TextLine(
        title=_(u'Authors'),
        required=False)

    polgroups = schema.TextLine(
        title=_(u'Political Groups'),
        required=False)

    topics = schema.TextLine(
        title=_(u'Topics'),
        required=False)

    session = schema.Choice(
        title=_(u'Legislature / Session'),
        required=False,
        source=possible_sessions)

    # Bounds of the date range.
    start = schema.Date(
        title=_(u'Start'),
        required=False)

    end = schema.Date(
        title=_(u'End'),
        required=False)
class DocumentSearchForm(form.Form):
    """z3c.form form built from ``IDocumentSearch``."""

    prefix = 'document'
    method = 'get'
    ignoreContext = True
    template = ViewPageTemplateFile('form_templates/view_effectivesearch.pt')

    fields = field.Fields(IDocumentSearch)
    # Custom widgets for authors, political groups and topics.
    fields['authors'].widgetFactory = FieldAuthorsWidget
    fields['polgroups'].widgetFactory = FieldPolgroupsWidget
    fields['topics'].widgetFactory = FieldTopicsWidget

    def updateWidgets(self):
        """Update the widgets, then hide the search-type marker widget."""
        super(DocumentSearchForm, self).updateWidgets()
        marker = self.widgets['search_type_is_document']
        marker.mode = z3c.form.interfaces.HIDDEN_MODE

    @button.buttonAndHandler(_(u'Search'))
    def handleApply(self, action):
        """Handler of the "Search" button: extract the submitted data.

        The portal-message/redirect branch below is switched off, so the
        handler always returns ``None`` after the extraction.
        """
        data, errors = self.extractData()
        redirect_enabled = False
        if redirect_enabled and not errors:
            plone_utils = getToolByName(self.context.context, 'plone_utils')
            plone_utils.addPortalMessage(_('Your search has been completed!'))
            return self.request.response.redirect('./')
        return None
class IDossierSearch(interface.Interface):
    """Schema of the dossier search form (all fields optional)."""

    # Marker field defaulting to u'1'; DossierSearchForm renders it hidden.
    search_type_is_dossier = schema.TextLine(
        title=u'Search Type',
        default=u'1',
        required=False)

    nodos = schema.TextLine(
        title=_(u'Dossier Number'),
        required=False)

    dostype = schema.Choice(
        title=_(u'Type'),
        required=False,
        source=possible_dossier_types)

    ttitle = schema.TextLine(
        title=_(u'Title'),
        required=False)

    authors = schema.TextLine(
        title=_(u'Authors'),
        required=False)

    polgroups = schema.TextLine(
        title=_(u'Political Groups'),
        required=False)

    topics = schema.TextLine(
        title=_(u'Topics'),
        required=False)

    session = schema.Choice(
        title=_(u'Legislature / Session'),
        required=False,
        source=possible_sessions)

    # Bounds of the date range.
    start = schema.Date(
        title=_(u'Start'),
        required=False)

    end = schema.Date(
        title=_(u'End'),
        required=False)
class DossierSearchForm(form.Form):
    """z3c.form form built from ``IDossierSearch``."""

    method = 'get'
    prefix = 'dossier'
    ignoreContext = True
    template = ViewPageTemplateFile('form_templates/view_dossier_search.pt')

    fields = field.Fields(IDossierSearch)
    # Custom widgets for authors, political groups and topics.
    fields['authors'].widgetFactory = FieldAuthorsWidget
    fields['polgroups'].widgetFactory = FieldPolgroupsWidget
    fields['topics'].widgetFactory = FieldTopicsWidget

    def updateWidgets(self):
        """Update the widgets, then hide the search-type marker widget."""
        super(DossierSearchForm, self).updateWidgets()
        marker = self.widgets['search_type_is_dossier']
        marker.mode = z3c.form.interfaces.HIDDEN_MODE

    @button.buttonAndHandler(_(u'Search'))
    def handleApply(self, action):
        """Handler of the "Search" button: extract the submitted data.

        The portal-message/redirect branch below is switched off, so the
        handler always returns ``None`` after the extraction.
        """
        data, errors = self.extractData()
        redirect_enabled = False
        if redirect_enabled and not errors:
            plone_utils = getToolByName(self.context.context, 'plone_utils')
            plone_utils.addPortalMessage(_('Your search has been completed!'))
            return self.request.response.redirect('./')
        return None
class IQuestionSearch(interface.Interface):
    """Schema of the question search form (all fields optional)."""

    # Marker field defaulting to u'1'; QuestionSearchForm renders it hidden.
    search_type_is_question = schema.TextLine(
        title=u'Search Type',
        default=u'1',
        required=False)

    questype = schema.Choice(
        title=_(u'Type'),
        required=False,
        source=possible_question_types)

    ttitle = schema.TextLine(
        title=_(u'Title'),
        required=False)

    authors = schema.TextLine(
        title=_(u'Authors'),
        required=False)

    polgroups = schema.TextLine(
        title=_(u'Political Groups'),
        required=False)

    topics = schema.TextLine(
        title=_(u'Topics'),
        required=False)

    session = schema.Choice(
        title=_(u'Legislature / Session'),
        required=False,
        source=possible_sessions)

    # Bounds of the date range.
    start = schema.Date(
        title=_(u'Start'),
        required=False)

    end = schema.Date(
        title=_(u'End'),
        required=False)
class QuestionSearchForm(form.Form):
    """z3c.form form built from ``IQuestionSearch``."""

    # NOTE(review): unlike the document and dossier forms, this form does
    # not set ``method = 'get'`` -- confirm whether that is intended.
    prefix = 'question'
    ignoreContext = True
    template = ViewPageTemplateFile('form_templates/view_question_search.pt')

    fields = field.Fields(IQuestionSearch)
    # Custom widgets for authors, political groups and topics.
    fields['authors'].widgetFactory = FieldAuthorsWidget
    fields['polgroups'].widgetFactory = FieldPolgroupsWidget
    fields['topics'].widgetFactory = FieldTopicsWidget

    def updateWidgets(self):
        """Update the widgets, then hide the search-type marker widget."""
        super(QuestionSearchForm, self).updateWidgets()
        marker = self.widgets['search_type_is_question']
        marker.mode = z3c.form.interfaces.HIDDEN_MODE

    @button.buttonAndHandler(_(u'Search'))
    def handleApply(self, action):
        """Handler of the "Search" button: extract the submitted data.

        The portal-message/redirect branch below is switched off, so the
        handler always returns ``None`` after the extraction.
        """
        data, errors = self.extractData()
        redirect_enabled = False
        if redirect_enabled and not errors:
            plone_utils = getToolByName(self.context.context, 'plone_utils')
            plone_utils.addPortalMessage(_('Your search has been completed!'))
            return self.request.response.redirect('./')
        return None
class IDocumentPfbSearch(interface.Interface):
    """Schema of the PFB document search form, including a sort choice."""

    # Marker field defaulting to u'1'; DocumentPfbSearchForm renders it
    # hidden.  No ``required`` argument is passed for this field.
    search_type_is_document = schema.TextLine(
        title=u'Search Type',
        default=u'1')

    nodoc = schema.TextLine(
        title=_(u'Document Number'),
        required=False)

    doctype = schema.Choice(
        title=_(u'Type'),
        required=False,
        source=possible_document_types)

    ttitle = schema.TextLine(
        title=_(u'Title'),
        required=False)

    text = schema.TextLine(
        title=_(u'Text'),
        required=False)

    authors = schema.TextLine(
        title=_(u'Authors'),
        required=False)

    polgroups = schema.TextLine(
        title=_(u'Political Groups'),
        required=False)

    topics = schema.TextLine(
        title=_(u'Topics'),
        required=False)

    session = schema.Choice(
        title=_(u'Legislature / Session'),
        required=False,
        source=possible_sessions)

    # Bounds of the date range.
    start = schema.Date(
        title=_(u'Start'),
        required=False)

    end = schema.Date(
        title=_(u'End'),
        required=False)

    # Required choice among three fixed sort criteria.
    sort_on = schema.Choice(
        title=_(u'Sort By'),
        required=True,
        values=[_(u'Type'), _(u'Number'), _(u'Session')])
class DocumentPfbSearchForm(form.Form):
    """z3c.form form built from ``IDocumentPfbSearch``."""

    prefix = 'document'
    method = 'get'
    ignoreContext = True
    template = ViewPageTemplateFile('form_templates/view_pfbdocsearch.pt')

    fields = field.Fields(IDocumentPfbSearch)
    # Custom widgets for authors, political groups and topics.
    fields['authors'].widgetFactory = FieldAuthorsWidget
    fields['polgroups'].widgetFactory = FieldPolgroupsWidget
    fields['topics'].widgetFactory = FieldTopicsWidget

    def updateWidgets(self):
        """Update the widgets, then hide the search-type marker widget."""
        super(DocumentPfbSearchForm, self).updateWidgets()
        marker = self.widgets['search_type_is_document']
        marker.mode = z3c.form.interfaces.HIDDEN_MODE

    @button.buttonAndHandler(_(u'Search'))
    def handleApply(self, action):
        """Handler of the "Search" button: extract the submitted data.

        The portal-message/redirect branch below is switched off, so the
        handler always returns ``None`` after the extraction.
        """
        data, errors = self.extractData()
        redirect_enabled = False
        if redirect_enabled and not errors:
            plone_utils = getToolByName(self.context.context, 'plone_utils')
            plone_utils.addPortalMessage(_('Your search has been completed!'))
            return self.request.response.redirect('./')
        return None
class IGlobalSearchForm(interface.Interface):
    """Union of the document, dossier and question search fields.

    Every field is optional.  The three ``search_type_is_*`` markers carry
    no default here, unlike in the per-type schemas.
    """

    search_type_is_document = schema.TextLine(title=u'Search Type', required=False)
    search_type_is_dossier = schema.TextLine(title=u'Search Type', required=False)
    search_type_is_question = schema.TextLine(title=u'Search Type', required=False)

    # Document-specific criteria.
    nodoc = schema.TextLine(title=_(u'Document Number'), required=False)
    nosuite = schema.TextLine(title=_(u'Suite Number'), required=False)
    doctype = schema.Choice(title=_(u'Type'), required=False,
                            source=possible_document_types)

    # Dossier-specific criteria.
    nodos = schema.TextLine(title=_(u'Dossier Number'), required=False)
    dostype = schema.Choice(title=_(u'Type'), required=False,
                            source=possible_dossier_types)

    # Question-specific criterion.
    questype = schema.Choice(title=_(u'Type'), required=False,
                             source=possible_question_types)

    # Criteria shared by the search types.
    ttitle = schema.TextLine(title=_(u'Title'), required=False)
    text = schema.TextLine(title=_(u'Text'), required=False)
    authors = schema.TextLine(title=_(u'Authors'), required=False)
    # Title typo fixed ("Poltical" -> "Political") so the msgid matches the
    # one used by the other search schemas.
    polgroups = schema.TextLine(title=_(u'Political Groups'), required=False)
    topics = schema.TextLine(title=_(u'Topics'), required=False)
    session = schema.Choice(title=_(u'Legislature / Session'), required=False,
                            source=possible_sessions)
    start = schema.Date(title=_(u'Start'), required=False)
    end = schema.Date(title=_(u'End'), required=False)
class GlobalSearchForm(form.Form):
    """Form over ``IGlobalSearchForm``; SearchView uses it to extract data."""

    ignoreContext = True
    fields = field.Fields(IGlobalSearchForm)
class SearchView(BrowserView):
    """Browser view that renders the search forms and runs the searches.

    Submitted criteria are extracted through ``GlobalSearchForm`` and
    turned into a ``portal_catalog`` query; full-text criteria are first
    resolved to document ids through the database connection stored on
    the portal.
    """

    batch_macros = ViewPageTemplateFile('batch_macros.pt')

    @property
    def db_connection(self):
        """Connection object found at ``portal.db._wrapper.connection``."""
        portal = getToolByName(self.context, 'portal_url').getPortalObject()
        return portal.db._wrapper.connection

    def _get_ids_cache_key(method, self, text):
        # Cache key for ram.cache: the result only depends on the text.
        return (text, )

    @ram.cache(_get_ids_cache_key)
    def get_ids_from_postgres(self, text):
        """Return the ids of the documents whose file text matches ``text``.

        Returns an empty list when obtaining a cursor raises
        ``AttributeError``.  Results are cached per ``text`` by
        ``ram.cache``.
        """
        try:
            cursor = self.db_connection.cursor()
        except AttributeError:
            return []
        try:
            # looks like there's no way to quote things properly for
            # to_tsquery, so the text is reduced to word characters and
            # whitespace before being interpolated into the quoted literal.
            from plone.i18n.normalizer.fr import normalizer
            text = re.sub(r'[^\w\s]', ' ', normalizer.normalize(text))
            cursor.execute("""SELECT t_document.id
                FROM t_document JOIN t_file
                  ON (t_document.text1id = t_file.fileid OR
                      t_document.textdefid = t_file.fileid)
                WHERE object_fts @@ to_tsquery('default_french', '''%s''')""" % text)
            return [row[0] for row in cursor.fetchall()]
        finally:
            # Release the cursor even when the query fails.
            cursor.close()

    def get_batchlinkparams(self):
        """Return a copy of the request form with text values as UTF-8 bytes.

        ``str`` values are decoded as UTF-8 and re-encoded (so invalid
        UTF-8 raises), ``unicode`` values are encoded; any other value is
        copied unchanged.
        """
        params = {}
        for key, value in self.request.form.items():
            if isinstance(value, str):
                value = unicode(value, 'utf-8').encode('utf-8')
            elif isinstance(value, unicode):
                value = value.encode('utf-8')
            params[key] = value
        return params

    def search_results(self, search_type):
        """Run the catalog search for ``search_type``.

        ``search_type`` is ``'document'``, ``'dossier'`` or ``'question'``.
        Returns ``None`` when the submitted data carries no
        ``search_type_is_<search_type>`` marker, an empty list when no
        criterion was given (or a full-text criterion matched no
        document), and the catalog results otherwise.
        """
        import logging

        search_form = GlobalSearchForm(self.context, self.request)
        # Pick the widget prefix of the form that was submitted.  It is set
        # on the instance: assigning it on the GlobalSearchForm class would
        # leak the choice to other requests and threads.
        for prefix in ('document', 'dossier', 'question'):
            marker = '%s.widgets.search_type_is_%s' % (prefix, prefix)
            if self.request.form.get(marker):
                search_form.prefix = prefix
                break
        search_form.update()
        data, errors = search_form.extractData()

        if not data.get('search_type_is_%s' % search_type):
            return None

        kw = {}
        if data.get('ttitle'):
            kw['Title'] = data.get('ttitle')
        if data.get('nodoc'):
            kw['no'] = data.get('nodoc')
        if data.get('nosuite'):
            kw['nodoc'] = data.get('nosuite')
        if data.get('nodos'):
            kw['nodos'] = data.get('nodos')

        # Type criteria hold the vocabulary token; the catalog is queried
        # with the title of the matching term.  ``questype`` was collected
        # by the form but previously never applied to the query.
        for type_field, vocabulary_factory in (
                ('doctype', possible_document_types),
                ('dostype', possible_dossier_types),
                ('questype', possible_question_types)):
            token = data.get(type_field)
            if token:
                vocabulary = vocabulary_factory(self.context)
                kw[type_field] = vocabulary.getTermByToken(token).title

        start, end = data.get('start'), data.get('end')
        if start and end:
            kw['dateDoc'] = {'query': [start, end], 'range': 'minmax'}
        elif start:
            kw['dateDoc'] = {'query': start, 'range': 'min'}
        elif end:
            kw['dateDoc'] = {'query': end, 'range': 'max'}

        if data.get('session'):
            kw['session'] = tabellio.config.utils.get_list_of_sessions(
                data.get('session'))

        # Whitespace-separated ids that must all match.
        for field_name, index_name in (('authors', 'authorsDoc'),
                                       ('polgroups', 'polgroupsDoc'),
                                       ('topics', 'topics')):
            if data.get(field_name):
                kw[index_name] = {'query': data.get(field_name).strip().split(),
                                  'operator': 'and'}

        # NOTE(review): IGlobalSearchForm declares no ``sort_on`` field, so
        # these branches look unreachable as written -- confirm.
        if data.get('sort_on') == 'Type':
            kw['sort_on'] = 'doctype'
            kw['sort_order'] = 'ascending'
        elif data.get('sort_on') == 'Number':
            kw['sort_on'] = 'no'
            kw['sort_order'] = 'ascending'

        if not kw and not data.get('text'):
            return []

        if data.get('text'):
            # plaintext search, get document ids from postgresql
            ids = self.get_ids_from_postgres(data.get('text'))
            if not ids:
                # No document matches the text: answer directly instead of
                # handing an empty id list to the catalog.
                return []
            kw['id'] = ids

        for kind in ('document', 'dossier', 'question'):
            if data.get('search_type_is_%s' % kind):
                kw['portal_type'] = 'tabellio.documents.%s' % kind
                break

        if not kw.get('sort_on') and data.get('search_type_is_document'):
            kw['sort_on'] = 'dateDoc'
            kw['sort_order'] = 'descending'

        # Debug trace of the query (replaces the former print statements).
        logging.getLogger('tabellio.searchform').debug('catalog query: %r', kw)

        catalog = getToolByName(self.context, 'portal_catalog')
        return catalog(**kw)

    def _render_form(self, form_class):
        # Instantiate, update and render one of the search forms.
        search_form = form_class(self.context, self.request)
        search_form.update()
        return search_form.render()

    def document_pfb_search_form(self):
        """Return the rendered ``DocumentPfbSearchForm``."""
        return self._render_form(DocumentPfbSearchForm)

    def document_search_form(self):
        """Return the rendered ``DocumentSearchForm``."""
        return self._render_form(DocumentSearchForm)

    def document_search_results(self):
        """Return ``search_results`` for the ``'document'`` search type."""
        return self.search_results(search_type='document')

    def dossier_search_form(self):
        """Return the rendered ``DossierSearchForm``."""
        return self._render_form(DossierSearchForm)

    def dossier_search_results(self):
        """Return ``search_results`` for the ``'dossier'`` search type."""
        return self.search_results(search_type='dossier')

    def question_search_form(self):
        """Return the rendered ``QuestionSearchForm``."""
        return self._render_form(QuestionSearchForm)

    def question_search_results(self):
        """Return ``search_results`` for the ``'question'`` search type."""
        return self.search_results(search_type='question')