from __future__ import with_statement

import pytest

from whoosh import analysis, fields, formats, highlight, qparser, query
from whoosh.codec.whoosh3 import W3Codec
from whoosh.compat import u, xrange, text_type, permutations
from whoosh.filedb.filestore import RamStorage
from whoosh.util.testing import TempStorage, TempIndex


def test_score_retrieval():
    schema = fields.Schema(title=fields.TEXT(stored=True),
                           content=fields.TEXT(stored=True))
    storage = RamStorage()
    ix = storage.create_index(schema)
    writer = ix.writer()
    writer.add_document(title=u("Miss Mary"),
                        content=u("Mary had a little white lamb its fleece"
                                  " was white as snow"))
    writer.add_document(title=u("Snow White"),
                        content=u("Snow white lived in the forest with seven"
                                  " dwarfs"))
    writer.commit()

    with ix.searcher() as s:
        results = s.search(query.Term("content", "white"))
        assert len(results) == 2
        assert results[0]['title'] == u("Miss Mary")
        assert results[1]['title'] == u("Snow White")
        assert results.score(0) is not None
        assert results.score(0) != 0
        assert results.score(0) != 1


def test_resultcopy():
    schema = fields.Schema(a=fields.TEXT(stored=True))
    st = RamStorage()
    ix = st.create_index(schema)

    w = ix.writer()
    w.add_document(a=u("alfa bravo charlie"))
    w.add_document(a=u("bravo charlie delta"))
    w.add_document(a=u("charlie delta echo"))
    w.add_document(a=u("delta echo foxtrot"))
    w.commit()

    with ix.searcher() as s:
        r = s.search(qparser.QueryParser("a", None).parse(u("charlie")))
        assert len(r) == 3
        rcopy = r.copy()
        assert r.top_n == rcopy.top_n


def test_resultslength():
    schema = fields.Schema(id=fields.ID(stored=True),
                           value=fields.TEXT)
    ix = RamStorage().create_index(schema)

    w = ix.writer()
    w.add_document(id=u("1"), value=u("alfa alfa alfa alfa alfa"))
    w.add_document(id=u("2"), value=u("alfa alfa alfa alfa"))
    w.add_document(id=u("3"), value=u("alfa alfa alfa"))
    w.add_document(id=u("4"), value=u("alfa alfa"))
    w.add_document(id=u("5"), value=u("alfa"))
    w.add_document(id=u("6"), value=u("bravo"))
    w.commit()

    with ix.searcher() as s:
        r = s.search(query.Term("value", u("alfa")), limit=3)
        assert len(r) == 5
        assert r.scored_length() == 3
        assert r[10:] == []


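# Exercises the Results methods that merge two result sets in place:
# extend(), filter(), upgrade(), and upgrade_and_extend().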
def test_combine():
    schema = fields.Schema(id=fields.ID(stored=True),
                           value=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=u("1"), value=u("alfa bravo charlie all"))
    w.add_document(id=u("2"), value=u("bravo charlie echo all"))
    w.add_document(id=u("3"), value=u("charlie echo foxtrot all"))
    w.add_document(id=u("4"), value=u("echo foxtrot india all"))
    w.add_document(id=u("5"), value=u("foxtrot india juliet all"))
    w.add_document(id=u("6"), value=u("india juliet alfa all"))
    w.add_document(id=u("7"), value=u("juliet alfa bravo all"))
    w.add_document(id=u("8"), value=u("charlie charlie charlie all"))
    w.commit()

    with ix.searcher() as s:
        def idsof(r):
            return "".join(hit["id"] for hit in r)

        def check(r1, methodname, r2, ids):
            getattr(r1, methodname)(r2)
            assert idsof(r1) == ids

        def rfor(t):
            return s.search(query.Term("value", t))

        assert idsof(rfor(u("foxtrot"))) == "345"
        check(rfor(u("foxtrot")), "extend", rfor("charlie"), "345812")
        check(rfor(u("foxtrot")), "filter", rfor("juliet"), "5")
        check(rfor(u("charlie")), "filter", rfor("foxtrot"), "3")
        check(rfor(u("all")), "filter", rfor("foxtrot"), "345")
        check(rfor(u("all")), "upgrade", rfor("india"), "45612378")
        check(rfor(u("charlie")), "upgrade_and_extend", rfor("echo"), "23814")


def test_results_filter():
    schema = fields.Schema(id=fields.STORED, words=fields.KEYWORD(stored=True))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id="1", words=u("bravo top"))
    w.add_document(id="2", words=u("alfa top"))
    w.add_document(id="3", words=u("alfa top"))
    w.add_document(id="4", words=u("alfa bottom"))
    w.add_document(id="5", words=u("bravo bottom"))
    w.add_document(id="6", words=u("charlie bottom"))
    w.add_document(id="7", words=u("charlie bottom"))
    w.commit()

    with ix.searcher() as s:
        def check(r, target):
            result = "".join(s.stored_fields(d)["id"] for d in r.docs())
            assert result == target

        r = s.search(query.Term("words", u("alfa")))
        r.filter(s.search(query.Term("words", u("bottom"))))
        check(r, "4")


def test_extend_empty():
    schema = fields.Schema(id=fields.STORED, words=fields.KEYWORD)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=1, words=u("alfa bravo charlie"))
    w.add_document(id=2, words=u("bravo charlie delta"))
    w.add_document(id=3, words=u("charlie delta echo"))
    w.add_document(id=4, words=u("delta echo foxtrot"))
    w.add_document(id=5, words=u("echo foxtrot golf"))
    w.commit()

    with ix.searcher() as s:
        # Get an empty results object
        r1 = s.search(query.Term("words", u("hotel")))
        # Copy it
        r1c = r1.copy()
        # Get a non-empty results object
        r2 = s.search(query.Term("words", u("delta")))
        # Copy it
        r2c = r2.copy()
        # Extend r1 with r2
        r1c.extend(r2c)
        assert [hit["id"] for hit in r1c] == [2, 3, 4]
        assert r1c.scored_length() == 3


def test_extend_filtered():
    schema = fields.Schema(id=fields.STORED, text=fields.TEXT(stored=True))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(id=1, text=u("alfa bravo charlie"))
    w.add_document(id=2, text=u("bravo charlie delta"))
    w.add_document(id=3, text=u("juliet delta echo"))
    w.add_document(id=4, text=u("delta bravo alfa"))
    w.add_document(id=5, text=u("foxtrot sierra tango"))
    w.commit()

    hits = lambda result: [hit["id"] for hit in result]

    with ix.searcher() as s:
        r1 = s.search(query.Term("text", u("alfa")), filter=set([1, 4]))
        assert r1.allowed == set([1, 4])
        assert len(r1.top_n) == 0

        r2 = s.search(query.Term("text", u("bravo")))
        assert len(r2.top_n) == 3
        assert hits(r2) == [1, 2, 4]

        r3 = r1.copy()
        assert r3.allowed == set([1, 4])
        assert len(r3.top_n) == 0
        r3.extend(r2)
        assert len(r3.top_n) == 3
        assert hits(r3) == [1, 2, 4]


def test_pages():
    from whoosh.scoring import Frequency

    schema = fields.Schema(id=fields.ID(stored=True), c=fields.TEXT)
    ix = RamStorage().create_index(schema)

    w = ix.writer()
    w.add_document(id=u("1"), c=u("alfa alfa alfa alfa alfa alfa"))
    w.add_document(id=u("2"), c=u("alfa alfa alfa alfa alfa"))
    w.add_document(id=u("3"), c=u("alfa alfa alfa alfa"))
    w.add_document(id=u("4"), c=u("alfa alfa alfa"))
    w.add_document(id=u("5"), c=u("alfa alfa"))
    w.add_document(id=u("6"), c=u("alfa"))
    w.commit()

    with ix.searcher(weighting=Frequency) as s:
        q = query.Term("c", u("alfa"))
        r = s.search(q)
        assert [d["id"] for d in r] == ["1", "2", "3", "4", "5", "6"]
        r = s.search_page(q, 2, pagelen=2)
        assert [d["id"] for d in r] == ["3", "4"]

        r = s.search_page(q, 2, pagelen=4)
        assert r.total == 6
        assert r.pagenum == 2
        assert r.pagelen == 2


def test_pages_with_filter():
    from whoosh.scoring import Frequency

    schema = fields.Schema(id=fields.ID(stored=True),
                           type=fields.TEXT(),
                           c=fields.TEXT)
    ix = RamStorage().create_index(schema)

    w = ix.writer()
    w.add_document(id=u("1"), type=u("odd"), c=u("alfa alfa alfa alfa alfa alfa"))
    w.add_document(id=u("2"), type=u("even"), c=u("alfa alfa alfa alfa alfa"))
    w.add_document(id=u("3"), type=u("odd"), c=u("alfa alfa alfa alfa"))
    w.add_document(id=u("4"), type=u("even"), c=u("alfa alfa alfa"))
    w.add_document(id=u("5"), type=u("odd"), c=u("alfa alfa"))
    w.add_document(id=u("6"), type=u("even"), c=u("alfa"))
    w.commit()

    with ix.searcher(weighting=Frequency) as s:
        q = query.Term("c", u("alfa"))
        filterq = query.Term("type", u("even"))
        r = s.search(q, filter=filterq)
        assert [d["id"] for d in r] == ["2", "4", "6"]
        r = s.search_page(q, 2, pagelen=2, filter=filterq)
        assert [d["id"] for d in r] == ["6"]


def test_extra_slice():
    schema = fields.Schema(key=fields.ID(stored=True))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    for char in u("abcdefghijklmnopqrstuvwxyz"):
        w.add_document(key=char)
    w.commit()

    with ix.searcher() as s:
        r = s.search(query.Every(), limit=5)
        assert r[6:7] == []


def test_page_counts():
    from whoosh.scoring import Frequency

    schema = fields.Schema(id=fields.ID(stored=True))
    st = RamStorage()
    ix = st.create_index(schema)

    w = ix.writer()
    for i in xrange(10):
        w.add_document(id=text_type(i))
    w.commit()

    with ix.searcher(weighting=Frequency) as s:
        q = query.Every("id")

        r = s.search(q)
        assert len(r) == 10

        with pytest.raises(ValueError):
            s.search_page(q, 0)

        r = s.search_page(q, 1, 5)
        assert len(r) == 10
        assert r.pagecount == 2

        r = s.search_page(q, 1, 5)
        assert len(r) == 10
        assert r.pagecount == 2

        r = s.search_page(q, 2, 5)
        assert len(r) == 10
        assert r.pagecount == 2
        assert r.pagenum == 2

        r = s.search_page(q, 1, 10)
        assert len(r) == 10
        assert r.pagecount == 1
        assert r.pagenum == 1


def test_resultspage():
    schema = fields.Schema(id=fields.STORED, content=fields.TEXT(stored=True))
    ix = RamStorage().create_index(schema)

    domain = ("alfa", "bravo", "bravo", "charlie", "delta")
    w = ix.writer()
    for i, lst in enumerate(permutations(domain, 3)):
        w.add_document(id=text_type(i), content=u(" ").join(lst))
    w.commit()

    with ix.searcher() as s:
        q = query.Term("content", u("bravo"))
        r = s.search(q, limit=10)
        tops = list(r)

        rp = s.search_page(q, 1, pagelen=5)
        assert rp.scored_length() == 5
        assert list(rp) == tops[0:5]
        assert rp[10:] == []

        rp = s.search_page(q, 2, pagelen=5)
        assert list(rp) == tops[5:10]

        rp = s.search_page(q, 1, pagelen=10)
        assert len(rp) == 54
        assert rp.pagecount == 6
        rp = s.search_page(q, 6, pagelen=10)
        assert len(list(rp)) == 4
        assert rp.is_last_page()

        with pytest.raises(ValueError):
            s.search_page(q, 0)
        assert s.search_page(q, 10).pagenum == 6

        rp = s.search_page(query.Term("content", "glonk"), 1)
        assert len(rp) == 0
        assert rp.is_last_page()


def test_highlight_setters():
    schema = fields.Schema(text=fields.TEXT)
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(text=u("Hello"))
    w.commit()

    r = ix.searcher().search(query.Term("text", "hello"))
    hl = highlight.Highlighter()
    ucf = highlight.UppercaseFormatter()
    r.highlighter = hl
    r.formatter = ucf
    assert hl.formatter is ucf


def test_snippets():
    ana = analysis.StemmingAnalyzer()
    schema = fields.Schema(text=fields.TEXT(stored=True, analyzer=ana))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(text=u("Lay out the rough animation by creating the important poses where they occur on the timeline."))
    w.add_document(text=u("Set key frames on everything that's key-able. This is for control and predictability: you don't want to accidentally leave something un-keyed. This is also much faster than selecting the parameters to key."))
    w.add_document(text=u("Use constant (straight) or sometimes linear transitions between keyframes in the channel editor. This makes the character jump between poses."))
    w.add_document(text=u("Keying everything gives quick, immediate results. But it can become difficult to tweak the animation later, especially for complex characters."))
    w.add_document(text=u("Copy the current pose to create the next one: pose the character, key everything, then copy the keyframe in the playbar to another frame, and key everything at that frame."))
    w.commit()

    target = ["Set KEY frames on everything that's KEY-able",
              "Copy the current pose to create the next one: pose the character, KEY everything, then copy the keyframe in the playbar to another frame, and KEY everything at that frame",
              "KEYING everything gives quick, immediate results"]

    with ix.searcher() as s:
        qp = qparser.QueryParser("text", ix.schema)
        q = qp.parse(u("key"))
        r = s.search(q, terms=True)
        r.fragmenter = highlight.SentenceFragmenter()
        r.formatter = highlight.UppercaseFormatter()

        assert sorted([hit.highlights("text", top=1) for hit in r]) == sorted(target)


def test_keyterms():
    ana = analysis.StandardAnalyzer()
    vectorformat = formats.Frequency()
    schema = fields.Schema(path=fields.ID,
                           content=fields.TEXT(analyzer=ana,
                                               vector=vectorformat))
    st = RamStorage()
    ix = st.create_index(schema)
    w = ix.writer()
    w.add_document(path=u("a"), content=u("This is some generic content"))
    w.add_document(path=u("b"), content=u("This is some distinctive content"))
    w.commit()

    with ix.searcher() as s:
        docnum = s.document_number(path=u("b"))
        keyterms = list(s.key_terms([docnum], "content"))
        assert len(keyterms) > 0
        assert keyterms[0][0] == "distinctive"

        r = s.search(query.Term("path", u("b")))
        keyterms2 = list(r.key_terms("content"))
        assert len(keyterms2) > 0
        assert keyterms2[0][0] == "distinctive"


def test_lengths():
    schema = fields.Schema(id=fields.STORED, text=fields.TEXT)
    ix = RamStorage().create_index(schema)

    w = ix.writer()
    w.add_document(id=1, text=u("alfa bravo charlie delta echo"))
    w.add_document(id=2, text=u("bravo charlie delta echo foxtrot"))
    w.add_document(id=3, text=u("charlie needle echo foxtrot golf"))
    w.add_document(id=4, text=u("delta echo foxtrot golf hotel"))
    w.add_document(id=5, text=u("echo needle needle hotel india"))
    w.add_document(id=6, text=u("foxtrot golf hotel india juliet"))
    w.add_document(id=7, text=u("golf needle india juliet kilo"))
    w.add_document(id=8, text=u("hotel india juliet needle lima"))
    w.commit()

    with ix.searcher() as s:
        q = query.Or([query.Term("text", u("needle")), query.Term("text", u("charlie"))])
        r = s.search(q, limit=2)
        assert not r.has_exact_length()
        assert r.estimated_length() == 7
        assert r.estimated_min_length() == 3
        assert r.scored_length() == 2
        assert len(r) == 6


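# Result counts should stay exact even when the index spans several
# segments (writers committed with merge=False).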
def test_lengths2():
    schema = fields.Schema(text=fields.TEXT(stored=True))
    ix = RamStorage().create_index(schema)
    count = 0
    for _ in xrange(3):
        w = ix.writer()
        for ls in permutations(u("alfa bravo charlie").split()):
            if "bravo" in ls and "charlie" in ls:
                count += 1
            w.add_document(text=u(" ").join(ls))
        w.commit(merge=False)

    with ix.searcher() as s:
        q = query.Or([query.Term("text", u("bravo")), query.Term("text", u("charlie"))])
        r = s.search(q, limit=None)
        assert len(r) == count

        r = s.search(q, limit=3)
        assert len(r) == count


def test_stability():
    schema = fields.Schema(text=fields.TEXT)
    ix = RamStorage().create_index(schema)
    domain = u("alfa bravo charlie delta").split()
    w = ix.writer()
    for ls in permutations(domain, 3):
        w.add_document(text=u(" ").join(ls))
    w.commit()

    with ix.searcher() as s:
        q = query.Term("text", u("bravo"))
        last = []
        for i in xrange(s.doc_frequency("text", u("bravo"))):
            # Only un-optimized results are stable
            r = s.search(q, limit=i + 1, optimize=False)
            docnums = [hit.docnum for hit in r]
            assert docnums[:-1] == last
            last = docnums


def test_terms():
    schema = fields.Schema(text=fields.TEXT(stored=True))
    ix = RamStorage().create_index(schema)
    w = ix.writer()
    w.add_document(text=u("alfa sierra tango"))
    w.add_document(text=u("bravo charlie delta"))
    w.add_document(text=u("charlie delta echo"))
    w.add_document(text=u("delta echo foxtrot"))
    w.commit()

    qp = qparser.QueryParser("text", ix.schema)
    q = qp.parse(u("(bravo AND charlie) OR foxtrot OR missing"))
    r = ix.searcher().search(q, terms=True)

    fieldobj = schema["text"]

    def txts(tset):
        return sorted(fieldobj.from_bytes(t[1]) for t in tset)

    assert txts(r.matched_terms()) == ["bravo", "charlie", "foxtrot"]
    for hit in r:
        value = hit["text"]
        for txt in txts(hit.matched_terms()):
            assert txt in value


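# A Hit raises KeyError for a field that is neither stored nor sortable;
# with a sortable column the value can still be read back.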
def test_hit_column():
    # Not stored
    schema = fields.Schema(text=fields.TEXT())
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(text=u("alfa bravo charlie"))

    with ix.searcher() as s:
        r = s.search(query.Term("text", "alfa"))
        assert len(r) == 1
        hit = r[0]
        with pytest.raises(KeyError):
            _ = hit["text"]

    # With column
    schema = fields.Schema(text=fields.TEXT(sortable=True))
    ix = RamStorage().create_index(schema)
    with ix.writer(codec=W3Codec()) as w:
        w.add_document(text=u("alfa bravo charlie"))

    with ix.searcher() as s:
        r = s.search(query.Term("text", "alfa"))
        assert len(r) == 1
        hit = r[0]
        assert hit["text"] == u("alfa bravo charlie")


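# Using a Searcher or its Results after close() should raise ReaderClosed.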
def test_closed_searcher():
    from whoosh.reading import ReaderClosed

    schema = fields.Schema(key=fields.KEYWORD(stored=True, sortable=True))

    with TempStorage() as st:
        ix = st.create_index(schema)
        with ix.writer() as w:
            w.add_document(key=u"alfa")
            w.add_document(key=u"bravo")
            w.add_document(key=u"charlie")
            w.add_document(key=u"delta")
            w.add_document(key=u"echo")

        s = ix.searcher()
        r = s.search(query.TermRange("key", "b", "d"))
        s.close()
        assert s.is_closed
        with pytest.raises(ReaderClosed):
            assert r[0]["key"] == "bravo"
        with pytest.raises(ReaderClosed):
            s.reader().column_reader("key")
        with pytest.raises(ReaderClosed):
            s.suggest("key", "brovo")

        s = ix.searcher()
        r = s.search(query.TermRange("key", "b", "d"))
        assert r[0]
        assert r[0]["key"] == "bravo"
        c = s.reader().column_reader("key")
        assert c[1] == "bravo"
        assert s.suggest("key", "brovo") == ["bravo"]


def test_paged_highlights():
    schema = fields.Schema(text=fields.TEXT(stored=True))
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(text=u("alfa bravo charlie delta echo foxtrot"))
        w.add_document(text=u("bravo charlie delta echo foxtrot golf"))
        w.add_document(text=u("charlie delta echo foxtrot golf hotel"))
        w.add_document(text=u("delta echo foxtrot golf hotel india"))
        w.add_document(text=u("echo foxtrot golf hotel india juliet"))
        w.add_document(text=u("foxtrot golf hotel india juliet kilo"))

    with ix.searcher() as s:
        q = query.Term("text", u("alfa"))
        page = s.search_page(q, 1, pagelen=3)

        page.results.fragmenter = highlight.WholeFragmenter()
        page.results.formatter = highlight.UppercaseFormatter()
        hi = page[0].highlights("text")
        assert hi == u("ALFA bravo charlie delta echo foxtrot")


def test_phrase_keywords():
    schema = fields.Schema(text=fields.TEXT(stored=True))
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(text=u("alfa bravo charlie delta"))
        w.add_document(text=u("bravo charlie delta echo"))
        w.add_document(text=u("charlie delta echo foxtrot"))
        w.add_document(text=u("delta echo foxtrot alfa"))
        w.add_document(text=u("echo foxtrot alfa bravo"))

    with ix.searcher() as s:
        q = query.Phrase("text", u("alfa bravo").split())
        r = s.search(q)
        assert len(r) == 2
        kts = " ".join(t for t, score in r.key_terms("text"))
        assert kts == "alfa bravo charlie foxtrot delta"


def test_every_keywords():
    schema = fields.Schema(title=fields.TEXT, content=fields.TEXT(stored=True))
    ix = RamStorage().create_index(schema)
    with ix.writer() as w:
        w.add_document(title=u("alfa"), content=u("bravo"))
        w.add_document(title=u("charlie"), content=u("delta"))

    with ix.searcher() as s:
        q = qparser.QueryParser("content", ix.schema).parse("*")
        assert isinstance(q, query.Every)

        r = s.search(q, terms=True)
        assert len(r) == 2
        hit = r[0]
        assert hit["content"] == "bravo"
        assert hit.highlights("content") == ""


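# A Results object can be passed directly as the filter= argument of
# Searcher.search().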
def test_filter_by_result():
    schema = fields.Schema(title=fields.TEXT(stored=True),
                           content=fields.TEXT(stored=True))

    with TempIndex(schema, "filter") as ix:
        words = u("foo bar baz qux barney").split()
        with ix.writer() as w:
            for x in xrange(100):
                t = u("even" if x % 2 == 0 else "odd")
                c = words[x % len(words)]
                w.add_document(title=t, content=c)

        with ix.searcher() as searcher:
            fq = query.Term("title", "even")
            filter_result = searcher.search(fq)
            assert filter_result.docset is None

            q = query.Term("content", "foo")

            # filter_result.docs()
            result = searcher.search(q, filter=filter_result)
            assert all(x["title"] == "even" and x["content"] == "foo"
                       for x in result)