This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
plone.formwidget.contenttree/plone/formwidget/contenttree/source.py

251 lines
8.8 KiB
Python

import logging
import Missing
from zope.interface import implements
from zope.component import getMultiAdapter
from zope.schema.interfaces import IContextSourceBinder
from zope.schema.vocabulary import SimpleTerm
from plone.app.layout.navigation.interfaces import INavigationQueryBuilder
from plone.app.layout.navigation.root import getNavigationRootObject
from plone.app.vocabularies.catalog import parse_query
from Products.CMFCore.utils import getToolByName
from Products.ZCTextIndex.ParseTree import ParseError
from plone.formwidget.contenttree.interfaces import IContentSource
from plone.formwidget.contenttree.interfaces import IContentFilter
from plone.formwidget.contenttree.utils import closest_content
from OFS.interfaces import ITraversable
logger = logging.getLogger(__name__)
class CustomFilter(object):
"""A filter that can be used to test simple values in brain metadata and
indexes.
Limitations:
- Will probably only work on FieldIndex, KeywordIndex and PathIndex indexes
"""
implements(IContentFilter)
def __init__(self, **kw):
self.criteria = {}
for key, value in kw.items():
if (not isinstance(value, (list, tuple, set, frozenset))
and not key == 'path'):
self.criteria[key] = [value]
elif isinstance(value, (set, frozenset)):
self.criteria[key] = list(value)
else:
self.criteria[key] = value
def __call__(self, brain, index_data):
for key, value in self.criteria.items():
test_value = index_data.get(key, None)
if test_value is not None:
if (not isinstance(test_value, (list, tuple, set, frozenset))
and not key == 'path'):
test_value = set([test_value])
elif isinstance(value, (list, tuple)):
test_value = set(test_value)
if key == 'path':
if not test_value.startswith(value['query']):
return False
elif not test_value.intersection(value):
return False
return True
class PathSource(object):
implements(IContentSource)
def __init__(self, context, selectable_filter, navigation_tree_query=None):
self.context = context
nav_root = getNavigationRootObject(context, None)
query_builder = getMultiAdapter((nav_root, self),
INavigationQueryBuilder)
query = query_builder()
if navigation_tree_query is None:
navigation_tree_query = {}
# Copy path from selectable_filter into the navigation_tree_query
# normally it does not make sense to show elements that wouldn't be
# selectable anyway and are unneeded to navigate to selectable items
if ('path' not in navigation_tree_query
and 'path' in selectable_filter.criteria):
navigation_tree_query['path'] = selectable_filter.criteria['path']
query.update(navigation_tree_query)
self.navigation_tree_query = query
self.selectable_filter = selectable_filter
self.catalog = getToolByName(context, "portal_catalog")
portal_tool = getToolByName(context, "portal_url")
self.portal_path = portal_tool.getPortalPath()
# Tokenised vocabulary API
def __iter__(self):
return [].__iter__()
def __contains__(self, value):
try:
brain = self._getBrainByValue(value)
# If brain was not found, assume item is still good. This seems
# somewhat nonsensical, but:-
# (a) If an item is invisible to the current editor, they should
# be able to keep the item there.
# (b) If using PathSource and an item the path points to was
# deleted by mistake, I may want save my changes then restore
# this page, rather than remove the relation.
if brain is None:
return True
return self.isBrainSelectable(brain)
except (KeyError, IndexError):
return False
def getTermByToken(self, token):
if token.startswith('#error-missing-'):
value = token.partition('#error-missing-')[2]
return self._placeholderTerm(value)
brain = self._getBrainByToken(token)
if not self.isBrainSelectable(brain):
raise LookupError(token)
return self.getTermByBrain(brain)
def getTerm(self, value):
brain = self._getBrainByValue(value)
if brain is None:
return self._placeholderTerm(value)
if not self.isBrainSelectable(brain):
raise LookupError('Value "%s" does not match criteria for field'
, str(value))
return self.getTermByBrain(brain)
# Query API - used to locate content, e.g. in non-JS mode
def search(self, query, limit=20):
catalog_query = self.selectable_filter.criteria.copy()
catalog_query.update(parse_query(query, self.portal_path))
if limit and 'sort_limit' not in catalog_query:
catalog_query['sort_limit'] = limit
try:
results = (self.getTermByBrain(brain, real_value=False)
for brain in self.catalog(**catalog_query))
except ParseError:
return []
return results
def isBrainSelectable(self, brain):
if brain is None:
return False
index_data = self.catalog.getIndexDataForRID(brain.getRID())
return self.selectable_filter(brain, index_data)
def getTermByBrain(self, brain, real_value=True):
value = brain.getPath()[len(self.portal_path):]
return SimpleTerm(value, token=brain.getPath(), title=brain.Title or
brain.id)
def tokenToPath(self, token):
# token==path for existing sources, but may not be true in future
return token
# Helper functions
def _getBrainByToken(self, token):
rid = self.catalog.getrid(token)
if not(rid):
return None
return self.catalog._catalog[rid]
def _getBrainByValue(self, value):
if ITraversable.providedBy(value):
token = '/'.join(value.getPhysicalPath())
else:
token = self.portal_path + value
return self._getBrainByToken(token)
# Generate a term to persist the value, even when we can't resolve the
# brain. These will get hidden in the display templates
def _placeholderTerm(self, value):
return SimpleTerm(str(value),
token='#error-missing-' + str(value),
title=u"Hidden or missing item '%s'" % value)
class ObjPathSource(PathSource):
def _getBrainByValue(self, value):
return self._getBrainByToken('/'.join(value.getPhysicalPath()))
def getTermByBrain(self, brain, real_value=True):
if real_value:
value = brain._unrestrictedGetObject()
else:
value = brain.getPath()[len(self.portal_path):]
return SimpleTerm(value, token=brain.getPath(), title=brain.Title or
brain.id)
class UUIDSource(PathSource):
"""
A source that stores UUIDs as values, so references don't get broken if
content is moved.
"""
def _getBrainByValue(self, value):
try:
return self.catalog(UID=value)[0]
except (KeyError, IndexError):
return None
def getTermByBrain(self, brain, real_value=True):
value = brain.UID
if value is Missing.Value:
# This is likely to give problems at some point.
logger.warn("Brain in UUIDSource has missing UID value. Maybe you "
"need to enable plone.app.referenceablebehavior on "
"portal type %s?", brain.portal_type)
return SimpleTerm(value, token=brain.getPath(), title=brain.Title or
brain.id)
class PathSourceBinder(object):
implements(IContextSourceBinder)
path_source = PathSource
def __init__(self, navigation_tree_query=None, **kw):
self.selectable_filter = CustomFilter(**kw)
self.navigation_tree_query = navigation_tree_query
def __call__(self, context):
return self.path_source(
closest_content(context),
selectable_filter=self.selectable_filter,
navigation_tree_query=self.navigation_tree_query)
def __contains__(self, value):
# If used without being properly bound (looks at DataGridField), bind
# now and pass through to the bound version
return self(None).__contains__(value)
class ObjPathSourceBinder(PathSourceBinder):
path_source = ObjPathSource
class UUIDSourceBinder(PathSourceBinder):
path_source = UUIDSource