# debian-python-whoosh/tests/test_collector.py

from __future__ import with_statement
import pytest
from whoosh import collectors, fields, query, searching
from whoosh.compat import b, u, xrange
from whoosh.filedb.filestore import RamStorage
from whoosh.util.testing import TempIndex
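
# These tests exercise the collector machinery in whoosh.collectors: basic
# hit collection, query filters, time-limited searches, result collapsing,
# and matched-term tracking.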


def test_add():
    schema = fields.Schema(id=fields.STORED, text=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=1, text=u("alfa bravo charlie"))
    w.add_document(id=2, text=u("alfa bravo delta"))
    w.add_document(id=3, text=u("alfa charlie echo"))
    w.commit()

    with ix.searcher() as s:
        assert s.doc_frequency("text", u("charlie")) == 2

        r = s.search(query.Term("text", u("charlie")))
        assert [hit["id"] for hit in r] == [1, 3]
        assert len(r) == 2


def test_filter_that_matches_no_document():
    schema = fields.Schema(id=fields.STORED, text=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=1, text=u("alfa bravo charlie"))
    w.add_document(id=2, text=u("alfa bravo delta"))
    w.commit()

    with ix.searcher() as s:
        r = s.search(query.Every(),
                     filter=query.Term("text", u("echo")))
        assert [hit["id"] for hit in r] == []
        assert len(r) == 0
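
# The filter argument restricts the search to documents matching the filter
# query, so a filter term that occurs in no document empties even an Every()
# search, as the test above demonstrates.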


def test_timelimit():
    schema = fields.Schema(text=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    for _ in xrange(50):
        w.add_document(text=u("alfa"))
    w.commit()

    import time
    from whoosh import collectors, matching

    class SlowMatcher(matching.WrappingMatcher):
        def next(self):
            time.sleep(0.02)
            self.child.next()

    class SlowQuery(query.WrappingQuery):
        def matcher(self, searcher, context=None):
            return SlowMatcher(self.child.matcher(searcher, context))

    with ix.searcher() as s:
        oq = query.Term("text", u("alfa"))
        sq = SlowQuery(oq)

        col = collectors.TimeLimitCollector(s.collector(limit=None),
                                            timelimit=0.1)
        with pytest.raises(searching.TimeLimit):
            s.search_with_collector(sq, col)

        col = collectors.TimeLimitCollector(s.collector(limit=40),
                                            timelimit=0.1)
        with pytest.raises(collectors.TimeLimit):
            s.search_with_collector(sq, col)

        col = collectors.TimeLimitCollector(s.collector(limit=None),
                                            timelimit=0.25)
        try:
            s.search_with_collector(sq, col)
            assert False  # Shouldn't get here
        except collectors.TimeLimit:
            r = col.results()
            assert r.scored_length() > 0

        col = collectors.TimeLimitCollector(s.collector(limit=None),
                                            timelimit=0.5)
        s.search_with_collector(oq, col)
        assert col.results().runtime < 0.5
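

# A minimal usage sketch of the pattern exercised above (the helper name is
# illustrative, not part of Whoosh): wrap a collector in TimeLimitCollector
# and fall back to whatever partial results were collected if the limit fires.
def _search_with_deadline(searcher, q, timelimit):
    col = collectors.TimeLimitCollector(searcher.collector(limit=None),
                                        timelimit=timelimit)
    try:
        searcher.search_with_collector(q, col)
    except collectors.TimeLimit:
        pass  # partial results collected before the deadline remain usable
    return col.results()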


# Requires signal.SIGALRM (not available on Windows), hence the skipif guard.
@pytest.mark.skipif("not hasattr(__import__('signal'), 'SIGALRM')")
def test_timelimit_alarm():
    import time
    from whoosh import matching

    class SlowMatcher(matching.Matcher):
        def __init__(self):
            self._id = 0

        def id(self):
            return self._id

        def is_active(self):
            return self._id == 0

        def next(self):
            time.sleep(10)
            self._id = 1

        def score(self):
            return 1.0

    class SlowQuery(query.Query):
        def matcher(self, searcher, context=None):
            return SlowMatcher()

    schema = fields.Schema(text=fields.TEXT)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(text=u("Hello"))

    with ix.searcher() as s:
        q = SlowQuery()

        t = time.time()
        c = s.collector()
        c = collectors.TimeLimitCollector(c, 0.2)
        with pytest.raises(searching.TimeLimit):
            _ = s.search_with_collector(q, c)
        assert time.time() - t < 0.5


def test_reverse_collapse():
    from whoosh import sorting

    schema = fields.Schema(title=fields.TEXT(stored=True),
                           content=fields.TEXT,
                           path=fields.ID(stored=True),
                           tags=fields.KEYWORD,
                           order=fields.NUMERIC(stored=True))
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(title=u"First document",
                       content=u"This is my document!",
                       path=u"/a", tags=u"first", order=20.0)
        w.add_document(title=u"Second document",
                       content=u"This is the second example.",
                       path=u"/b", tags=u"second", order=12.0)
        w.add_document(title=u"Third document",
                       content=u"Examples are many.",
                       path=u"/c", tags=u"third", order=15.0)
        w.add_document(title=u"Thirdish document",
                       content=u"Examples are too many.",
                       path=u"/d", tags=u"third", order=25.0)

    with ix.searcher() as s:
        q = query.Every('content')
        r = s.search(q)
        assert [hit["path"] for hit in r] == ["/a", "/b", "/c", "/d"]

        q = query.Or([query.Term("title", "document"),
                      query.Term("content", "document"),
                      query.Term("tags", "document")])
        cf = sorting.FieldFacet("tags")
        of = sorting.FieldFacet("order", reverse=True)
        r = s.search(q, collapse=cf, collapse_order=of, terms=True)
        assert [hit["path"] for hit in r] == ["/a", "/b", "/d"]


def test_termdocs():
    schema = fields.Schema(key=fields.TEXT, city=fields.ID)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(key=u"ant", city=u"london")
        w.add_document(key=u"anteater", city=u"roma")
        w.add_document(key=u"bear", city=u"london")
        w.add_document(key=u"bees", city=u"roma")
        w.add_document(key=u"anorak", city=u"london")
        w.add_document(key=u"antimatter", city=u"roma")
        w.add_document(key=u"angora", city=u"london")
        w.add_document(key=u"angels", city=u"roma")

    with ix.searcher() as s:
        cond_q = query.Term("city", u"london")
        pref_q = query.Prefix("key", u"an")
        q = query.And([cond_q, pref_q]).normalize()
        r = s.search(q, scored=False, terms=True)

        field = s.schema["key"]
        terms = [field.from_bytes(term) for fieldname, term in r.termdocs
                 if fieldname == "key"]
        assert sorted(terms) == [u"angora", u"anorak", u"ant"]
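
# With terms=True, iterating Results.termdocs yields (fieldname, termbytes)
# keys for the terms that actually matched documents; decoding the "key"
# terms gives the prefix completions that co-occur with city:london.
# test_termdocs2 below reaches the same answer without collector support.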


def test_termdocs2():
    schema = fields.Schema(key=fields.TEXT, city=fields.ID)
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(key=u"ant", city=u"london")
        w.add_document(key=u"anteater", city=u"roma")
        w.add_document(key=u"bear", city=u"london")
        w.add_document(key=u"bees", city=u"roma")
        w.add_document(key=u"anorak", city=u"london")
        w.add_document(key=u"antimatter", city=u"roma")
        w.add_document(key=u"angora", city=u"london")
        w.add_document(key=u"angels", city=u"roma")

    with ix.searcher() as s:
        # A query that matches the applicable documents
        cond_q = query.Term("city", "london")
        # Get a set of the documents that match the condition(s)
        cond_docnums = set(cond_q.docs(s))
        # Grab the suggestion field for later
        field = s.schema["key"]

        terms = []
        # Expand the prefix
        for term in s.reader().expand_prefix("key", "an"):
            # Get the documents the term is in
            for docnum in s.document_numbers(key=term):
                # Check if it's in the set matching the condition(s)
                if docnum in cond_docnums:
                    # If so, decode the term from bytes, add it to the list,
                    # and move on to the next term
                    terms.append(field.from_bytes(term))
                    break

        assert terms == ["angora", "anorak", "ant"]
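
# The manual loop arrives at the same three terms as test_termdocs, already
# in order: expand_prefix yields matching terms in lexicographic order, so no
# extra sort is needed here.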